You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2014/07/12 06:08:29 UTC
[11/47] Added c++ client samples for integration of airavata with
any other application's c++ interface
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/FunctionRunner.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/FunctionRunner.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/FunctionRunner.h
new file mode 100644
index 0000000..e3b2bf3
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/FunctionRunner.h
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H
+#define _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H 1
+
+#include <thrift/cxxfunctional.h>
+#include <thrift/concurrency/Thread.h>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Convenient implementation of Runnable that will execute arbitrary callbacks.
+ * Interfaces are provided to accept both a generic 'void(void)' callback, and
+ * a 'void* (void*)' pthread_create-style callback.
+ *
+ * Example use:
+ * void* my_thread_main(void* arg);
+ * shared_ptr<ThreadFactory> factory = ...;
+ * // To create a thread that executes my_thread_main once:
+ * shared_ptr<Thread> thread = factory->newThread(
+ * FunctionRunner::create(my_thread_main, some_argument));
+ * thread->start();
+ *
+ * bool A::foo();
+ * A* a = new A();
+ * // To create a thread that executes a.foo() every 100 milliseconds:
+ * factory->newThread(shared_ptr<FunctionRunner>(new FunctionRunner(
+ * apache::thrift::stdcxx::bind(&A::foo, a), 100)))->start();
+ *
+ */
+
+class FunctionRunner : public Runnable {
+ public:
+  // This is the type of callback 'pthread_create()' expects.
+  typedef void* (*PthreadFuncPtr)(void *arg);
+  // This is a fully-generic void(void) callback for custom bindings.
+  typedef apache::thrift::stdcxx::function<void()> VoidFunc;
+  // A bool(void) callback; run() keeps calling it until it returns false.
+  typedef apache::thrift::stdcxx::function<bool()> BoolFunc;
+
+  /**
+   * Syntactic sugar to make it easier to create new FunctionRunner
+   * objects wrapped in shared_ptr.
+   */
+  static boost::shared_ptr<FunctionRunner> create(const VoidFunc& cob) {
+    return boost::shared_ptr<FunctionRunner>(new FunctionRunner(cob));
+  }
+
+  static boost::shared_ptr<FunctionRunner> create(PthreadFuncPtr func,
+                                                  void* arg) {
+    return boost::shared_ptr<FunctionRunner>(new FunctionRunner(func, arg));
+  }
+
+ private:
+  static void pthread_func_wrapper(PthreadFuncPtr func, void *arg) {
+    //discard return value
+    func(arg);
+  }
+ public:
+  /**
+   * Given a 'pthread_create' style callback, this FunctionRunner will
+   * execute the given callback. Note that the 'void*' return value is ignored.
+   */
+  FunctionRunner(PthreadFuncPtr func, void* arg)
+    : func_(apache::thrift::stdcxx::bind(pthread_func_wrapper, func, arg)),
+      intervalMs_(0) { }  // no repeat interval for one-shot callbacks
+
+  /**
+   * Given a generic callback, this FunctionRunner will execute it.
+   */
+  FunctionRunner(const VoidFunc& cob)
+    : func_(cob), intervalMs_(0)  // intervalMs_ is unused here; zero it anyway
+  { }
+
+  /**
+   * Given a bool foo(...) type callback, FunctionRunner will execute
+   * the callback repeatedly with 'intervalMs' milliseconds between the calls,
+   * until it returns false. Note that the actual interval between calls will
+   * be intervalMs plus execution time of the callback.
+   */
+  FunctionRunner(const BoolFunc& cob, int intervalMs)
+    : repFunc_(cob), intervalMs_(intervalMs)
+  { }
+
+  /** Loops the repeating callback if one was supplied, else calls func_ once. */
+  void run() {
+    if (repFunc_) {
+      while(repFunc_()) {
+        THRIFT_SLEEP_USEC(intervalMs_*1000);
+      }
+    } else {
+      func_();
+    }
+  }
+
+ private:
+  VoidFunc func_;
+  BoolFunc repFunc_;
+  int intervalMs_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Monitor.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Monitor.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Monitor.cpp
new file mode 100644
index 0000000..d94b2a4
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Monitor.cpp
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Exception.h>
+#include <thrift/concurrency/Util.h>
+#include <thrift/transport/PlatformSocket.h>
+
+#include <boost/scoped_ptr.hpp>
+
+#include <assert.h>
+
+#include <iostream>
+
+#include <pthread.h>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::scoped_ptr;
+
+/**
+ * Monitor implementation using the POSIX pthread library
+ *
+ * @version $Id:$
+ */
+class Monitor::Impl {
+
+ public:
+
+ Impl()
+ : ownedMutex_(new Mutex()),
+ mutex_(NULL),
+ condInitialized_(false) {
+ init(ownedMutex_.get());
+ }
+
+ Impl(Mutex* mutex)
+ : mutex_(NULL),
+ condInitialized_(false) {
+ init(mutex);
+ }
+
+ Impl(Monitor* monitor)
+ : mutex_(NULL),
+ condInitialized_(false) {
+ init(&(monitor->mutex()));
+ }
+
+ ~Impl() { cleanup(); }
+
+ Mutex& mutex() { return *mutex_; }
+ void lock() { mutex().lock(); }
+ void unlock() { mutex().unlock(); }
+
+ /**
+ * Exception-throwing version of waitForTimeRelative(), called simply
+ * wait(int64) for historical reasons. Timeout is in milliseconds.
+ *
+ * If the condition occurs, this function returns cleanly; on timeout or
+ * error an exception is thrown.
+ */
+ void wait(int64_t timeout_ms) const {
+ int result = waitForTimeRelative(timeout_ms);
+ if (result == THRIFT_ETIMEDOUT) {
+ // pthread_cond_timedwait has been observed to return early on
+ // various platforms, so comment out this assert.
+ //assert(Util::currentTime() >= (now + timeout));
+ throw TimedOutException();
+ } else if (result != 0) {
+ throw TException(
+ "pthread_cond_wait() or pthread_cond_timedwait() failed");
+ }
+ }
+
+ /**
+ * Waits until the specified timeout in milliseconds for the condition to
+ * occur, or waits forever if timeout_ms == 0.
+ *
+ * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+ */
+ int waitForTimeRelative(int64_t timeout_ms) const {
+ if (timeout_ms == 0LL) {
+ return waitForever();
+ }
+
+ struct THRIFT_TIMESPEC abstime;
+ Util::toTimespec(abstime, Util::currentTime() + timeout_ms);
+ return waitForTime(&abstime);
+ }
+
+ /**
+ * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
+ * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+ */
+ int waitForTime(const THRIFT_TIMESPEC* abstime) const {
+ assert(mutex_);
+ pthread_mutex_t* mutexImpl =
+ reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
+ assert(mutexImpl);
+
+ // XXX Need to assert that caller owns mutex
+ return pthread_cond_timedwait(&pthread_cond_,
+ mutexImpl,
+ abstime);
+ }
+
+ int waitForTime(const struct timeval* abstime) const {
+ struct THRIFT_TIMESPEC temp;
+ temp.tv_sec = abstime->tv_sec;
+ temp.tv_nsec = abstime->tv_usec * 1000;
+ return waitForTime(&temp);
+ }
+ /**
+ * Waits forever until the condition occurs.
+ * Returns 0 if condition occurs, or an error code otherwise.
+ */
+ int waitForever() const {
+ assert(mutex_);
+ pthread_mutex_t* mutexImpl =
+ reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
+ assert(mutexImpl);
+ return pthread_cond_wait(&pthread_cond_, mutexImpl);
+ }
+
+
+ void notify() {
+ // XXX Need to assert that caller owns mutex
+ int iret = pthread_cond_signal(&pthread_cond_);
+ THRIFT_UNUSED_VARIABLE(iret);
+ assert(iret == 0);
+ }
+
+ void notifyAll() {
+ // XXX Need to assert that caller owns mutex
+ int iret = pthread_cond_broadcast(&pthread_cond_);
+ THRIFT_UNUSED_VARIABLE(iret);
+ assert(iret == 0);
+ }
+
+ private:
+
+ void init(Mutex* mutex) {
+ mutex_ = mutex;
+
+ if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
+ condInitialized_ = true;
+ }
+
+ if (!condInitialized_) {
+ cleanup();
+ throw SystemResourceException();
+ }
+ }
+
+ void cleanup() {
+ if (condInitialized_) {
+ condInitialized_ = false;
+ int iret = pthread_cond_destroy(&pthread_cond_);
+ THRIFT_UNUSED_VARIABLE(iret);
+ assert(iret == 0);
+ }
+ }
+
+ scoped_ptr<Mutex> ownedMutex_;
+ Mutex* mutex_;
+
+ mutable pthread_cond_t pthread_cond_;
+ mutable bool condInitialized_;
+};
+
+Monitor::Monitor() : impl_(new Monitor::Impl()) {}
+Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {}
+Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {}
+
+Monitor::~Monitor() { delete impl_; }
+
+Mutex& Monitor::mutex() const { return impl_->mutex(); }
+
+void Monitor::lock() const { impl_->lock(); }
+
+void Monitor::unlock() const { impl_->unlock(); }
+
+void Monitor::wait(int64_t timeout) const { impl_->wait(timeout); }
+
+int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
+ return impl_->waitForTime(abstime);
+}
+
+int Monitor::waitForTime(const timeval* abstime) const {
+ return impl_->waitForTime(abstime);
+}
+
+int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
+ return impl_->waitForTimeRelative(timeout_ms);
+}
+
+int Monitor::waitForever() const {
+ return impl_->waitForever();
+}
+
+void Monitor::notify() const { impl_->notify(); }
+
+void Monitor::notifyAll() const { impl_->notifyAll(); }
+
+}}} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Monitor.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Monitor.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Monitor.h
new file mode 100644
index 0000000..811e0e1
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Monitor.h
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_MONITOR_H_
+#define _THRIFT_CONCURRENCY_MONITOR_H_ 1
+
+#include <thrift/concurrency/Exception.h>
+#include <thrift/concurrency/Mutex.h>
+
+#include <boost/utility.hpp>
+
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A monitor is a combination mutex and condition-event. Waiting and
+ * notifying condition events requires that the caller own the mutex. Mutex
+ * lock and unlock operations can be performed independently of condition
+ * events. This is more or less analogous to java.lang.Object multi-thread
+ * operations.
+ *
+ * Note the Monitor can create a new, internal mutex; alternatively, a
+ * separate Mutex can be passed in and the Monitor will re-use it without
+ * taking ownership. It's the user's responsibility to make sure that the
+ * Mutex is not deallocated before the Monitor.
+ *
+ * Note that all methods are const. Monitors implement logical constness, not
+ * bit constness. This allows const methods to call monitor methods without
+ * needing to cast away constness or change to non-const signatures.
+ *
+ * @version $Id:$
+ */
+class Monitor : boost::noncopyable {
+ public:
+ /** Creates a new mutex, and takes ownership of it. */
+ Monitor();
+
+ /** Uses the provided mutex without taking ownership. */
+ explicit Monitor(Mutex* mutex);
+
+ /** Uses the mutex inside the provided Monitor without taking ownership. */
+ explicit Monitor(Monitor* monitor);
+
+ /** Deallocates the mutex only if we own it. */
+ virtual ~Monitor();
+
+ Mutex& mutex() const;
+
+ virtual void lock() const;
+
+ virtual void unlock() const;
+
+ /**
+ * Waits a maximum of the specified timeout in milliseconds for the condition
+ * to occur, or waits forever if timeout_ms == 0.
+ *
+ * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+ */
+ int waitForTimeRelative(int64_t timeout_ms) const;
+
+ /**
+ * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
+ * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+ */
+ int waitForTime(const THRIFT_TIMESPEC* abstime) const;
+
+ /**
+ * Waits until the absolute time specified using struct timeval.
+ * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+ */
+ int waitForTime(const struct timeval* abstime) const;
+
+ /**
+ * Waits forever until the condition occurs.
+ * Returns 0 if condition occurs, or an error code otherwise.
+ */
+ int waitForever() const;
+
+ /**
+ * Exception-throwing version of waitForTimeRelative(), called simply
+ * wait(int64) for historical reasons. Timeout is in milliseconds.
+ *
+ * If the condition occurs, this function returns cleanly; on timeout or
+ * error an exception is thrown.
+ */
+ void wait(int64_t timeout_ms = 0LL) const;
+
+
+ /** Wakes up one thread waiting on this monitor. */
+ virtual void notify() const;
+
+ /** Wakes up all waiting threads on this monitor. */
+ virtual void notifyAll() const;
+
+ private:
+
+ class Impl;
+
+ Impl* impl_;
+};
+
+class Synchronized {
+ public:
+ Synchronized(const Monitor* monitor) : g(monitor->mutex()) { }
+ Synchronized(const Monitor& monitor) : g(monitor.mutex()) { }
+
+ private:
+ Guard g;
+};
+
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_MONITOR_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Mutex.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Mutex.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Mutex.cpp
new file mode 100644
index 0000000..3f7bb5b
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Mutex.cpp
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/Thrift.h>
+#include <thrift/concurrency/Mutex.h>
+#include <thrift/concurrency/Util.h>
+
+#include <assert.h>
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
+#include <signal.h>
+
+using boost::shared_ptr;
+
+namespace apache { namespace thrift { namespace concurrency {
+
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+
+static sig_atomic_t mutexProfilingSampleRate = 0;
+static MutexWaitCallback mutexProfilingCallback = 0;
+
+volatile static sig_atomic_t mutexProfilingCounter = 0;
+
+void enableMutexProfiling(int32_t profilingSampleRate,
+ MutexWaitCallback callback) {
+ mutexProfilingSampleRate = profilingSampleRate;
+ mutexProfilingCallback = callback;
+}
+
+#define PROFILE_MUTEX_START_LOCK() \
+ int64_t _lock_startTime = maybeGetProfilingStartTime();
+
+#define PROFILE_MUTEX_NOT_LOCKED() \
+ do { \
+ if (_lock_startTime > 0) { \
+ int64_t endTime = Util::currentTimeUsec(); \
+ (*mutexProfilingCallback)(this, endTime - _lock_startTime); \
+ } \
+ } while (0)
+
+#define PROFILE_MUTEX_LOCKED() \
+ do { \
+ profileTime_ = _lock_startTime; \
+ if (profileTime_ > 0) { \
+ profileTime_ = Util::currentTimeUsec() - profileTime_; \
+ } \
+ } while (0)
+
+#define PROFILE_MUTEX_START_UNLOCK() \
+ int64_t _temp_profileTime = profileTime_; \
+ profileTime_ = 0;
+
+#define PROFILE_MUTEX_UNLOCKED() \
+ do { \
+ if (_temp_profileTime > 0) { \
+ (*mutexProfilingCallback)(this, _temp_profileTime); \
+ } \
+ } while (0)
+
+static inline int64_t maybeGetProfilingStartTime() {
+ if (mutexProfilingSampleRate && mutexProfilingCallback) {
+ // This block is unsynchronized, but should produce a reasonable sampling
+ // rate on most architectures. The main race conditions are the gap
+ // between the decrement and the test, the non-atomicity of decrement, and
+ // potential caching of different values at different CPUs.
+ //
+ // - if two decrements race, the likeliest result is that the counter
+ // decrements slowly (perhaps much more slowly) than intended.
+ //
+ // - many threads could potentially decrement before resetting the counter
+ // to its large value, causing each additional incoming thread to
+ // profile every call. This situation is unlikely to persist for long
+ // as the critical gap is quite short, but profiling could be bursty.
+ sig_atomic_t localValue = --mutexProfilingCounter;
+ if (localValue <= 0) {
+ mutexProfilingCounter = mutexProfilingSampleRate;
+ return Util::currentTimeUsec();
+ }
+ }
+
+ return 0;
+}
+
+#else
+# define PROFILE_MUTEX_START_LOCK()
+# define PROFILE_MUTEX_NOT_LOCKED()
+# define PROFILE_MUTEX_LOCKED()
+# define PROFILE_MUTEX_START_UNLOCK()
+# define PROFILE_MUTEX_UNLOCKED()
+#endif // THRIFT_NO_CONTENTION_PROFILING
+
+/**
+ * Implementation of Mutex class using POSIX mutex
+ *
+ * @version $Id:$
+ */
+class Mutex::impl {
+ public:
+ impl(Initializer init) : initialized_(false) {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ profileTime_ = 0;
+#endif
+ init(&pthread_mutex_);
+ initialized_ = true;
+ }
+
+ ~impl() {
+ if (initialized_) {
+ initialized_ = false;
+ int ret = pthread_mutex_destroy(&pthread_mutex_);
+ THRIFT_UNUSED_VARIABLE(ret);
+ assert(ret == 0);
+ }
+ }
+
+ void lock() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_mutex_lock(&pthread_mutex_);
+ PROFILE_MUTEX_LOCKED();
+ }
+
+ bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
+
+  /** Tries to lock for up to 'milliseconds' ms (relative); true iff acquired. */
+  bool timedlock(int64_t milliseconds) const {
+#if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L
+    PROFILE_MUTEX_START_LOCK();
+    struct THRIFT_TIMESPEC ts;
+    Util::toTimespec(ts, milliseconds + Util::currentTime());
+    int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts);
+    if (ret == 0) {
+      PROFILE_MUTEX_LOCKED();
+      return true;
+    }
+
+    PROFILE_MUTEX_NOT_LOCKED();
+    return false;
+#else
+    /* Otherwise follow solution used by Mono for Android */
+    struct THRIFT_TIMESPEC sleepytime, now, to;
+
+    /* This is just to avoid a completely busy wait */
+    sleepytime.tv_sec = 0;
+    sleepytime.tv_nsec = 10000000L; /* 10ms */
+
+    Util::toTimespec(to, milliseconds + Util::currentTime());
+    while ((trylock()) == false) {
+      Util::toTimespec(now, Util::currentTime());
+      // Past the deadline? Seconds decide; nanoseconds only break a tie.
+      if (now.tv_sec > to.tv_sec ||
+          (now.tv_sec == to.tv_sec && now.tv_nsec >= to.tv_nsec)) {
+        return false;
+      }
+      nanosleep(&sleepytime, NULL);
+    }
+    return true;
+#endif
+  }
+
+ void unlock() const {
+ PROFILE_MUTEX_START_UNLOCK();
+ pthread_mutex_unlock(&pthread_mutex_);
+ PROFILE_MUTEX_UNLOCKED();
+ }
+
+ void* getUnderlyingImpl() const { return (void*) &pthread_mutex_; }
+
+ private:
+ mutable pthread_mutex_t pthread_mutex_;
+ mutable bool initialized_;
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ mutable int64_t profileTime_;
+#endif
+};
+
+Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {}
+
+void* Mutex::getUnderlyingImpl() const { return impl_->getUnderlyingImpl(); }
+
+void Mutex::lock() const { impl_->lock(); }
+
+bool Mutex::trylock() const { return impl_->trylock(); }
+
+bool Mutex::timedlock(int64_t ms) const { return impl_->timedlock(ms); }
+
+void Mutex::unlock() const { impl_->unlock(); }
+
+void Mutex::DEFAULT_INITIALIZER(void* arg) {
+ pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg;
+ int ret = pthread_mutex_init(pthread_mutex, NULL);
+ THRIFT_UNUSED_VARIABLE(ret);
+ assert(ret == 0);
+}
+
+#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP)
+static void init_with_kind(pthread_mutex_t* mutex, int kind) {
+ pthread_mutexattr_t mutexattr;
+ int ret = pthread_mutexattr_init(&mutexattr);
+ assert(ret == 0);
+
+ // Apparently, this can fail. Should we really be aborting?
+ ret = pthread_mutexattr_settype(&mutexattr, kind);
+ assert(ret == 0);
+
+ ret = pthread_mutex_init(mutex, &mutexattr);
+ assert(ret == 0);
+
+ ret = pthread_mutexattr_destroy(&mutexattr);
+ assert(ret == 0);
+ THRIFT_UNUSED_VARIABLE(ret);
+}
+#endif
+
+#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
+void Mutex::ADAPTIVE_INITIALIZER(void* arg) {
+ // From mysql source: mysys/my_thr_init.c
+ // Set mutex type to "fast" a.k.a "adaptive"
+ //
+ // In this case the thread may steal the mutex from some other thread
+ // that is waiting for the same mutex. This will save us some
+ // context switches but may cause a thread to 'starve forever' while
+ // waiting for the mutex (not likely if the code within the mutex is
+ // short).
+ init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ADAPTIVE_NP);
+}
+#endif
+
+#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
+void Mutex::RECURSIVE_INITIALIZER(void* arg) {
+ init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_RECURSIVE_NP);
+}
+#endif
+
+
+/**
+ * Implementation of ReadWriteMutex class using POSIX rw lock
+ *
+ * @version $Id:$
+ */
+class ReadWriteMutex::impl {
+public:
+ impl() : initialized_(false) {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ profileTime_ = 0;
+#endif
+ int ret = pthread_rwlock_init(&rw_lock_, NULL);
+ THRIFT_UNUSED_VARIABLE(ret);
+ assert(ret == 0);
+ initialized_ = true;
+ }
+
+ ~impl() {
+ if(initialized_) {
+ initialized_ = false;
+ int ret = pthread_rwlock_destroy(&rw_lock_);
+ THRIFT_UNUSED_VARIABLE(ret);
+ assert(ret == 0);
+ }
+ }
+
+ void acquireRead() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_rwlock_rdlock(&rw_lock_);
+ PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path
+ }
+
+ void acquireWrite() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_rwlock_wrlock(&rw_lock_);
+ PROFILE_MUTEX_LOCKED();
+ }
+
+ bool attemptRead() const { return !pthread_rwlock_tryrdlock(&rw_lock_); }
+
+ bool attemptWrite() const { return !pthread_rwlock_trywrlock(&rw_lock_); }
+
+ void release() const {
+ PROFILE_MUTEX_START_UNLOCK();
+ pthread_rwlock_unlock(&rw_lock_);
+ PROFILE_MUTEX_UNLOCKED();
+ }
+
+private:
+ mutable pthread_rwlock_t rw_lock_;
+ mutable bool initialized_;
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ mutable int64_t profileTime_;
+#endif
+};
+
+ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {}
+
+void ReadWriteMutex::acquireRead() const { impl_->acquireRead(); }
+
+void ReadWriteMutex::acquireWrite() const { impl_->acquireWrite(); }
+
+bool ReadWriteMutex::attemptRead() const { return impl_->attemptRead(); }
+
+bool ReadWriteMutex::attemptWrite() const { return impl_->attemptWrite(); }
+
+void ReadWriteMutex::release() const { impl_->release(); }
+
+NoStarveReadWriteMutex::NoStarveReadWriteMutex() : writerWaiting_(false) {}
+
+void NoStarveReadWriteMutex::acquireRead() const
+{
+ if (writerWaiting_) {
+ // writer is waiting, block on the writer's mutex until he's done with it
+ mutex_.lock();
+ mutex_.unlock();
+ }
+
+ ReadWriteMutex::acquireRead();
+}
+
+void NoStarveReadWriteMutex::acquireWrite() const
+{
+ // if we can acquire the rwlock the easy way, we're done
+ if (attemptWrite()) {
+ return;
+ }
+
+ // failed to get the rwlock, do it the hard way:
+ // locking the mutex and setting writerWaiting will cause all new readers to
+ // block on the mutex rather than on the rwlock.
+ mutex_.lock();
+ writerWaiting_ = true;
+ ReadWriteMutex::acquireWrite();
+ writerWaiting_ = false;
+ mutex_.unlock();
+}
+
+}}} // apache::thrift::concurrency
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Mutex.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Mutex.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Mutex.h
new file mode 100644
index 0000000..3cd8440
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Mutex.h
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_MUTEX_H_
+#define _THRIFT_CONCURRENCY_MUTEX_H_ 1
+
+#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+
+/**
+ * Determines if the Thrift Mutex and ReadWriteMutex classes will attempt to
+ * profile their blocking acquire methods. If this value is set to non-zero,
+ * Thrift will attempt to invoke the callback once every profilingSampleRate
+ * times. However, as the sampling is not synchronized the rate is not
+ * guaranteed, and could be subject to big bursts and swings. Please ensure
+ * your sampling callback is as performant as your application requires.
+ *
+ * The callback will get called with the wait time taken to lock the mutex in
+ * usec and a (void*) that uniquely identifies the Mutex (or ReadWriteMutex)
+ * being locked.
+ *
+ * The enableMutexProfiling() function is unsynchronized; calling this function
+ * while profiling is already enabled may result in race conditions. On
+ * architectures where a pointer assignment is atomic, this is safe but there
+ * is no guarantee threads will agree on a single callback within any
+ * particular time period.
+ */
+typedef void (*MutexWaitCallback)(const void* id, int64_t waitTimeMicros);
+void enableMutexProfiling(int32_t profilingSampleRate,
+ MutexWaitCallback callback);
+
+#endif
+
+/**
+ * A simple mutex class
+ *
+ * @version $Id:$
+ */
+class Mutex {
+ public:
+ typedef void (*Initializer)(void*);
+
+ Mutex(Initializer init = DEFAULT_INITIALIZER);
+ virtual ~Mutex() {}
+ virtual void lock() const;
+ virtual bool trylock() const;
+ virtual bool timedlock(int64_t milliseconds) const;
+ virtual void unlock() const;
+
+ void* getUnderlyingImpl() const;
+
+ static void DEFAULT_INITIALIZER(void*);
+ static void ADAPTIVE_INITIALIZER(void*);
+ static void RECURSIVE_INITIALIZER(void*);
+
+ private:
+
+ class impl;
+ boost::shared_ptr<impl> impl_;
+};
+
+class ReadWriteMutex {
+public:
+ ReadWriteMutex();
+ virtual ~ReadWriteMutex() {}
+
+ // these get the lock and block until it is done successfully
+ virtual void acquireRead() const;
+ virtual void acquireWrite() const;
+
+ // these attempt to get the lock, returning false immediately if they fail
+ virtual bool attemptRead() const;
+ virtual bool attemptWrite() const;
+
+ // this releases both read and write locks
+ virtual void release() const;
+
+private:
+
+ class impl;
+ boost::shared_ptr<impl> impl_;
+};
+
+/**
+ * A ReadWriteMutex that guarantees writers will not be starved by readers:
+ * When a writer attempts to acquire the mutex, all new readers will be
+ * blocked from acquiring the mutex until the writer has acquired and
+ * released it. In some operating systems, this may already be guaranteed
+ * by a regular ReadWriteMutex.
+ */
+class NoStarveReadWriteMutex : public ReadWriteMutex {
+public:
+ NoStarveReadWriteMutex();
+
+ virtual void acquireRead() const;
+ virtual void acquireWrite() const;
+
+private:
+ Mutex mutex_;
+ mutable volatile bool writerWaiting_;
+};
+
+class Guard : boost::noncopyable {
+ public:
+ Guard(const Mutex& value, int64_t timeout = 0) : mutex_(&value) {
+ if (timeout == 0) {
+ value.lock();
+ } else if (timeout < 0) {
+ if (!value.trylock()) {
+ mutex_ = NULL;
+ }
+ } else {
+ if (!value.timedlock(timeout)) {
+ mutex_ = NULL;
+ }
+ }
+ }
+ ~Guard() {
+ if (mutex_) {
+ mutex_->unlock();
+ }
+ }
+
+ operator bool() const {
+ return (mutex_ != NULL);
+ }
+
+ private:
+ const Mutex* mutex_;
+};
+
+// Can be used as second argument to RWGuard to make code more readable
+// as to whether we're doing acquireRead() or acquireWrite().
+enum RWGuardType {
+ RW_READ = 0,
+ RW_WRITE = 1
+};
+
+
+class RWGuard : boost::noncopyable {
+ public:
+ RWGuard(const ReadWriteMutex& value, bool write = false)
+ : rw_mutex_(value) {
+ if (write) {
+ rw_mutex_.acquireWrite();
+ } else {
+ rw_mutex_.acquireRead();
+ }
+ }
+
+ RWGuard(const ReadWriteMutex& value, RWGuardType type)
+ : rw_mutex_(value) {
+ if (type == RW_WRITE) {
+ rw_mutex_.acquireWrite();
+ } else {
+ rw_mutex_.acquireRead();
+ }
+ }
+ ~RWGuard() {
+ rw_mutex_.release();
+ }
+ private:
+ const ReadWriteMutex& rw_mutex_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_MUTEX_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PlatformThreadFactory.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PlatformThreadFactory.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PlatformThreadFactory.h
new file mode 100644
index 0000000..6e46dfc
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PlatformThreadFactory.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1
+
+#include <thrift/thrift-config.h>
+#if USE_BOOST_THREAD
+# include <thrift/concurrency/BoostThreadFactory.h>
+#elif USE_STD_THREAD
+# include <thrift/concurrency/StdThreadFactory.h>
+#else
+# include <thrift/concurrency/PosixThreadFactory.h>
+#endif
+
+namespace apache { namespace thrift { namespace concurrency {
+
+// Select the factory with the same value tests (#if rather than #ifdef)
+// that choose which factory header is included earlier in this file, so a
+// macro that is defined as 0 cannot name a factory type whose header was
+// never included.
+#if USE_BOOST_THREAD
+  typedef BoostThreadFactory PlatformThreadFactory;
+#elif USE_STD_THREAD
+  typedef StdThreadFactory PlatformThreadFactory;
+#else
+  typedef PosixThreadFactory PlatformThreadFactory;
+#endif
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PosixThreadFactory.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PosixThreadFactory.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PosixThreadFactory.cpp
new file mode 100644
index 0000000..52ceead
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PosixThreadFactory.cpp
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/concurrency/PosixThreadFactory.h>
+#include <thrift/concurrency/Exception.h>
+
+#if GOOGLE_PERFTOOLS_REGISTER_THREAD
+# include <google/profiler.h>
+#endif
+
+#include <assert.h>
+#include <pthread.h>
+
+#include <iostream>
+
+#include <boost/weak_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+using boost::weak_ptr;
+
+/**
+ * The POSIX thread class.
+ *
+ * @version $Id:$
+ */
+class PthreadThread: public Thread {
+ public:
+
+  enum STATE {
+    uninitialized,
+    starting,
+    started,
+    stopping,
+    stopped
+  };
+
+  // Multiplier applied in start() to stackSize_, which is given in megabytes.
+  static const int MB = 1024 * 1024;
+
+  // pthread entry point; arg is the heap-allocated shared_ptr made by start().
+  static void* threadMain(void* arg);
+
+ private:
+  /**
+   * Destroys a pthread_attr_t when it goes out of scope, so that start()
+   * releases the attribute object on every exit path, including throws.
+   */
+  struct AttrGuard {
+    explicit AttrGuard(pthread_attr_t* attr) : attr_(attr) {}
+    ~AttrGuard() { pthread_attr_destroy(attr_); }
+   private:
+    AttrGuard(const AttrGuard&);
+    AttrGuard& operator=(const AttrGuard&);
+    pthread_attr_t* attr_;
+  };
+
+  pthread_t pthread_;
+  STATE state_;
+  int policy_;
+  int priority_;
+  int stackSize_;
+  weak_ptr<PthreadThread> self_;
+  bool detached_;
+
+ public:
+
+  /**
+   * @param policy    pthread scheduling policy (a SCHED_* value)
+   * @param priority  absolute scheduling priority for that policy
+   * @param stackSize stack size in megabytes
+   * @param detached  create the thread detached instead of joinable
+   * @param runnable  task executed by the thread
+   */
+  PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
+
+#ifndef _WIN32
+    pthread_(0),
+#endif // _WIN32
+
+    state_(uninitialized),
+    policy_(policy),
+    priority_(priority),
+    stackSize_(stackSize),
+    detached_(detached) {
+
+    this->Thread::runnable(runnable);
+  }
+
+  ~PthreadThread() {
+    /* Nothing references this thread, if it is not detached, do a join
+       now, otherwise the thread-id and, possibly, other resources will
+       be leaked. */
+    if(!detached_) {
+      try {
+        join();
+      } catch(...) {
+        // We're really hosed.
+      }
+    }
+  }
+
+  /**
+   * Creates and starts the underlying pthread. Does nothing unless the
+   * thread is still uninitialized.
+   *
+   * @throws SystemResourceException if any pthread call fails; the thread
+   *         is then left uninitialized.
+   */
+  void start() {
+    if (state_ != uninitialized) {
+      return;
+    }
+
+    pthread_attr_t thread_attr;
+    if (pthread_attr_init(&thread_attr) != 0) {
+      throw SystemResourceException("pthread_attr_init failed");
+    }
+
+    // From here on the attribute object is destroyed on every exit path.
+    AttrGuard attrGuard(&thread_attr);
+
+    if(pthread_attr_setdetachstate(&thread_attr,
+                                   detached_ ?
+                                   PTHREAD_CREATE_DETACHED :
+                                   PTHREAD_CREATE_JOINABLE) != 0) {
+      throw SystemResourceException("pthread_attr_setdetachstate failed");
+    }
+
+    // Set thread stack size. The product is formed in size_t so that large
+    // megabyte counts do not overflow int.
+    if (pthread_attr_setstacksize(&thread_attr, static_cast<size_t>(MB) * stackSize_) != 0) {
+      throw SystemResourceException("pthread_attr_setstacksize failed");
+    }
+
+    // Set thread policy
+    #ifdef _WIN32
+    // WIN32 Pthread implementation doesn't seem to support scheduling policies other than PosixThreadFactory::OTHER - runtime error
+    policy_ = PosixThreadFactory::OTHER;
+    #endif
+
+    if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
+      throw SystemResourceException("pthread_attr_setschedpolicy failed");
+    }
+
+    struct sched_param sched_param;
+    sched_param.sched_priority = priority_;
+
+    // Set thread priority
+    if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
+      throw SystemResourceException("pthread_attr_setschedparam failed");
+    }
+
+    // Create the strong reference that threadMain takes over and deletes.
+    shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
+    *selfRef = self_.lock();
+
+    state_ = starting;
+
+    if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
+      // threadMain never ran, so nothing else will free the reference, and
+      // there is no thread for join() or the destructor to wait on.
+      delete selfRef;
+      state_ = uninitialized;
+      throw SystemResourceException("pthread_create failed");
+    }
+  }
+
+  /**
+   * Waits for the thread to finish. After a successful join the thread is
+   * marked detached so that it is not joined a second time.
+   */
+  void join() {
+    if (!detached_ && state_ != uninitialized) {
+      void* ignore;
+      /* XXX
+         If join fails it is most likely due to the fact
+         that the last reference was the thread itself and cannot
+         join.  This results in leaked threads and will eventually
+         cause the process to run out of thread resources.
+         We're beyond the point of throwing an exception.  Not clear how
+         best to handle this. */
+      int res = pthread_join(pthread_, &ignore);
+      detached_ = (res == 0);
+      if (res != 0) {
+        GlobalOutput.printf("PthreadThread::join(): fail with code %d", res);
+      }
+    } else {
+      GlobalOutput.printf("PthreadThread::join(): detached thread");
+    }
+  }
+
+  Thread::id_t getId() {
+
+#ifndef _WIN32
+    return (Thread::id_t)pthread_;
+#else
+    return (Thread::id_t)pthread_.p;
+#endif // _WIN32
+  }
+
+  shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
+
+  void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
+
+  // Stores a weak reference to the shared_ptr owning this object; start()
+  // turns it into the strong reference passed to threadMain.
+  void weakRef(shared_ptr<PthreadThread> self) {
+    assert(self.get() == this);
+    self_ = weak_ptr<PthreadThread>(self);
+  }
+};
+
+void* PthreadThread::threadMain(void* arg) {
+  // Copy the strong reference out of its heap-allocated holder, then free
+  // the holder itself.
+  shared_ptr<PthreadThread>* holder = static_cast<shared_ptr<PthreadThread>*>(arg);
+  shared_ptr<PthreadThread> thread = *holder;
+  delete holder;
+
+  // Nothing to run if the thread object is gone or is not in the
+  // "starting" state.
+  if (!thread || thread->state_ != starting) {
+    return NULL;
+  }
+
+#if GOOGLE_PERFTOOLS_REGISTER_THREAD
+  ProfilerRegisterThread();
+#endif
+
+  thread->state_ = started;
+  thread->runnable()->run();
+
+  // Move to "stopping" unless the state already reads stopping or stopped.
+  const bool windingDown = (thread->state_ == stopping || thread->state_ == stopped);
+  if (!windingDown) {
+    thread->state_ = stopping;
+  }
+
+  return NULL;
+}
+
+/**
+ * POSIX Thread factory implementation
+ */
+class PosixThreadFactory::Impl {
+
+ private:
+  POLICY policy_;
+  PRIORITY priority_;
+  int stackSize_;
+  bool detached_;
+
+  /**
+   * Maps the factory's POLICY enum onto the matching SCHED_* constant of
+   * the pthread API. Anything unrecognised maps to SCHED_OTHER.
+   */
+  static int toPthreadPolicy(POLICY policy) {
+    if (policy == FIFO) {
+      return SCHED_FIFO;
+    }
+    if (policy == ROUND_ROBIN) {
+      return SCHED_RR;
+    }
+    return SCHED_OTHER;
+  }
+
+  /**
+   * Turns a relative priority into an absolute one for the given policy.
+   *
+   * The policy's priority range is split evenly across the relative levels
+   * (LOWEST..HIGHEST) and the requested level is pro-rated into it. The
+   * range bounds stay 0 when the sched_get_priority_* functions are not
+   * available.
+   */
+  static int toPthreadPriority(POLICY policy, PRIORITY priority) {
+    int pthread_policy = toPthreadPolicy(policy);
+    int min_priority = 0;
+    int max_priority = 0;
+#ifdef HAVE_SCHED_GET_PRIORITY_MIN
+    min_priority = sched_get_priority_min(pthread_policy);
+#endif
+#ifdef HAVE_SCHED_GET_PRIORITY_MAX
+    max_priority = sched_get_priority_max(pthread_policy);
+#endif
+    const int quanta = (HIGHEST - LOWEST) + 1;
+    const float stepsperquanta = (float)(max_priority - min_priority) / quanta;
+
+    PRIORITY level = priority;
+    if (priority > HIGHEST) {
+      // should never get here for priority increments.
+      assert(false);
+      level = NORMAL;
+    }
+    return (int)(min_priority + stepsperquanta * level);
+  }
+
+ public:
+
+  Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+    policy_(policy),
+    priority_(priority),
+    stackSize_(stackSize),
+    detached_(detached) {}
+
+  /**
+   * Builds a new POSIX thread object that will run the given runnable,
+   * using the factory's current policy, priority, stack size and detached
+   * mode.
+   *
+   * @param runnable A runnable object
+   */
+  shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
+    const int pthreadPolicy = toPthreadPolicy(policy_);
+    const int pthreadPriority = toPthreadPriority(policy_, priority_);
+
+    shared_ptr<PthreadThread> result(
+        new PthreadThread(pthreadPolicy, pthreadPriority, stackSize_, detached_, runnable));
+    result->weakRef(result);
+    runnable->thread(result);
+    return result;
+  }
+
+  int getStackSize() const {
+    return stackSize_;
+  }
+
+  void setStackSize(int value) {
+    stackSize_ = value;
+  }
+
+  PRIORITY getPriority() const {
+    return priority_;
+  }
+
+  /**
+   * Sets priority.
+   *
+   *  XXX
+   *  Need to handle incremental priorities properly.
+   */
+  void setPriority(PRIORITY value) {
+    priority_ = value;
+  }
+
+  bool isDetached() const {
+    return detached_;
+  }
+
+  void setDetached(bool value) {
+    detached_ = value;
+  }
+
+  Thread::id_t getCurrentThreadId() const {
+#ifndef _WIN32
+    return (Thread::id_t)pthread_self();
+#else
+    return (Thread::id_t)pthread_self().p;
+#endif // _WIN32
+  }
+
+};
+
+// Public PosixThreadFactory members: each one forwards to the Impl object.
+
+PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached)
+  : impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {
+}
+
+shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {
+  return impl_->newThread(runnable);
+}
+
+int PosixThreadFactory::getStackSize() const {
+  return impl_->getStackSize();
+}
+
+void PosixThreadFactory::setStackSize(int value) {
+  impl_->setStackSize(value);
+}
+
+PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const {
+  return impl_->getPriority();
+}
+
+void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) {
+  impl_->setPriority(value);
+}
+
+bool PosixThreadFactory::isDetached() const {
+  return impl_->isDetached();
+}
+
+void PosixThreadFactory::setDetached(bool value) {
+  impl_->setDetached(value);
+}
+
+Thread::id_t PosixThreadFactory::getCurrentThreadId() const {
+  return impl_->getCurrentThreadId();
+}
+
+}}} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PosixThreadFactory.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PosixThreadFactory.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PosixThreadFactory.h
new file mode 100644
index 0000000..72368ca
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/PosixThreadFactory.h
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ 1
+
+#include <thrift/concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A thread factory to create posix threads
+ *
+ * @version $Id:$
+ */
+class PosixThreadFactory : public ThreadFactory {
+
+ public:
+
+ /**
+ * POSIX Thread scheduler policies
+ */
+ enum POLICY {
+ OTHER,
+ FIFO,
+ ROUND_ROBIN
+ };
+
+ /**
+ * POSIX Thread scheduler relative priorities.
+ *
+ * Absolute priority is determined by scheduler policy and OS. This
+ * enumeration specifies relative priorities such that one can specify a
+ * priority within a given scheduler policy without knowing the absolute
+ * value of the priority.
+ *
+ * INCREMENT and DECREMENT are not mapped to an absolute priority by the
+ * implementation in PosixThreadFactory.cpp: it asserts on any value above
+ * HIGHEST and falls back to NORMAL.
+ */
+ enum PRIORITY {
+ LOWEST = 0,
+ LOWER = 1,
+ LOW = 2,
+ NORMAL = 3,
+ HIGH = 4,
+ HIGHER = 5,
+ HIGHEST = 6,
+ INCREMENT = 7,
+ DECREMENT = 8
+ };
+
+ /**
+ * Posix thread (pthread) factory. All threads created by a factory are reference-counted
+ * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
+ * the Runnable tasks they host will be properly cleaned up once the last strong reference
+ * to both is given up.
+ *
+ * Threads are created with the specified policy, priority, stack size and detached mode.
+ * Detached means the thread is free-running and will release all system resources
+ * when it completes. A detached thread is not joinable. The join method
+ * of a detached thread will return immediately with no error.
+ *
+ * By default threads are not joinable (detached defaults to true). The other
+ * defaults are the ROUND_ROBIN policy, NORMAL priority and a stack size of 1,
+ * given in megabytes.
+ */
+
+ PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=true);
+
+ // From ThreadFactory;
+ boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
+
+ // From ThreadFactory;
+ Thread::id_t getCurrentThreadId() const;
+
+ /**
+ * Gets stack size for created threads
+ *
+ * @return int size in megabytes
+ */
+ virtual int getStackSize() const;
+
+ /**
+ * Sets stack size for created threads
+ *
+ * @param value size in megabytes
+ */
+ virtual void setStackSize(int value);
+
+ /**
+ * Gets priority relative to current policy
+ */
+ virtual PRIORITY getPriority() const;
+
+ /**
+ * Sets priority relative to current policy
+ */
+ virtual void setPriority(PRIORITY priority);
+
+ /**
+ * Sets detached mode of threads
+ */
+ virtual void setDetached(bool detached);
+
+ /**
+ * Gets current detached mode
+ */
+ virtual bool isDetached() const;
+
+ private:
+ class Impl;
+ boost::shared_ptr<Impl> impl_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdMonitor.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdMonitor.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdMonitor.cpp
new file mode 100644
index 0000000..cf257e6
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdMonitor.cpp
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Exception.h>
+#include <thrift/concurrency/Util.h>
+#include <thrift/transport/PlatformSocket.h>
+#include <assert.h>
+
+#include <condition_variable>
+#include <chrono>
+#include <thread>
+#include <mutex>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Monitor implementation using the std thread library
+ *
+ * @version $Id:$
+ */
+class Monitor::Impl {
+
+ public:
+
+  /** Creates a monitor that owns its own mutex. */
+  Impl()
+     : ownedMutex_(new Mutex()),
+       conditionVariable_(),
+       mutex_(NULL) {
+    init(ownedMutex_.get());
+  }
+
+  /** Creates a monitor around a caller-supplied mutex, which is not owned. */
+  Impl(Mutex* mutex)
+     : ownedMutex_(),
+       conditionVariable_(),
+       mutex_(NULL) {
+    init(mutex);
+  }
+
+  /** Creates a monitor that uses the mutex of another monitor. */
+  Impl(Monitor* monitor)
+     : ownedMutex_(),
+       conditionVariable_(),
+       mutex_(NULL) {
+    init(&(monitor->mutex()));
+  }
+
+  Mutex& mutex() { return *mutex_; }
+  void lock() { mutex_->lock(); }
+  void unlock() { mutex_->unlock(); }
+
+  /**
+   * Exception-throwing version of waitForTimeRelative(), called simply
+   * wait(int64) for historical reasons.  Timeout is in milliseconds.
+   *
+   * If the condition occurs, this function returns cleanly; on timeout or
+   * error an exception is thrown.
+   *
+   * @throws TimedOutException on timeout
+   * @throws TException on any other non-zero result
+   */
+  void wait(int64_t timeout_ms) {
+    int result = waitForTimeRelative(timeout_ms);
+    if (result == THRIFT_ETIMEDOUT) {
+      throw TimedOutException();
+    } else if (result != 0) {
+      throw TException(
+        "Monitor::wait() failed");
+    }
+  }
+
+  /**
+   * Waits until the specified timeout in milliseconds for the condition to
+   * occur, or waits forever if timeout_ms == 0.
+   *
+   * The monitor's mutex is adopted for the duration of the wait and is left
+   * locked on return, so the caller is expected to hold it.
+   *
+   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+   */
+  int waitForTimeRelative(int64_t timeout_ms) {
+    if (timeout_ms == 0LL) {
+      return waitForever();
+    }
+
+    assert(mutex_);
+    std::timed_mutex* mutexImpl =
+      static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
+    assert(mutexImpl);
+
+    std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+    bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms)) == std::cv_status::timeout);
+    // Hand ownership back to the caller without unlocking.
+    lock.release();
+    return (timedout ? THRIFT_ETIMEDOUT : 0);
+  }
+
+  /**
+   * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
+   * The time is converted to a struct timeval (nanoseconds truncated to
+   * microseconds) and passed to the timeval overload.
+   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+   */
+  int waitForTime(const THRIFT_TIMESPEC* abstime) {
+    struct timeval temp;
+    temp.tv_sec  = static_cast<long>(abstime->tv_sec);
+    temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000;
+    return waitForTime(&temp);
+  }
+
+  /**
+   * Waits until the absolute time specified using struct timeval.
+   * A deadline that has already passed results in a zero-length wait.
+   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+   */
+  int waitForTime(const struct timeval* abstime) {
+    assert(mutex_);
+    std::timed_mutex* mutexImpl =
+      static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
+    assert(mutexImpl);
+
+    struct timeval currenttime;
+    Util::toTimeval(currenttime, Util::currentTime());
+
+    // Form the remaining time as a single signed microsecond count. Clamping
+    // the seconds and microseconds differences separately would drop a
+    // negative microsecond difference instead of borrowing from the seconds,
+    // and so wait too long.
+    int64_t remaining_us =
+      (static_cast<int64_t>(abstime->tv_sec) - static_cast<int64_t>(currenttime.tv_sec)) * 1000000LL
+      + (static_cast<int64_t>(abstime->tv_usec) - static_cast<int64_t>(currenttime.tv_usec));
+    if (remaining_us < 0) {
+      remaining_us = 0;
+    }
+
+    std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+    bool timedout = (conditionVariable_.wait_for(lock,
+      std::chrono::microseconds(remaining_us)) == std::cv_status::timeout);
+    // Hand ownership back to the caller without unlocking.
+    lock.release();
+    return (timedout ? THRIFT_ETIMEDOUT : 0);
+  }
+
+  /**
+   * Waits forever until the condition occurs.
+   * Returns 0 if condition occurs, or an error code otherwise.
+   */
+  int waitForever() {
+    assert(mutex_);
+    std::timed_mutex* mutexImpl =
+      static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
+    assert(mutexImpl);
+
+    std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+    conditionVariable_.wait(lock);
+    // Hand ownership back to the caller without unlocking.
+    lock.release();
+    return 0;
+  }
+
+
+  /** Wakes one waiter. */
+  void notify() {
+    conditionVariable_.notify_one();
+  }
+
+  /** Wakes every waiter. */
+  void notifyAll() {
+    conditionVariable_.notify_all();
+  }
+
+ private:
+
+  void init(Mutex* mutex) {
+    mutex_ = mutex;
+  }
+
+  const std::unique_ptr<Mutex> ownedMutex_;  // set only by the default constructor
+  std::condition_variable_any conditionVariable_;
+  Mutex* mutex_;                             // mutex actually used; may be external
+};
+
+// Public Monitor members. The const members cast constness away from impl_
+// and forward to the Impl object.
+
+Monitor::Monitor()
+  : impl_(new Monitor::Impl()) {
+}
+
+Monitor::Monitor(Mutex* mutex)
+  : impl_(new Monitor::Impl(mutex)) {
+}
+
+Monitor::Monitor(Monitor* monitor)
+  : impl_(new Monitor::Impl(monitor)) {
+}
+
+Monitor::~Monitor() {
+  delete impl_;
+}
+
+Mutex& Monitor::mutex() const {
+  return const_cast<Monitor::Impl*>(impl_)->mutex();
+}
+
+void Monitor::lock() const {
+  const_cast<Monitor::Impl*>(impl_)->lock();
+}
+
+void Monitor::unlock() const {
+  const_cast<Monitor::Impl*>(impl_)->unlock();
+}
+
+void Monitor::wait(int64_t timeout) const {
+  const_cast<Monitor::Impl*>(impl_)->wait(timeout);
+}
+
+int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
+  return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
+}
+
+int Monitor::waitForTime(const timeval* abstime) const {
+  return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
+}
+
+int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
+  return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
+}
+
+int Monitor::waitForever() const {
+  return const_cast<Monitor::Impl*>(impl_)->waitForever();
+}
+
+void Monitor::notify() const {
+  const_cast<Monitor::Impl*>(impl_)->notify();
+}
+
+void Monitor::notifyAll() const {
+  const_cast<Monitor::Impl*>(impl_)->notifyAll();
+}
+
+}}} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdMutex.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdMutex.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdMutex.cpp
new file mode 100644
index 0000000..28f889a
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdMutex.cpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/concurrency/Mutex.h>
+#include <thrift/concurrency/Util.h>
+
+#include <cassert>
+#include <chrono>
+#include <mutex>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Implementation of Mutex class using C++11 std::timed_mutex
+ *
+ * @version $Id:$
+ */
+class Mutex::impl : public std::timed_mutex {};
+
+// The initializer argument is not used by this implementation.
+Mutex::Mutex(Initializer init)
+  : impl_(new Mutex::impl()) {
+}
+
+// Exposes the underlying std::timed_mutex as an untyped pointer.
+void* Mutex::getUnderlyingImpl() const {
+  return impl_.get();
+}
+
+void Mutex::lock() const {
+  impl_->lock();
+}
+
+bool Mutex::trylock() const {
+  return impl_->try_lock();
+}
+
+// Tries to take the lock for at most ms milliseconds.
+bool Mutex::timedlock(int64_t ms) const {
+  const std::chrono::milliseconds limit(ms);
+  return impl_->try_lock_for(limit);
+}
+
+void Mutex::unlock() const {
+  impl_->unlock();
+}
+
+// Intentionally a no-op.
+void Mutex::DEFAULT_INITIALIZER(void* arg) {
+}
+
+}}} // apache::thrift::concurrency
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdThreadFactory.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdThreadFactory.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdThreadFactory.cpp
new file mode 100644
index 0000000..3239bd9
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdThreadFactory.cpp
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/concurrency/StdThreadFactory.h>
+#include <thrift/concurrency/Exception.h>
+
+#include <cassert>
+
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/weak_ptr.hpp>
+#include <thread>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * The C++11 thread class.
+ *
+ * Note that we use boost shared_ptr rather than std shared_ptrs here
+ * because the Thread/Runnable classes use those and we don't want to
+ * mix them.
+ *
+ * @version $Id:$
+ */
+class StdThread: public Thread, public boost::enable_shared_from_this<StdThread> {
+ public:
+
+  enum STATE {
+    uninitialized,
+    starting,
+    started,
+    stopping,
+    stopped
+  };
+
+  // Entry point executed on the new std::thread.
+  static void threadMain(boost::shared_ptr<StdThread> thread);
+
+ private:
+  std::unique_ptr<std::thread> thread_;  // empty until start() succeeds
+  STATE state_;
+  bool detached_;
+
+ public:
+
+  /**
+   * @param detached detach the std::thread as soon as it is started
+   * @param runnable task executed by the thread
+   */
+  StdThread(bool detached, boost::shared_ptr<Runnable> runnable) :
+    state_(uninitialized),
+    detached_(detached) {
+    this->Thread::runnable(runnable);
+  }
+
+  /**
+   * Joins the thread if it is still joinable. A std::thread that is still
+   * joinable when it is destroyed calls std::terminate, so the thread is
+   * detached instead when joining is impossible (the destructor runs on the
+   * thread itself) or when the join fails.
+   */
+  ~StdThread() {
+    if (!thread_ || !thread_->joinable()) {
+      return;
+    }
+    try {
+      if (thread_->get_id() == std::this_thread::get_id()) {
+        // The last reference was dropped by the thread itself.
+        thread_->detach();
+      } else {
+        thread_->join();
+      }
+    } catch(...) {
+      // We're really hosed; detach so destroying thread_ cannot terminate.
+      try {
+        if (thread_->joinable()) {
+          thread_->detach();
+        }
+      } catch(...) {
+      }
+    }
+  }
+
+  /**
+   * Starts the thread. Does nothing unless the thread is still
+   * uninitialized. If the std::thread cannot be created the exception is
+   * rethrown and the object is left uninitialized.
+   */
+  void start() {
+    if (state_ != uninitialized) {
+      return;
+    }
+
+    boost::shared_ptr<StdThread> selfRef = shared_from_this();
+    state_ = starting;
+
+    try {
+      thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef));
+    } catch(...) {
+      // No thread exists, so later join() calls must not wait on one.
+      state_ = uninitialized;
+      throw;
+    }
+
+    if(detached_)
+      thread_->detach();
+  }
+
+  /**
+   * Waits for the thread to finish. Does nothing for detached or
+   * never-started threads, or when the thread has already been joined.
+   */
+  void join() {
+    if (!detached_ && state_ != uninitialized && thread_ && thread_->joinable()) {
+      thread_->join();
+    }
+  }
+
+  // Id of the underlying std::thread, or a default id before start().
+  Thread::id_t getId() {
+    return thread_.get() ? thread_->get_id() : std::thread::id();
+  }
+
+  boost::shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
+
+  void runnable(boost::shared_ptr<Runnable> value) { Thread::runnable(value); }
+};
+
+void StdThread::threadMain(boost::shared_ptr<StdThread> thread) {
+  // Nothing to run without a thread object, or when the object is not in
+  // the "starting" state.
+  if (!thread || thread->state_ != starting) {
+    return;
+  }
+
+  thread->state_ = started;
+  thread->runnable()->run();
+
+  // Move to "stopping" unless the state already reads stopping or stopped.
+  const bool windingDown = (thread->state_ == stopping || thread->state_ == stopped);
+  if (!windingDown) {
+    thread->state_ = stopping;
+  }
+}
+
+/**
+ * std::thread factory implementation
+ */
+class StdThreadFactory::Impl {
+
+ private:
+  bool detached_;
+
+ public:
+
+  Impl(bool detached)
+    : detached_(detached) {
+  }
+
+  /**
+   * Builds a new thread object that will run the given runnable, using the
+   * factory's current detached mode, and registers it with the runnable.
+   *
+   * @param runnable A runnable object
+   */
+  boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const {
+    boost::shared_ptr<StdThread> created(new StdThread(detached_, runnable));
+    runnable->thread(created);
+    return created;
+  }
+
+  bool isDetached() const {
+    return detached_;
+  }
+
+  void setDetached(bool value) {
+    detached_ = value;
+  }
+
+  Thread::id_t getCurrentThreadId() const {
+    return std::this_thread::get_id();
+  }
+
+};
+
+// Public StdThreadFactory members: each one forwards to the Impl object.
+
+StdThreadFactory::StdThreadFactory(bool detached)
+  : impl_(new StdThreadFactory::Impl(detached)) {
+}
+
+boost::shared_ptr<Thread> StdThreadFactory::newThread(boost::shared_ptr<Runnable> runnable) const {
+  return impl_->newThread(runnable);
+}
+
+bool StdThreadFactory::isDetached() const {
+  return impl_->isDetached();
+}
+
+void StdThreadFactory::setDetached(bool value) {
+  impl_->setDetached(value);
+}
+
+Thread::id_t StdThreadFactory::getCurrentThreadId() const {
+  return impl_->getCurrentThreadId();
+}
+
+}}} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdThreadFactory.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdThreadFactory.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdThreadFactory.h
new file mode 100644
index 0000000..307f970
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/StdThreadFactory.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ 1
+
+#include <thrift/concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A thread factory to create std::threads.
+ *
+ * @version $Id:$
+ */
+class StdThreadFactory : public ThreadFactory {
+
+ public:
+
+ /**
+ * Std thread factory. All threads created by a factory are reference-counted
+ * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
+ * the Runnable tasks they host will be properly cleaned up once the last strong reference
+ * to both is given up.
+ *
+ * By default threads are not joinable (detached defaults to true).
+ */
+
+ StdThreadFactory(bool detached=true);
+
+ // From ThreadFactory;
+ boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
+
+ // From ThreadFactory;
+ Thread::id_t getCurrentThreadId() const;
+
+ /**
+ * Sets detached mode of threads. The mode is read each time newThread()
+ * creates a thread.
+ */
+ virtual void setDetached(bool detached);
+
+ /**
+ * Gets current detached mode
+ */
+ virtual bool isDetached() const;
+
+private:
+ class Impl;
+ boost::shared_ptr<Impl> impl_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Thread.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Thread.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Thread.h
new file mode 100755
index 0000000..7012933
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/Thread.h
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_THREAD_H_
+#define _THRIFT_CONCURRENCY_THREAD_H_ 1
+
+#include <stdint.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+
+#include <thrift/thrift-config.h>
+
+#if USE_BOOST_THREAD
+# include <boost/thread.hpp>
+#elif USE_STD_THREAD
+# include <thread>
+#else
+# ifdef HAVE_PTHREAD_H
+# include <pthread.h>
+# endif
+#endif
+
+namespace apache { namespace thrift { namespace concurrency {
+
+class Thread;
+
+/**
+ * Minimal runnable class. More or less analogous to java.lang.Runnable.
+ *
+ * Besides the work to perform (run()), a Runnable stores a weak reference to
+ * the Thread that hosts it, so the runnable itself does not keep that thread
+ * object alive.
+ *
+ * @version $Id:$
+ */
+class Runnable {
+
+ public:
+ virtual ~Runnable() {};
+ virtual void run() = 0; // the work to execute; supplied by subclasses
+
+ /**
+ * Gets the thread object that is hosting this runnable object - can return
+ * an empty boost::shared pointer if no references remain on the thread object
+ *
+ * @return a strong reference obtained by locking the stored weak reference
+ */
+ virtual boost::shared_ptr<Thread> thread() { return thread_.lock(); }
+
+ /**
+ * Sets the thread that is executing this object. This is only meant for
+ * use by concrete implementations of Thread.
+ *
+ * @param value the hosting thread; only a weak reference to it is stored
+ */
+ virtual void thread(boost::shared_ptr<Thread> value) { thread_ = value; }
+
+ private:
+ boost::weak_ptr<Thread> thread_; // non-owning back-reference to the hosting thread
+};
+
+/**
+ * Minimal thread class. Returned by thread factory bound to a Runnable object
+ * and ready to start execution. More or less analogous to java.lang.Thread
+ * (minus all the thread group, priority, mode and other baggage, since that
+ * is difficult to abstract across platforms and is left for platform-specific
+ * ThreadFactory implementations to deal with).
+ *
+ * The id_t typedef and the is_current()/get_current() helpers are selected at
+ * compile time: boost::thread when USE_BOOST_THREAD is set, std::thread when
+ * USE_STD_THREAD is set, and pthreads otherwise.
+ *
+ * @see apache::thrift::concurrency::ThreadFactory
+ */
+class Thread {
+
+ public:
+
+#if USE_BOOST_THREAD
+ typedef boost::thread::id id_t; // thread id type of the boost::thread build
+
+ static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); }
+ static inline id_t get_current() { return boost::this_thread::get_id(); }
+#elif USE_STD_THREAD
+ typedef std::thread::id id_t; // thread id type of the std::thread build
+
+ static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
+ static inline id_t get_current() { return std::this_thread::get_id(); }
+#else
+ typedef pthread_t id_t; // thread id type of the pthread build
+
+ static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); }
+ static inline id_t get_current() { return pthread_self(); }
+#endif
+
+ virtual ~Thread() {};
+
+ /**
+ * Starts the thread. Does platform specific thread creation and
+ * configuration then invokes the run method of the Runnable object bound
+ * to this thread.
+ */
+ virtual void start() = 0;
+
+ /**
+ * Join this thread. Current thread blocks until this target thread
+ * completes.
+ */
+ virtual void join() = 0;
+
+ /**
+ * Gets the thread's platform-specific ID
+ *
+ * @return an id_t, whose concrete type depends on the threading backend
+ * selected above
+ */
+ virtual id_t getId() = 0;
+
+ /**
+ * Gets the runnable object this thread is hosting
+ *
+ * The matching setter below is protected, i.e. only reachable from
+ * subclasses.
+ *
+ * @return a strong reference to the hosted runnable
+ */
+ virtual boost::shared_ptr<Runnable> runnable() const { return _runnable; }
+
+ protected:
+ virtual void runnable(boost::shared_ptr<Runnable> value) { _runnable = value; }
+
+ private:
+ boost::shared_ptr<Runnable> _runnable; // strong reference to the hosted runnable
+
+};
+
+/**
+ * Factory to create platform-specific thread object and bind them to Runnable
+ * object for execution
+ *
+ * newThread() returns the thread object created for the given runnable.
+ */
+class ThreadFactory {
+
+ public:
+ virtual ~ThreadFactory() {}
+ virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
+
+ /**
+ * Sentinel thread id, declared here and defined elsewhere.
+ *
+ * getCurrentThreadId() (declared right after it) gets the current thread id
+ * or unknown_thread_id if the current thread is not a thrift thread.
+ */
+
+ static const Thread::id_t unknown_thread_id;
+
+ virtual Thread::id_t getCurrentThreadId() const = 0;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_THREAD_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/ThreadManager.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/ThreadManager.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/ThreadManager.cpp
new file mode 100644
index 0000000..f2c0fa5
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/concurrency/ThreadManager.cpp
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/concurrency/ThreadManager.h>
+#include <thrift/concurrency/Exception.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Util.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <assert.h>
+#include <queue>
+#include <set>
+
+#if defined(DEBUG)
+#include <iostream>
+#endif //defined(DEBUG)
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
+
+/**
+ * ThreadManager class
+ *
+ * This class manages a pool of threads. It uses a ThreadFactory to create
+ * threads. It never actually creates or destroys worker threads, rather
+ * it maintains statistics on number of idle threads, number of active threads,
+ * task backlog, and average wait and service times.
+ *
+ * NOTE(review): the paragraph above does not match the code in this file:
+ * addWorker() creates and starts threads through the ThreadFactory, and none
+ * of the members declared here records wait or service times.
+ *
+ * Locking: monitor_ and maxMonitor_ are both constructed on mutex_, whereas
+ * workerMonitor_ is default-constructed and used separately for the worker
+ * add/remove hand-shake.
+ *
+ * @version $Id:$
+ */
+class ThreadManager::Impl : public ThreadManager {
+
+ public:
+ Impl() :
+ workerCount_(0),
+ workerMaxCount_(0),
+ idleCount_(0),
+ pendingTaskCountMax_(0),
+ expiredCount_(0),
+ state_(ThreadManager::UNINITIALIZED),
+ monitor_(&mutex_),
+ maxMonitor_(&mutex_) {}
+
+ ~Impl() { stop(); } // destruction stops the manager without joining
+
+ void start();
+
+ void stop() { stopImpl(false); } // shutdown through the STOPPING state
+
+ void join() { stopImpl(true); } // shutdown through the JOINING state
+
+ ThreadManager::STATE state() const {
+ return state_; // read without taking monitor_
+ }
+
+ shared_ptr<ThreadFactory> threadFactory() const {
+ Synchronized s(monitor_);
+ return threadFactory_;
+ }
+
+ void threadFactory(shared_ptr<ThreadFactory> value) {
+ Synchronized s(monitor_);
+ threadFactory_ = value;
+ }
+
+ void addWorker(size_t value);
+
+ void removeWorker(size_t value);
+
+ size_t idleWorkerCount() const {
+ return idleCount_; // read without taking monitor_
+ }
+
+ size_t workerCount() const {
+ Synchronized s(monitor_);
+ return workerCount_;
+ }
+
+ size_t pendingTaskCount() const {
+ Synchronized s(monitor_);
+ return tasks_.size();
+ }
+
+ size_t totalTaskCount() const {
+ Synchronized s(monitor_);
+ return tasks_.size() + workerCount_ - idleCount_; // queued tasks plus non-idle workers
+ }
+
+ size_t pendingTaskCountMax() const {
+ Synchronized s(monitor_);
+ return pendingTaskCountMax_;
+ }
+
+ size_t expiredTaskCount() {
+ Synchronized s(monitor_);
+ size_t result = expiredCount_;
+ expiredCount_ = 0; // reading the counter resets it
+ return result;
+ }
+
+ void pendingTaskCountMax(const size_t value) {
+ Synchronized s(monitor_);
+ pendingTaskCountMax_ = value;
+ }
+
+ bool canSleep();
+
+ void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
+
+ void remove(shared_ptr<Runnable> task);
+
+ shared_ptr<Runnable> removeNextPending();
+
+ void removeExpiredTasks();
+
+ void setExpireCallback(ExpireCallback expireCallback);
+
+private:
+ void stopImpl(bool join); // shared body of stop() (join == false) and join() (join == true)
+
+ size_t workerCount_; // workers that have registered themselves in Worker::run()
+ size_t workerMaxCount_; // target number of workers, adjusted by addWorker()/removeWorker()
+ size_t idleCount_; // workers currently waiting on monitor_ for a task
+ size_t pendingTaskCountMax_; // backlog limit; 0 disables the limit checks
+ size_t expiredCount_; // tasks dropped by removeExpiredTasks() since the last expiredTaskCount()
+ ExpireCallback expireCallback_; // invoked, when set, for every expired task
+
+ ThreadManager::STATE state_;
+ shared_ptr<ThreadFactory> threadFactory_;
+
+
+ friend class ThreadManager::Task;
+ std::queue<shared_ptr<Task> > tasks_; // FIFO backlog of pending tasks
+ Mutex mutex_; // shared by monitor_ and maxMonitor_
+ Monitor monitor_; // workers wait here for tasks
+ Monitor maxMonitor_; // add() waits here while the backlog is full
+ Monitor workerMonitor_; // addWorker()/removeWorker() wait here for the worker count to settle
+
+ friend class ThreadManager::Worker;
+ std::set<shared_ptr<Thread> > workers_; // threads created by addWorker()
+ std::set<shared_ptr<Thread> > deadWorkers_; // threads whose Worker::run() has finished
+ std::map<const Thread::id_t, shared_ptr<Thread> > idMap_; // started worker threads by id
+};
+
+class ThreadManager::Task : public Runnable {
+
+ public:
+  /** Life cycle of a queued task. */
+  enum STATE {
+    WAITING,
+    EXECUTING,
+    CANCELLED,
+    COMPLETE
+  };
+
+  /**
+   * Wraps a runnable so that it can be queued by the manager.
+   *
+   * @param runnable   the work to execute
+   * @param expiration 0 means the task never expires; any other value is
+   *                   added to Util::currentTime() to obtain the absolute
+   *                   expiry time
+   */
+  Task(shared_ptr<Runnable> runnable, int64_t expiration=0LL) :
+    runnable_(runnable),
+    state_(WAITING),
+    expireTime_(expiration == 0LL ? 0LL : Util::currentTime() + expiration) {}
+
+  ~Task() {}
+
+  /**
+   * Executes the wrapped runnable, but only when the task has been moved to
+   * EXECUTING beforehand; marks the task COMPLETE once the runnable returns.
+   */
+  void run() {
+    if (state_ != EXECUTING) {
+      return;
+    }
+    runnable_->run();
+    state_ = COMPLETE;
+  }
+
+  /** @return the wrapped runnable */
+  shared_ptr<Runnable> getRunnable() {
+    return runnable_;
+  }
+
+  /** @return the absolute expiry time, or 0 when the task never expires */
+  int64_t getExpireTime() const {
+    return expireTime_;
+  }
+
+ private:
+  shared_ptr<Runnable> runnable_;
+  friend class ThreadManager::Worker;
+  STATE state_;
+  int64_t expireTime_;
+};
+
+class ThreadManager::Worker: public Runnable { // runnable hosted by each thread that addWorker() creates
+ enum STATE {
+ UNINITIALIZED,
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED
+ };
+
+ public:
+ Worker(ThreadManager::Impl* manager) :
+ manager_(manager),
+ state_(UNINITIALIZED),
+ idle_(false) {}
+
+ ~Worker() {}
+
+ private:
+ bool isActive() const { // reads manager state; the callers in run() hold manager_->mutex_
+ return
+ (manager_->workerCount_ <= manager_->workerMaxCount_) || // not above the target worker count
+ (manager_->state_ == JOINING && !manager_->tasks_.empty()); // joining: keep going while tasks remain
+ }
+
+ public:
+ /**
+ * Worker entry point
+ *
+ * As long as worker thread is running, pull tasks off the task queue and
+ * execute.
+ *
+ * The function has three phases: register with the manager, loop over the
+ * task queue while active, then report this worker's thread as dead.
+ */
+ void run() {
+ bool active = false;
+ bool notifyManager = false;
+
+ /**
+ * Increment worker semaphore and notify manager if worker count reached
+ * desired max
+ *
+ * Note: We have to release the monitor and acquire the workerMonitor
+ * since that is what the manager blocks on for worker add/remove
+ *
+ * A worker that finds the count already at the max stays inactive and
+ * skips the task loop entirely.
+ */
+ {
+ Synchronized s(manager_->monitor_);
+ active = manager_->workerCount_ < manager_->workerMaxCount_;
+ if (active) {
+ manager_->workerCount_++;
+ notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
+ }
+ }
+
+ if (notifyManager) {
+ Synchronized s(manager_->workerMonitor_);
+ manager_->workerMonitor_.notify();
+ notifyManager = false; // reset; may be set again when this worker retires
+ }
+
+ while (active) {
+ shared_ptr<ThreadManager::Task> task;
+
+ /**
+ * While holding manager monitor block for non-empty task queue (Also
+ * check that the thread hasn't been requested to stop). Once the queue
+ * is non-empty, dequeue a task, release monitor, and execute. If the
+ * worker max count has been decremented such that we exceed it, mark
+ * ourself inactive, decrement the worker count and notify the manager
+ * (technically we're notifying the next blocked thread but eventually
+ * the manager will see it).
+ */
+ {
+ Guard g(manager_->mutex_);
+ active = isActive();
+
+ while (active && manager_->tasks_.empty()) {
+ manager_->idleCount_++;
+ idle_ = true;
+ manager_->monitor_.wait();
+ active = isActive(); // re-evaluate after every wake-up
+ idle_ = false;
+ manager_->idleCount_--;
+ }
+
+ if (active) {
+ manager_->removeExpiredTasks(); // may empty the queue, hence the check below
+
+ if (!manager_->tasks_.empty()) {
+ task = manager_->tasks_.front();
+ manager_->tasks_.pop();
+ if (task->state_ == ThreadManager::Task::WAITING) {
+ task->state_ = ThreadManager::Task::EXECUTING;
+ }
+
+ /* If we have a pending task max and we just dropped below it, wakeup any
+ thread that might be blocked on add. */
+ if (manager_->pendingTaskCountMax_ != 0 &&
+ manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
+ manager_->maxMonitor_.notify();
+ }
+ }
+ } else {
+ idle_ = true;
+ manager_->workerCount_--;
+ notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
+ }
+ }
+
+ if (task) { // executed after the Guard above has been released
+ if (task->state_ == ThreadManager::Task::EXECUTING) {
+ try {
+ task->run();
+ } catch(...) {
+ // XXX need to log this; anything thrown by the task is swallowed here
+ }
+ }
+ }
+ }
+
+ {
+ Synchronized s(manager_->workerMonitor_);
+ manager_->deadWorkers_.insert(this->thread()); // removeWorker() purges this set later
+ if (notifyManager) {
+ manager_->workerMonitor_.notify();
+ }
+ }
+
+ return;
+ }
+
+ private:
+ ThreadManager::Impl* manager_; // raw, non-owning pointer to the owning manager
+ friend class ThreadManager::Impl;
+ STATE state_; // set to STARTING by Impl::addWorker(); not read inside this class
+ bool idle_; // true while waiting for a task and after retiring from the loop
+};
+
+
+ void ThreadManager::Impl::addWorker(size_t value) {
+  typedef std::set<shared_ptr<Thread> > ThreadSet;
+
+  // Create one Worker per requested thread and wrap each of them in a thread
+  // object obtained from the factory; nothing is started yet.
+  ThreadSet created;
+  for (size_t n = 0; n != value; ++n) {
+    shared_ptr<ThreadManager::Worker> worker(new ThreadManager::Worker(this));
+    created.insert(threadFactory_->newThread(worker));
+  }
+
+  // Raise the target worker count and record the new threads.
+  {
+    Synchronized s(monitor_);
+    workerMaxCount_ += value;
+    workers_.insert(created.begin(), created.end());
+  }
+
+  // Start every new thread and register it under its thread id.
+  for (ThreadSet::iterator it = created.begin(); it != created.end(); ++it) {
+    const shared_ptr<Thread>& spawned = *it;
+    shared_ptr<ThreadManager::Worker> worker =
+        dynamic_pointer_cast<ThreadManager::Worker, Runnable>(spawned->runnable());
+    worker->state_ = ThreadManager::Worker::STARTING;
+    spawned->start();
+    idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >(spawned->getId(), spawned));
+  }
+
+  // Wait until the workers have registered themselves, i.e. until the
+  // worker count has reached the new target.
+  {
+    Synchronized s(workerMonitor_);
+    while (workerCount_ != workerMaxCount_) {
+      workerMonitor_.wait();
+    }
+  }
+}
+
+void ThreadManager::Impl::start() {
+  // start() does nothing once the manager has been stopped.
+  if (ThreadManager::STOPPED == state_) {
+    return;
+  }
+
+  Synchronized s(monitor_);
+
+  if (ThreadManager::UNINITIALIZED == state_) {
+    // A thread factory has to be configured before the manager can start.
+    if (!threadFactory_) {
+      throw InvalidArgumentException();
+    }
+    state_ = ThreadManager::STARTED;
+    monitor_.notifyAll();
+  }
+
+  // Block for as long as the manager reports STARTING.
+  while (STARTING == state_) {
+    monitor_.wait();
+  }
+}
+
+void ThreadManager::Impl::stopImpl(bool join) {
+  // Nothing left to do once the manager has reached STOPPED.
+  if (ThreadManager::STOPPED == state_) {
+    return;
+  }
+
+  // Only the first caller initiates the shutdown; it selects JOINING or
+  // STOPPING depending on the join flag.
+  bool initiateShutdown = false;
+  {
+    Synchronized s(monitor_);
+    const bool shutdownInProgress =
+        state_ == ThreadManager::STOPPING ||
+        state_ == ThreadManager::JOINING ||
+        state_ == ThreadManager::STOPPED;
+    if (!shutdownInProgress) {
+      initiateShutdown = true;
+      if (join) {
+        state_ = ThreadManager::JOINING;
+      } else {
+        state_ = ThreadManager::STOPPING;
+      }
+    }
+  }
+
+  // Retire every registered worker; removeWorker() blocks until they are gone.
+  if (initiateShutdown) {
+    removeWorker(workerCount_);
+  }
+
+  // XXX
+  // should be able to block here for transition to STOPPED since we're no
+  // using shared_ptrs
+
+  {
+    Synchronized s(monitor_);
+    state_ = ThreadManager::STOPPED;
+  }
+}
+
+/**
+ * Lowers the target worker count by value and waits for the surplus workers
+ * to retire.
+ *
+ * @param value number of workers to remove
+ * @throws InvalidArgumentException if value exceeds the current target count
+ */
+void ThreadManager::Impl::removeWorker(size_t value) {
+  {
+    Synchronized s(monitor_);
+    if (value > workerMaxCount_) {
+      throw InvalidArgumentException();
+    }
+
+    workerMaxCount_ -= value;
+
+    // Wake the idle workers so that they re-evaluate whether they are still
+    // wanted: one notify per idle worker when fewer workers are idle than
+    // are being removed, a broadcast otherwise.
+    if (idleCount_ < value) {
+      for (size_t ix = 0; ix < idleCount_; ++ix) {
+        monitor_.notify();
+      }
+    } else {
+      monitor_.notifyAll();
+    }
+  }
+
+  {
+    Synchronized s(workerMonitor_);
+
+    // Retiring workers decrement workerCount_ and notify workerMonitor_.
+    while (workerCount_ != workerMaxCount_) {
+      workerMonitor_.wait();
+    }
+
+    // Forget the threads whose worker has finished running.
+    for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ++ix) {
+      idMap_.erase((*ix)->getId());
+      workers_.erase(*ix);
+    }
+
+    deadWorkers_.clear();
+  }
+}
+
+ bool ThreadManager::Impl::canSleep() {
+   // The calling thread may block only when it is not registered in idMap_,
+   // i.e. when it is not one of the threads started by addWorker().
+   return idMap_.count(threadFactory_->getCurrentThreadId()) == 0;
+ }
+
+ void ThreadManager::Impl::add(shared_ptr<Runnable> value,
+                               int64_t timeout,
+                               int64_t expiration) {
+   // Queues value for execution. timeout bounds both the lock acquisition
+   // and the wait for room in a full backlog; expiration is handed to the
+   // Task constructor.
+   Guard g(mutex_, timeout);
+   if (!g) {
+     throw TimedOutException();
+   }
+
+   if (ThreadManager::STARTED != state_) {
+     throw IllegalStateException("ThreadManager::Impl::add ThreadManager "
+                                 "not started");
+   }
+
+   removeExpiredTasks();
+
+   const bool backlogFull =
+       pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_);
+   if (backlogFull) {
+     // Threads registered in idMap_ (see canSleep()) and callers passing a
+     // negative timeout are rejected instead of being blocked.
+     if (!canSleep() || timeout < 0) {
+       throw TooManyPendingTasksException();
+     }
+     while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
+       // This is thread safe because the mutex is shared between monitors.
+       maxMonitor_.wait(timeout);
+     }
+   }
+
+   shared_ptr<ThreadManager::Task> queued(new ThreadManager::Task(value, expiration));
+   tasks_.push(queued);
+
+   // Wake one idle worker if there is any; otherwise every worker is busy
+   // and the task is picked up once one of them becomes free.
+   if (idleCount_ > 0) {
+     monitor_.notify();
+   }
+ }
+
+/**
+ * Removes a pending task from the backlog.
+ *
+ * The first queued task whose runnable equals task is dropped; the relative
+ * order of all other pending tasks is preserved. Nothing happens when no
+ * pending task matches (for instance because it has already been dequeued
+ * by a worker).
+ *
+ * @param task the runnable that was previously handed to add()
+ * @throws IllegalStateException if the manager is not in the STARTED state
+ */
+void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
+  Synchronized s(monitor_);
+  if (state_ != ThreadManager::STARTED) {
+    throw IllegalStateException("ThreadManager::Impl::remove ThreadManager not "
+                                "started");
+  }
+
+  // std::queue offers no iteration, so rotate the queue exactly once: every
+  // element is popped from the front and pushed to the back again, except
+  // for the first match, which is dropped.
+  bool removed = false;
+  for (size_t remaining = tasks_.size(); remaining > 0; --remaining) {
+    shared_ptr<ThreadManager::Task> pending = tasks_.front();
+    tasks_.pop();
+    if (!removed && pending->getRunnable() == task) {
+      removed = true;
+      continue;
+    }
+    tasks_.push(pending);
+  }
+
+  // Same wake-up rule as the workers apply after dequeuing a task: if a
+  // backlog limit is configured and the queue is now below it, wake a thread
+  // that may be blocked in add().
+  if (removed &&
+      pendingTaskCountMax_ != 0 &&
+      tasks_.size() < pendingTaskCountMax_) {
+    maxMonitor_.notify();
+  }
+}
+
+/**
+ * Dequeues the oldest pending task.
+ *
+ * @return the runnable of the dequeued task, or an empty pointer when no
+ *         task is pending
+ * @throws IllegalStateException if the manager is not in the STARTED state
+ */
+boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
+  Guard g(mutex_);
+  if (ThreadManager::STARTED != state_) {
+    throw IllegalStateException("ThreadManager::Impl::removeNextPending "
+                                "ThreadManager not started");
+  }
+
+  boost::shared_ptr<Runnable> next;
+  if (!tasks_.empty()) {
+    next = tasks_.front()->getRunnable();
+    tasks_.pop();
+  }
+  return next;
+}
+
+/**
+ * Drops expired tasks from the front of the backlog.
+ *
+ * Scanning stops at the first task that either never expires (expire time 0)
+ * or has not expired yet, so expired tasks queued behind such a task are not
+ * removed by this call. The expire callback, when set, is invoked for every
+ * task before it is dropped.
+ */
+void ThreadManager::Impl::removeExpiredTasks() {
+  int64_t now = 0LL; // fetched lazily, only once a task with a deadline is seen
+
+  while (!tasks_.empty()) {
+    const shared_ptr<ThreadManager::Task> head = tasks_.front();
+    const int64_t deadline = head->getExpireTime();
+
+    if (deadline == 0LL) {
+      break; // the head never expires
+    }
+
+    if (now == 0LL) {
+      now = Util::currentTime();
+    }
+
+    if (deadline > now) {
+      break; // the head has not expired yet
+    }
+
+    if (expireCallback_) {
+      expireCallback_(head->getRunnable());
+    }
+    tasks_.pop();
+    ++expiredCount_;
+  }
+}
+
+
+/** Stores the callback that removeExpiredTasks() invokes for each expired task. */
+void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
+  this->expireCallback_ = expireCallback;
+}
+
+/**
+ * Thread manager with a fixed configuration: start() applies the backlog
+ * limit, starts the manager and then adds the configured number of workers.
+ */
+class SimpleThreadManager : public ThreadManager::Impl {
+
+ public:
+  /**
+   * @param workerCount         number of workers added by start()
+   * @param pendingTaskCountMax backlog limit applied by start(); passed
+   *                            unchanged to Impl::pendingTaskCountMax()
+   */
+  SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
+    workerCount_(workerCount),
+    pendingTaskCountMax_(pendingTaskCountMax) {
+  }
+
+  /**
+   * Applies the configured limit, starts the base manager and adds the
+   * workers. Note that addWorker() is called on every invocation.
+   */
+  void start() {
+    ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
+    ThreadManager::Impl::start();
+    addWorker(workerCount_);
+  }
+
+ private:
+  const size_t workerCount_;         // workers to add in start()
+  const size_t pendingTaskCountMax_; // backlog limit to apply in start()
+};
+
+
+/** Creates a plain manager; workers have to be added by the caller. */
+shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
+  shared_ptr<ThreadManager> manager(new ThreadManager::Impl());
+  return manager;
+}
+
+/**
+ * Creates a SimpleThreadManager configured with count workers and the given
+ * backlog limit.
+ */
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
+  shared_ptr<ThreadManager> manager(new SimpleThreadManager(count, pendingTaskCountMax));
+  return manager;
+}
+
+}}} // apache::thrift::concurrency
+