You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by jk...@apache.org on 2016/11/12 20:21:59 UTC

thrift git commit: THRIFT-3932 fixed ThreadManager concurrency issues, added more tests in that area, did a little refactoring and prettying up along the way Client: C++

Repository: thrift
Updated Branches:
  refs/heads/master ea5ea8b4c -> df89913b8


THRIFT-3932 fixed ThreadManager concurrency issues, added more tests in that area, did a little refactoring and prettying up along the way
Client: C++

This closes #1103


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/df89913b
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/df89913b
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/df89913b

Branch: refs/heads/master
Commit: df89913b8a952a46bc91264e0d96df9c69969efb
Parents: ea5ea8b
Author: James E. King, III <jk...@apache.org>
Authored: Sat Nov 12 15:16:30 2016 -0500
Committer: James E. King, III <jk...@apache.org>
Committed: Sat Nov 12 15:16:30 2016 -0500

----------------------------------------------------------------------
 appveyor.yml                                    |   5 +-
 build/docker/scripts/cmake.sh                   |   3 +-
 .../thrift/concurrency/BoostThreadFactory.cpp   |  47 +-
 .../src/thrift/concurrency/BoostThreadFactory.h |  15 +-
 .../thrift/concurrency/PosixThreadFactory.cpp   | 177 +++-----
 .../src/thrift/concurrency/PosixThreadFactory.h |  28 +-
 .../src/thrift/concurrency/StdThreadFactory.cpp |  46 +-
 .../src/thrift/concurrency/StdThreadFactory.h   |  15 +-
 lib/cpp/src/thrift/concurrency/Thread.h         |  30 +-
 .../src/thrift/concurrency/ThreadManager.cpp    | 382 ++++++++--------
 lib/cpp/src/thrift/concurrency/ThreadManager.h  |  34 +-
 lib/cpp/src/thrift/concurrency/Util.h           |   2 +-
 lib/cpp/src/thrift/server/TThreadPoolServer.cpp |   2 +-
 lib/cpp/src/thrift/server/TThreadedServer.cpp   |   5 +-
 lib/cpp/src/thrift/server/TThreadedServer.h     |  10 +-
 lib/cpp/test/TPipeInterruptTest.cpp             |   2 +-
 lib/cpp/test/TServerIntegrationTest.cpp         |  22 +-
 lib/cpp/test/concurrency/Tests.cpp              |  75 +++-
 lib/cpp/test/concurrency/ThreadFactoryTests.h   |  65 +--
 lib/cpp/test/concurrency/ThreadManagerTests.h   | 439 ++++++++++++++++---
 lib/cpp/test/concurrency/TimerManagerTests.h    |  21 +-
 lib/cpp/test/processor/ProcessorTest.cpp        |   2 +-
 test/cpp/src/TestServer.cpp                     |   2 +-
 23 files changed, 805 insertions(+), 624 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/appveyor.yml
----------------------------------------------------------------------
diff --git a/appveyor.yml b/appveyor.yml
index cfd8b51..03ee295 100755
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -89,6 +89,5 @@ build_script:
 # CTest fails to invoke ant seemingly due to "ant.bat" v.s. "ant" (shell script) conflict.
 # Currently, everything that involves OpenSSL seems to hang forever on our Appveyor setup.
 # Also a few C++ tests hang (on Appveyor or on Windows in general).
-- ctest -C Release --timeout 600 -VV -E "(concurrency_test|processor_test|TInterruptTest|StressTestNonBlocking|OpenSSLManualInitTest|SecurityTest|PythonTestSSLSocket|python_test$|^Java)"
-
-#TODO make it perfect ;-r
+- ctest -C Release --timeout 600 -VV -E "(StressTestNonBlocking|PythonTestSSLSocket|python_test$|^Java)"
+# TODO make it perfect ;-r

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/build/docker/scripts/cmake.sh
----------------------------------------------------------------------
diff --git a/build/docker/scripts/cmake.sh b/build/docker/scripts/cmake.sh
index 26ccb10..6508e71 100755
--- a/build/docker/scripts/cmake.sh
+++ b/build/docker/scripts/cmake.sh
@@ -19,4 +19,5 @@ for LIB in $BUILD_LIBS; do
 done
 $MAKEPROG -j3
 cpack
-ctest -VV -E "(concurrency_test|processor_test)"
+ctest -VV 
+# was: -E "(concurrency_test|processor_test)"

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
index 96cb6d6..a72d38b 100644
--- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
@@ -126,54 +126,19 @@ void* BoostThread::threadMain(void* arg) {
   return (void*)0;
 }
 
-/**
- * POSIX Thread factory implementation
- */
-class BoostThreadFactory::Impl {
-
-private:
-  bool detached_;
-
-public:
-  Impl(bool detached) : detached_(detached) {}
-
-  /**
-   * Creates a new POSIX thread to run the runnable object
-   *
-   * @param runnable A runnable object
-   */
-  shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
-    shared_ptr<BoostThread> result = shared_ptr<BoostThread>(new BoostThread(detached_, runnable));
-    result->weakRef(result);
-    runnable->thread(result);
-    return result;
-  }
-
-  bool isDetached() const { return detached_; }
-
-  void setDetached(bool value) { detached_ = value; }
-
-  Thread::id_t getCurrentThreadId() const { return boost::this_thread::get_id(); }
-};
-
 BoostThreadFactory::BoostThreadFactory(bool detached)
-  : impl_(new BoostThreadFactory::Impl(detached)) {
+  : ThreadFactory(detached) {
 }
 
 shared_ptr<Thread> BoostThreadFactory::newThread(shared_ptr<Runnable> runnable) const {
-  return impl_->newThread(runnable);
-}
-
-bool BoostThreadFactory::isDetached() const {
-  return impl_->isDetached();
-}
-
-void BoostThreadFactory::setDetached(bool value) {
-  impl_->setDetached(value);
+  shared_ptr<BoostThread> result = shared_ptr<BoostThread>(new BoostThread(isDetached(), runnable));
+  result->weakRef(result);
+  runnable->thread(result);
+  return result;
 }
 
 Thread::id_t BoostThreadFactory::getCurrentThreadId() const {
-  return impl_->getCurrentThreadId();
+  return boost::this_thread::get_id();
 }
 }
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
index e6d1a56..7973245 100644
--- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
@@ -55,21 +55,8 @@ public:
 
   // From ThreadFactory;
   Thread::id_t getCurrentThreadId() const;
-
-  /**
-   * Sets detached mode of threads
-   */
-  virtual void setDetached(bool detached);
-
-  /**
-   * Gets current detached mode
-   */
-  virtual bool isDetached() const;
-
-private:
-  class Impl;
-  boost::shared_ptr<Impl> impl_;
 };
+
 }
 }
 } // apache::thrift::concurrency

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
index 05a3c02..6a0b47c 100644
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
@@ -214,149 +214,104 @@ void* PthreadThread::threadMain(void* arg) {
 }
 
 /**
- * POSIX Thread factory implementation
+ * Converts generic posix thread schedule policy enums into pthread
+ * API values.
  */
-class PosixThreadFactory::Impl {
-
-private:
-  POLICY policy_;
-  PRIORITY priority_;
-  int stackSize_;
-  bool detached_;
-
-  /**
-   * Converts generic posix thread schedule policy enums into pthread
-   * API values.
-   */
-  static int toPthreadPolicy(POLICY policy) {
-    switch (policy) {
-    case OTHER:
-      return SCHED_OTHER;
-    case FIFO:
-      return SCHED_FIFO;
-    case ROUND_ROBIN:
-      return SCHED_RR;
-    }
+static int toPthreadPolicy(PosixThreadFactory::POLICY policy) {
+  switch (policy) {
+  case PosixThreadFactory::OTHER:
     return SCHED_OTHER;
+  case PosixThreadFactory::FIFO:
+    return SCHED_FIFO;
+  case PosixThreadFactory::ROUND_ROBIN:
+    return SCHED_RR;
   }
+  return SCHED_OTHER;
+}
 
-  /**
-   * Converts relative thread priorities to absolute value based on posix
-   * thread scheduler policy
-   *
-   *  The idea is simply to divide up the priority range for the given policy
-   * into the correpsonding relative priority level (lowest..highest) and
-   * then pro-rate accordingly.
-   */
-  static int toPthreadPriority(POLICY policy, PRIORITY priority) {
-    int pthread_policy = toPthreadPolicy(policy);
-    int min_priority = 0;
-    int max_priority = 0;
+/**
+ * Converts relative thread priorities to absolute value based on posix
+ * thread scheduler policy
+ *
+ *  The idea is simply to divide up the priority range for the given policy
+ * into the correpsonding relative priority level (lowest..highest) and
+ * then pro-rate accordingly.
+ */
+static int toPthreadPriority(PosixThreadFactory::POLICY policy, PosixThreadFactory::PRIORITY priority) {
+  int pthread_policy = toPthreadPolicy(policy);
+  int min_priority = 0;
+  int max_priority = 0;
 #ifdef HAVE_SCHED_GET_PRIORITY_MIN
-    min_priority = sched_get_priority_min(pthread_policy);
+  min_priority = sched_get_priority_min(pthread_policy);
 #endif
 #ifdef HAVE_SCHED_GET_PRIORITY_MAX
-    max_priority = sched_get_priority_max(pthread_policy);
+  max_priority = sched_get_priority_max(pthread_policy);
 #endif
-    int quanta = (HIGHEST - LOWEST) + 1;
-    float stepsperquanta = (float)(max_priority - min_priority) / quanta;
-
-    if (priority <= HIGHEST) {
-      return (int)(min_priority + stepsperquanta * priority);
-    } else {
-      // should never get here for priority increments.
-      assert(false);
-      return (int)(min_priority + stepsperquanta * NORMAL);
-    }
-  }
-
-public:
-  Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached)
-    : policy_(policy), priority_(priority), stackSize_(stackSize), detached_(detached) {}
-
-  /**
-   * Creates a new POSIX thread to run the runnable object
-   *
-   * @param runnable A runnable object
-   */
-  shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
-    shared_ptr<PthreadThread> result
-        = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_),
-                                                      toPthreadPriority(policy_, priority_),
-                                                      stackSize_,
-                                                      detached_,
-                                                      runnable));
-    result->weakRef(result);
-    runnable->thread(result);
-    return result;
-  }
-
-  int getStackSize() const { return stackSize_; }
-
-  void setStackSize(int value) { stackSize_ = value; }
-
-  PRIORITY getPriority() const { return priority_; }
-
-  /**
-   * Sets priority.
-   *
-   *  XXX
-   *  Need to handle incremental priorities properly.
-   */
-  void setPriority(PRIORITY value) { priority_ = value; }
-
-  bool isDetached() const { return detached_; }
-
-  void setDetached(bool value) { detached_ = value; }
-
-  Thread::id_t getCurrentThreadId() const {
-
-#ifndef _WIN32
-    return (Thread::id_t)pthread_self();
-#else
-    return (Thread::id_t)pthread_self().p;
-#endif // _WIN32
+  int quanta = (PosixThreadFactory::HIGHEST - PosixThreadFactory::LOWEST) + 1;
+  float stepsperquanta = (float)(max_priority - min_priority) / quanta;
+
+  if (priority <= PosixThreadFactory::HIGHEST) {
+    return (int)(min_priority + stepsperquanta * priority);
+  } else {
+    // should never get here for priority increments.
+    assert(false);
+    return (int)(min_priority + stepsperquanta * PosixThreadFactory::NORMAL);
   }
-};
+}
 
 PosixThreadFactory::PosixThreadFactory(POLICY policy,
                                        PRIORITY priority,
                                        int stackSize,
                                        bool detached)
-  : impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {
+  : ThreadFactory(detached),
+    policy_(policy),
+    priority_(priority),
+    stackSize_(stackSize) {
+}
+
+PosixThreadFactory::PosixThreadFactory(bool detached)
+  : ThreadFactory(detached),
+    policy_(ROUND_ROBIN),
+    priority_(NORMAL),
+    stackSize_(1) {
 }
 
 shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {
-  return impl_->newThread(runnable);
+  shared_ptr<PthreadThread> result
+      = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_),
+                                                    toPthreadPriority(policy_, priority_),
+                                                    stackSize_,
+                                                    isDetached(),
+                                                    runnable));
+  result->weakRef(result);
+  runnable->thread(result);
+  return result;
 }
 
 int PosixThreadFactory::getStackSize() const {
-  return impl_->getStackSize();
+  return stackSize_;
 }
 
 void PosixThreadFactory::setStackSize(int value) {
-  impl_->setStackSize(value);
+  stackSize_ = value;
 }
 
 PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const {
-  return impl_->getPriority();
-}
-
-void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) {
-  impl_->setPriority(value);
-}
-
-bool PosixThreadFactory::isDetached() const {
-  return impl_->isDetached();
+  return priority_;
 }
 
-void PosixThreadFactory::setDetached(bool value) {
-  impl_->setDetached(value);
+void PosixThreadFactory::setPriority(PRIORITY value) {
+  priority_ = value;
 }
 
 Thread::id_t PosixThreadFactory::getCurrentThreadId() const {
-  return impl_->getCurrentThreadId();
+#ifndef _WIN32
+  return (Thread::id_t)pthread_self();
+#else
+  return (Thread::id_t)pthread_self().p;
+#endif // _WIN32
 }
+
 }
 }
 } // apache::thrift::concurrency

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
index b26d296..c1bbe5c 100644
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
@@ -74,12 +74,19 @@ public:
    *
    * By default threads are not joinable.
    */
-
   PosixThreadFactory(POLICY policy = ROUND_ROBIN,
                      PRIORITY priority = NORMAL,
                      int stackSize = 1,
                      bool detached = true);
 
+  /**
+   * Provide a constructor compatible with the other factories
+   * The default policy is POLICY::ROUND_ROBIN.
+   * The default priority is PRIORITY::NORMAL.
+   * The default stackSize is 1.
+   */
+  PosixThreadFactory(bool detached);
+
   // From ThreadFactory;
   boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
 
@@ -87,14 +94,14 @@ public:
   Thread::id_t getCurrentThreadId() const;
 
   /**
-   * Gets stack size for created threads
+   * Gets stack size for newly created threads
    *
    * @return int size in megabytes
    */
   virtual int getStackSize() const;
 
   /**
-   * Sets stack size for created threads
+   * Sets stack size for newly created threads
    *
    * @param value size in megabytes
    */
@@ -110,19 +117,10 @@ public:
    */
   virtual void setPriority(PRIORITY priority);
 
-  /**
-   * Sets detached mode of threads
-   */
-  virtual void setDetached(bool detached);
-
-  /**
-   * Gets current detached mode
-   */
-  virtual bool isDetached() const;
-
 private:
-  class Impl;
-  boost::shared_ptr<Impl> impl_;
+  POLICY policy_;
+  PRIORITY priority_;
+  int stackSize_;
 };
 }
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
index d57e7ec..66c7e75 100644
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
@@ -116,53 +116,17 @@ void StdThread::threadMain(boost::shared_ptr<StdThread> thread) {
   return;
 }
 
-/**
- * std::thread factory implementation
- */
-class StdThreadFactory::Impl {
-
-private:
-  bool detached_;
-
-public:
-  Impl(bool detached) : detached_(detached) {}
-
-  /**
-   * Creates a new std::thread to run the runnable object
-   *
-   * @param runnable A runnable object
-   */
-  boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const {
-    boost::shared_ptr<StdThread> result
-        = boost::shared_ptr<StdThread>(new StdThread(detached_, runnable));
-    runnable->thread(result);
-    return result;
-  }
-
-  bool isDetached() const { return detached_; }
-
-  void setDetached(bool value) { detached_ = value; }
-
-  Thread::id_t getCurrentThreadId() const { return std::this_thread::get_id(); }
-};
-
-StdThreadFactory::StdThreadFactory(bool detached) : impl_(new StdThreadFactory::Impl(detached)) {
+StdThreadFactory::StdThreadFactory(bool detached) : ThreadFactory(detached) {
 }
 
 boost::shared_ptr<Thread> StdThreadFactory::newThread(boost::shared_ptr<Runnable> runnable) const {
-  return impl_->newThread(runnable);
-}
-
-bool StdThreadFactory::isDetached() const {
-  return impl_->isDetached();
-}
-
-void StdThreadFactory::setDetached(bool value) {
-  impl_->setDetached(value);
+  boost::shared_ptr<StdThread> result = boost::shared_ptr<StdThread>(new StdThread(isDetached(), runnable));
+  runnable->thread(result);
+  return result;
 }
 
 Thread::id_t StdThreadFactory::getCurrentThreadId() const {
-  return impl_->getCurrentThreadId();
+  return std::this_thread::get_id();
 }
 }
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
index fb86bbf..88f00be 100644
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
@@ -52,21 +52,8 @@ public:
 
   // From ThreadFactory;
   Thread::id_t getCurrentThreadId() const;
-
-  /**
-   * Sets detached mode of threads
-   */
-  virtual void setDetached(bool detached);
-
-  /**
-   * Gets current detached mode
-   */
-  virtual bool isDetached() const;
-
-private:
-  class Impl;
-  boost::shared_ptr<Impl> impl_;
 };
+
 }
 }
 } // apache::thrift::concurrency

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/concurrency/Thread.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h
index f7c7bd6..2e15489 100644
--- a/lib/cpp/src/thrift/concurrency/Thread.h
+++ b/lib/cpp/src/thrift/concurrency/Thread.h
@@ -108,8 +108,9 @@ public:
   virtual void start() = 0;
 
   /**
-   * Join this thread. Current thread blocks until this target thread
-   * completes.
+   * Join this thread. If this thread is joinable, the calling thread blocks
+   * until this thread completes.  If the target thread is not joinable, then
+   * nothing happens.
    */
   virtual void join() = 0;
 
@@ -135,30 +136,39 @@ private:
  * object for execution
  */
 class ThreadFactory {
+protected:
+  ThreadFactory(bool detached) : detached_(detached) { }
+
 public:
-  virtual ~ThreadFactory() {}
+  virtual ~ThreadFactory() { }
 
   /**
    * Gets current detached mode
    */
-  virtual bool isDetached() const = 0;
+  bool isDetached() const { return detached_; }
 
   /**
-   * Create a new thread.
+   * Sets the detached disposition of newly created threads.
    */
-  virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
+  void setDetached(bool detached) { detached_ = detached; }
 
   /**
-   * Sets detached mode of threads
+   * Create a new thread.
    */
-  virtual void setDetached(bool detached) = 0;
-
-  static const Thread::id_t unknown_thread_id;
+  virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
 
   /**
    * Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
    */
   virtual Thread::id_t getCurrentThreadId() const = 0;
+
+  /**
+   * For code readability define the unknown/undefined thread id
+   */
+  static const Thread::id_t unknown_thread_id;
+
+private:
+  bool detached_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
index 24bfeec..c4726dd 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -26,8 +26,8 @@
 
 #include <boost/shared_ptr.hpp>
 
-#include <assert.h>
-#include <queue>
+#include <stdexcept>
+#include <deque>
 #include <set>
 
 #if defined(DEBUG)
@@ -49,6 +49,9 @@ using boost::dynamic_pointer_cast;
  * it maintains statistics on number of idle threads, number of active threads,
  * task backlog, and average wait and service times.
  *
+ * There are three different monitors used for signaling different conditions
+ * however they all share the same mutex_.
+ *
  * @version $Id:$
  */
 class ThreadManager::Impl : public ThreadManager {
@@ -62,25 +65,26 @@ public:
       expiredCount_(0),
       state_(ThreadManager::UNINITIALIZED),
       monitor_(&mutex_),
-      maxMonitor_(&mutex_) {}
+      maxMonitor_(&mutex_),
+      workerMonitor_(&mutex_) {}
 
   ~Impl() { stop(); }
 
   void start();
-
-  void stop() { stopImpl(false); }
-
-  void join() { stopImpl(true); }
+  void stop();
 
   ThreadManager::STATE state() const { return state_; }
 
   shared_ptr<ThreadFactory> threadFactory() const {
-    Synchronized s(monitor_);
+    Guard g(mutex_);
     return threadFactory_;
   }
 
   void threadFactory(shared_ptr<ThreadFactory> value) {
-    Synchronized s(monitor_);
+    Guard g(mutex_);
+    if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) {
+      throw InvalidArgumentException();
+    }
     threadFactory_ = value;
   }
 
@@ -91,51 +95,65 @@ public:
   size_t idleWorkerCount() const { return idleCount_; }
 
   size_t workerCount() const {
-    Synchronized s(monitor_);
+    Guard g(mutex_);
     return workerCount_;
   }
 
   size_t pendingTaskCount() const {
-    Synchronized s(monitor_);
+    Guard g(mutex_);
     return tasks_.size();
   }
 
   size_t totalTaskCount() const {
-    Synchronized s(monitor_);
+    Guard g(mutex_);
     return tasks_.size() + workerCount_ - idleCount_;
   }
 
   size_t pendingTaskCountMax() const {
-    Synchronized s(monitor_);
+    Guard g(mutex_);
     return pendingTaskCountMax_;
   }
 
   size_t expiredTaskCount() {
-    Synchronized s(monitor_);
-    size_t result = expiredCount_;
-    expiredCount_ = 0;
-    return result;
+    Guard g(mutex_);
+    return expiredCount_;
   }
 
   void pendingTaskCountMax(const size_t value) {
-    Synchronized s(monitor_);
+    Guard g(mutex_);
     pendingTaskCountMax_ = value;
   }
 
-  bool canSleep();
-
   void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
 
   void remove(shared_ptr<Runnable> task);
 
   shared_ptr<Runnable> removeNextPending();
 
-  void removeExpiredTasks();
+  void removeExpiredTasks() {
+    removeExpired(false);
+  }
 
   void setExpireCallback(ExpireCallback expireCallback);
 
 private:
-  void stopImpl(bool join);
+  /**
+   * Remove one or more expired tasks.
+   * \param[in]  justOne  if true, try to remove just one task and return
+   */
+  void removeExpired(bool justOne);
+
+  /**
+   * \returns whether it is acceptable to block, depending on the current thread id
+   */
+  bool canSleep() const;
+
+  /**
+   * Lowers the maximum worker count and blocks until enough worker threads complete
+   * to get to the new maximum worker limit.  The caller is responsible for acquiring
+   * a lock on the class mutex_.
+   */
+  void removeWorkersUnderLock(size_t value);
 
   size_t workerCount_;
   size_t workerMaxCount_;
@@ -148,11 +166,12 @@ private:
   shared_ptr<ThreadFactory> threadFactory_;
 
   friend class ThreadManager::Task;
-  std::queue<shared_ptr<Task> > tasks_;
+  typedef std::deque<shared_ptr<Task> > TaskQueue;
+  TaskQueue tasks_;
   Mutex mutex_;
   Monitor monitor_;
   Monitor maxMonitor_;
-  Monitor workerMonitor_;
+  Monitor workerMonitor_;       // used to synchronize changes in worker count
 
   friend class ThreadManager::Worker;
   std::set<shared_ptr<Thread> > workers_;
@@ -163,7 +182,7 @@ private:
 class ThreadManager::Task : public Runnable {
 
 public:
-  enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
+  enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
 
   Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL)
     : runnable_(runnable),
@@ -194,7 +213,7 @@ class ThreadManager::Worker : public Runnable {
   enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
 
 public:
-  Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED), idle_(false) {}
+  Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {}
 
   ~Worker() {}
 
@@ -212,78 +231,82 @@ public:
    * execute.
    */
   void run() {
-    bool active = false;
+    Guard g(manager_->mutex_);
+
+    /**
+     * This method has three parts; one is to check for and account for
+     * admitting a task which happens under a lock.  Then the lock is released
+     * and the task itself is executed.  Finally we do some accounting
+     * under lock again when the task completes.
+     */
+
+    /**
+     * Admitting
+     */
+
     /**
      * Increment worker semaphore and notify manager if worker count reached
      * desired max
-     *
-     * Note: We have to release the monitor and acquire the workerMonitor
-     * since that is what the manager blocks on for worker add/remove
      */
-    {
-      bool notifyManager = false;
-      {
-        Synchronized s(manager_->monitor_);
-        active = manager_->workerCount_ < manager_->workerMaxCount_;
-        if (active) {
-          manager_->workerCount_++;
-          notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
-        }
-      }
-
-      if (notifyManager) {
-        Synchronized s(manager_->workerMonitor_);
+    bool active = manager_->workerCount_ < manager_->workerMaxCount_;
+    if (active) {
+      if (++manager_->workerCount_ == manager_->workerMaxCount_) {
         manager_->workerMonitor_.notify();
       }
     }
 
     while (active) {
-      shared_ptr<ThreadManager::Task> task;
-
       /**
-       * While holding manager monitor block for non-empty task queue (Also
-       * check that the thread hasn't been requested to stop). Once the queue
-       * is non-empty, dequeue a task, release monitor, and execute. If the
-       * worker max count has been decremented such that we exceed it, mark
-       * ourself inactive, decrement the worker count and notify the manager
-       * (technically we're notifying the next blocked thread but eventually
-       * the manager will see it.
-       */
-      {
-        Guard g(manager_->mutex_);
+        * While holding manager monitor block for non-empty task queue (Also
+        * check that the thread hasn't been requested to stop). Once the queue
+        * is non-empty, dequeue a task, release monitor, and execute. If the
+        * worker max count has been decremented such that we exceed it, mark
+        * ourself inactive, decrement the worker count and notify the manager
+        * (technically we're notifying the next blocked thread but eventually
+        * the manager will see it.
+        */
+      active = isActive();
+
+      while (active && manager_->tasks_.empty()) {
+        manager_->idleCount_++;
+        manager_->monitor_.wait();
         active = isActive();
+        manager_->idleCount_--;
+      }
 
-        while (active && manager_->tasks_.empty()) {
-          manager_->idleCount_++;
-          idle_ = true;
-          manager_->monitor_.wait();
-          active = isActive();
-          idle_ = false;
-          manager_->idleCount_--;
-        }
-
-        if (active) {
-          manager_->removeExpiredTasks();
+      shared_ptr<ThreadManager::Task> task;
 
-          if (!manager_->tasks_.empty()) {
-            task = manager_->tasks_.front();
-            manager_->tasks_.pop();
-            if (task->state_ == ThreadManager::Task::WAITING) {
-              task->state_ = ThreadManager::Task::EXECUTING;
-            }
+      if (active) {
+        if (!manager_->tasks_.empty()) {
+          task = manager_->tasks_.front();
+          manager_->tasks_.pop_front();
+          if (task->state_ == ThreadManager::Task::WAITING) {
+            // If the state is changed to anything other than EXECUTING or TIMEDOUT here
+            // then the execution loop needs to be changed below.
+            task->state_ =
+                (task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ?
+                    ThreadManager::Task::TIMEDOUT :
+                    ThreadManager::Task::EXECUTING;
           }
+        }
 
-          /* If we have a pending task max and we just dropped below it, wakeup any
-             thread that might be blocked on add. */
-          if (manager_->pendingTaskCountMax_ != 0
-              && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
-            manager_->maxMonitor_.notify();
-          }
+        /* If we have a pending task max and we just dropped below it, wakeup any
+            thread that might be blocked on add. */
+        if (manager_->pendingTaskCountMax_ != 0
+            && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
+          manager_->maxMonitor_.notify();
         }
       }
 
+      /**
+       * Execution - not holding a lock
+       */
       if (task) {
         if (task->state_ == ThreadManager::Task::EXECUTING) {
+
+          // Release the lock so we can run the task without blocking the thread manager
+          manager_->mutex_.unlock();
+
           try {
             task->run();
           } catch (const std::exception& e) {
@@ -291,29 +314,31 @@ public:
           } catch (...) {
             GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
           }
+
+          // Re-acquire the lock to proceed in the thread manager
+          manager_->mutex_.lock();
+
+        } else if (manager_->expireCallback_) {
+          // The only other state the task could have been in is TIMEDOUT (see above)
+          manager_->expireCallback_(task->getRunnable());
+          manager_->expiredCount_++;
         }
       }
     }
 
-    {
-      Synchronized s(manager_->workerMonitor_);
-      manager_->deadWorkers_.insert(this->thread());
-      idle_ = true;
-      manager_->workerCount_--;
-      bool notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
-      if (notifyManager) {
-        manager_->workerMonitor_.notify();
-      }
+    /**
+     * Final accounting for the worker thread that is done working
+     */
+    manager_->deadWorkers_.insert(this->thread());
+    if (--manager_->workerCount_ == manager_->workerMaxCount_) {
+      manager_->workerMonitor_.notify();
     }
-
-    return;
   }
 
 private:
   ThreadManager::Impl* manager_;
   friend class ThreadManager::Impl;
   STATE state_;
-  bool idle_;
 };
 
 void ThreadManager::Impl::addWorker(size_t value) {
@@ -324,11 +349,9 @@ void ThreadManager::Impl::addWorker(size_t value) {
     newThreads.insert(threadFactory_->newThread(worker));
   }
 
-  {
-    Synchronized s(monitor_);
-    workerMaxCount_ += value;
-    workers_.insert(newThreads.begin(), newThreads.end());
-  }
+  Guard g(mutex_);
+  workerMaxCount_ += value;
+  workers_.insert(newThreads.begin(), newThreads.end());
 
   for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end();
        ++ix) {
@@ -339,103 +362,92 @@ void ThreadManager::Impl::addWorker(size_t value) {
     idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
   }
 
-  {
-    Synchronized s(workerMonitor_);
-    while (workerCount_ != workerMaxCount_) {
-      workerMonitor_.wait();
-    }
+  while (workerCount_ != workerMaxCount_) {
+    workerMonitor_.wait();
   }
 }
 
 void ThreadManager::Impl::start() {
-
+  Guard g(mutex_);
   if (state_ == ThreadManager::STOPPED) {
     return;
   }
 
-  {
-    Synchronized s(monitor_);
-    if (state_ == ThreadManager::UNINITIALIZED) {
-      if (!threadFactory_) {
-        throw InvalidArgumentException();
-      }
-      state_ = ThreadManager::STARTED;
-      monitor_.notifyAll();
+  if (state_ == ThreadManager::UNINITIALIZED) {
+    if (!threadFactory_) {
+      throw InvalidArgumentException();
     }
+    state_ = ThreadManager::STARTED;
+    monitor_.notifyAll();
+  }
 
-    while (state_ == STARTING) {
-      monitor_.wait();
-    }
+  while (state_ == STARTING) {
+    monitor_.wait();
   }
 }
 
-void ThreadManager::Impl::stopImpl(bool join) {
+void ThreadManager::Impl::stop() {
+  Guard g(mutex_);
   bool doStop = false;
-  if (state_ == ThreadManager::STOPPED) {
-    return;
-  }
 
-  {
-    Synchronized s(monitor_);
-    if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
-        && state_ != ThreadManager::STOPPED) {
-      doStop = true;
-      state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
-    }
+  if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
+      && state_ != ThreadManager::STOPPED) {
+    doStop = true;
+    state_ = ThreadManager::JOINING;
   }
 
   if (doStop) {
-    removeWorker(workerCount_);
+    removeWorkersUnderLock(workerCount_);
   }
 
-  // XXX
-  // should be able to block here for transition to STOPPED since we're no
-  // using shared_ptrs
-
-  {
-    Synchronized s(monitor_);
-    state_ = ThreadManager::STOPPED;
-  }
+  state_ = ThreadManager::STOPPED;
 }
 
 void ThreadManager::Impl::removeWorker(size_t value) {
-  std::set<shared_ptr<Thread> > removedThreads;
-  {
-    Synchronized s(monitor_);
-    if (value > workerMaxCount_) {
-      throw InvalidArgumentException();
-    }
+  Guard g(mutex_);
+  removeWorkersUnderLock(value);
+}
+
+void ThreadManager::Impl::removeWorkersUnderLock(size_t value) {
+  if (value > workerMaxCount_) {
+    throw InvalidArgumentException();
+  }
 
-    workerMaxCount_ -= value;
+  workerMaxCount_ -= value;
 
-    if (idleCount_ < value) {
-      for (size_t ix = 0; ix < idleCount_; ix++) {
-        monitor_.notify();
-      }
-    } else {
-      monitor_.notifyAll();
+  if (idleCount_ > value) {
+    // There are more idle workers than we need to remove,
+    // so notify enough of them so they can terminate.
+    for (size_t ix = 0; ix < value; ix++) {
+      monitor_.notify();
     }
+  } else {
+    // There are as many or less idle workers than we need to remove,
+    // so just notify them all so they can terminate.
+    monitor_.notifyAll();
   }
 
-  {
-    Synchronized s(workerMonitor_);
+  while (workerCount_ != workerMaxCount_) {
+    workerMonitor_.wait();
+  }
 
-    while (workerCount_ != workerMaxCount_) {
-      workerMonitor_.wait();
-    }
+  for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin();
+       ix != deadWorkers_.end();
+       ++ix) {
 
-    for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin();
-         ix != deadWorkers_.end();
-         ++ix) {
-      idMap_.erase((*ix)->getId());
-      workers_.erase(*ix);
+    // when used with a joinable thread factory, we join the threads as we remove them
+    if (!threadFactory_->isDetached()) {
+      (*ix)->join();
     }
 
-    deadWorkers_.clear();
+    idMap_.erase((*ix)->getId());
+    workers_.erase(*ix);
   }
+
+  deadWorkers_.clear();
 }
 
-bool ThreadManager::Impl::canSleep() {
+bool ThreadManager::Impl::canSleep() const {
   const Thread::id_t id = threadFactory_->getCurrentThreadId();
   return idMap_.find(id) == idMap_.end();
 }
@@ -453,7 +465,11 @@ void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64
         "not started");
   }
 
-  removeExpiredTasks();
+  // if we're at a limit, remove an expired task to see if the limit clears
+  if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
+    removeExpired(true);
+  }
+
   if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
     if (canSleep() && timeout >= 0) {
       while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
@@ -465,7 +481,7 @@ void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64
     }
   }
 
-  tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
+  tasks_.push_back(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
 
   // If idle thread is available notify it, otherwise all worker threads are
   // running and will get around to this task in time.
@@ -475,13 +491,21 @@ void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64
 }
 
 void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
-  (void)task;
-  Synchronized s(monitor_);
+  Guard g(mutex_);
   if (state_ != ThreadManager::STARTED) {
     throw IllegalStateException(
         "ThreadManager::Impl::remove ThreadManager not "
         "started");
   }
+
+  for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); ++it)
+  {
+    if ((*it)->getRunnable() == task)
+    {
+      tasks_.erase(it);
+      return;
+    }
+  }
 }
 
 boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
@@ -497,35 +521,40 @@ boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
   }
 
   shared_ptr<ThreadManager::Task> task = tasks_.front();
-  tasks_.pop();
+  tasks_.pop_front();
 
   return task->getRunnable();
 }
 
-void ThreadManager::Impl::removeExpiredTasks() {
-  int64_t now = 0LL; // we won't ask for the time untile we need it
+void ThreadManager::Impl::removeExpired(bool justOne) {
+  // this is always called under a lock
+  int64_t now = 0LL;
 
-  // note that this loop breaks at the first non-expiring task
-  while (!tasks_.empty()) {
-    shared_ptr<ThreadManager::Task> task = tasks_.front();
-    if (task->getExpireTime() == 0LL) {
-      break;
-    }
+  for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); )
+  {
     if (now == 0LL) {
       now = Util::currentTime();
     }
-    if (task->getExpireTime() > now) {
-      break;
+
+    if ((*it)->getExpireTime() > 0LL && (*it)->getExpireTime() < now) {
+      if (expireCallback_) {
+        expireCallback_((*it)->getRunnable());
+      }
+      it = tasks_.erase(it);
+      ++expiredCount_;
+      if (justOne) {
+        return;
+      }
     }
-    if (expireCallback_) {
-      expireCallback_(task->getRunnable());
+    else
+    {
+      ++it;
     }
-    tasks_.pop();
-    expiredCount_++;
   }
 }
 
 void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
+  Guard g(mutex_);
   expireCallback_ = expireCallback;
 }
 
@@ -544,7 +573,6 @@ public:
 private:
   const size_t workerCount_;
   const size_t pendingTaskCountMax_;
-  Monitor monitor_;
 };
 
 shared_ptr<ThreadManager> ThreadManager::newThreadManager() {

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/concurrency/ThreadManager.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.h b/lib/cpp/src/thrift/concurrency/ThreadManager.h
index 2112845..d8bf71b 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.h
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.h
@@ -71,30 +71,45 @@ public:
 
   /**
    * Stops the thread manager. Aborts all remaining unprocessed task, shuts
-   * down all created worker threads, and realeases all allocated resources.
+   * down all created worker threads, and releases all allocated resources.
    * This method blocks for all worker threads to complete, thus it can
    * potentially block forever if a worker thread is running a task that
    * won't terminate.
+   *
+   * Worker threads will be joined depending on the threadFactory's detached
+   * disposition.
    */
   virtual void stop() = 0;
 
-  /**
-   * Joins the thread manager. This is the same as stop, except that it will
-   * block until all the workers have finished their work. At that point
-   * the ThreadManager will transition into the STOPPED state.
-   */
-  virtual void join() = 0;
-
   enum STATE { UNINITIALIZED, STARTING, STARTED, JOINING, STOPPING, STOPPED };
 
   virtual STATE state() const = 0;
 
+  /**
+   * \returns the current thread factory
+   */
   virtual boost::shared_ptr<ThreadFactory> threadFactory() const = 0;
 
+  /**
+   * Set the thread factory.
+   * \throws InvalidArgumentException if the new thread factory has a different
+   *                                  detached disposition than the one replacing it
+   */
   virtual void threadFactory(boost::shared_ptr<ThreadFactory> value) = 0;
 
+  /**
+   * Adds worker thread(s).
+   */
   virtual void addWorker(size_t value = 1) = 0;
 
+  /**
+   * Removes worker thread(s).
+   * Threads are joined if the thread factory detached disposition allows it.
+   * Blocks until the number of worker threads reaches the new limit.
+   * \param[in]  value  the number to remove
+   * \throws InvalidArgumentException if the value is greater than the number
+   *                                  of workers
+   */
   virtual void removeWorker(size_t value = 1) = 0;
 
   /**
@@ -123,7 +138,8 @@ public:
   virtual size_t pendingTaskCountMax() const = 0;
 
   /**
-   * Gets the number of tasks which have been expired without being run.
+   * Gets the number of tasks which have been expired without being run
+   * since start() was called.
    */
   virtual size_t expiredTaskCount() = 0;
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/concurrency/Util.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Util.h b/lib/cpp/src/thrift/concurrency/Util.h
index ba070b6..1a91599 100644
--- a/lib/cpp/src/thrift/concurrency/Util.h
+++ b/lib/cpp/src/thrift/concurrency/Util.h
@@ -98,7 +98,7 @@ public:
    * Converts struct timeval to arbitrary-sized ticks since epoch
    */
   static void toTicks(int64_t& result, const struct timeval& value, int64_t ticksPerSec) {
-    return toTicks(result, value.tv_sec, value.tv_usec, US_PER_S, ticksPerSec);
+    return toTicks(result, (unsigned long)value.tv_sec, (unsigned long)value.tv_usec, US_PER_S, ticksPerSec);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index b81a522..63af85c 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -96,7 +96,7 @@ TThreadPoolServer::~TThreadPoolServer() {
 
 void TThreadPoolServer::serve() {
   TServerFramework::serve();
-  threadManager_->join();
+  threadManager_->stop();
 }
 
 int64_t TThreadPoolServer::getTimeout() const {

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/server/TThreadedServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index e15f8f1..9de1db8 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -95,7 +95,6 @@ TThreadedServer::~TThreadedServer() {
 }
 
 void TThreadedServer::serve() {
-  threadFactory_->setDetached(false);
   TServerFramework::serve();
 
   // Ensure post-condition of no active clients
@@ -126,7 +125,7 @@ void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pCli
 
 void TThreadedServer::onClientDisconnected(TConnectedClient* pClient) {
   Synchronized sync(clientMonitor_);
-  drainDeadClients();	// use the outgoing thread to do some maintenance on our dead client backlog
+  drainDeadClients(); // use the outgoing thread to do some maintenance on our dead client backlog
   ClientMap::iterator it = activeClientMap_.find(pClient);
   ClientMap::iterator end = it;
   deadClientMap_.insert(it, ++end);
@@ -153,7 +152,7 @@ void TThreadedServer::TConnectedClientRunner::run() /* override */ {
 }
 
 void TThreadedServer::TConnectedClientRunner::setThread(
-		const boost::shared_ptr<apache::thrift::concurrency::Thread>& pThread) {
+    const boost::shared_ptr<apache::thrift::concurrency::Thread>& pThread) {
   pThread_ = pThread;
 }
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/src/thrift/server/TThreadedServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index 0f2cce9..758d1d9 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -34,8 +34,6 @@ namespace server {
  * Manage clients using threads - threads are created one for each client and are
  * released when the client disconnects.  This server is used to make a dynamically
  * scalable server up to the concurrent connection limit.
- *
- * The thread factory will be changed to a non-detached type.
  */
 class TThreadedServer : public TServerFramework {
 public:
@@ -46,7 +44,7 @@ public:
       const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
       const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
       = boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
-          new apache::thrift::concurrency::PlatformThreadFactory));
+          new apache::thrift::concurrency::PlatformThreadFactory(false)));
 
   TThreadedServer(
       const boost::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -55,7 +53,7 @@ public:
       const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
       const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
       = boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
-          new apache::thrift::concurrency::PlatformThreadFactory));
+          new apache::thrift::concurrency::PlatformThreadFactory(false)));
 
   TThreadedServer(
       const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
@@ -66,7 +64,7 @@ public:
       const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
       const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
       = boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
-          new apache::thrift::concurrency::PlatformThreadFactory));
+          new apache::thrift::concurrency::PlatformThreadFactory(false)));
 
   TThreadedServer(
       const boost::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -77,7 +75,7 @@ public:
       const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
       const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
       = boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
-          new apache::thrift::concurrency::PlatformThreadFactory));
+          new apache::thrift::concurrency::PlatformThreadFactory(false)));
 
   virtual ~TThreadedServer();
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/test/TPipeInterruptTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/TPipeInterruptTest.cpp b/lib/cpp/test/TPipeInterruptTest.cpp
index e2f2489..80e4c1f 100644
--- a/lib/cpp/test/TPipeInterruptTest.cpp
+++ b/lib/cpp/test/TPipeInterruptTest.cpp
@@ -63,7 +63,7 @@ static void interruptWorker(TPipeServer *pipe) {
 }
 
 BOOST_AUTO_TEST_CASE(stress_pipe_accept_interruption) {
-  int interruptIters = 100;
+  int interruptIters = 10;
 
   for (int i = 0; i < interruptIters; ++i)
   {

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/test/TServerIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
index 2180448..fd7bae2 100644
--- a/lib/cpp/test/TServerIntegrationTest.cpp
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -156,9 +156,9 @@ public:
                               boost::shared_ptr<TTransportFactory>(new TTransportFactory),
                               boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
       pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
-	  bStressDone(false),
-	  bStressConnectionCount(0),
-	  bStressRequestCount(0) {
+    bStressDone(false),
+    bStressConnectionCount(0),
+    bStressRequestCount(0) {
     pServer->setServerEventHandler(pEventHandler);
   }
 
@@ -170,8 +170,8 @@ public:
                           boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
       pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
       bStressDone(false),
-	  bStressConnectionCount(0),
-	  bStressRequestCount(0) {
+    bStressConnectionCount(0),
+    bStressRequestCount(0) {
     pServer->setServerEventHandler(pEventHandler);
   }
 
@@ -217,10 +217,10 @@ public:
    * \param[in]  purpose  a description of the test for logging purposes
    */
   void baseline(int64_t numToMake, int64_t expectedHWM, const std::string& purpose) {
-	BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM")
-						% typeid(TServerType).name() % purpose % numToMake % expectedHWM);
+    BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM")
+            % typeid(TServerType).name() % purpose % numToMake % expectedHWM);
 
-	startServer();
+    startServer();
 
     std::vector<boost::shared_ptr<TSocket> > holdSockets;
     std::vector<boost::shared_ptr<boost::thread> > holdThreads;
@@ -303,14 +303,14 @@ public:
    * Helper method to stress the system
    */
   void stressor() {
-	while (!bStressDone) {
+  while (!bStressDone) {
       boost::shared_ptr<TSocket> pSocket(new TSocket("localhost", getServerPort()), autoSocketCloser);
       boost::shared_ptr<TProtocol> pProtocol(new TBinaryProtocol(pSocket));
       ParentServiceClient client(pProtocol);
       pSocket->open();
       bStressConnectionCount.fetch_add(1, boost::memory_order_relaxed);
       for (int i = 0; i < rand() % 1000; ++i) {
-    	client.incrementGeneration();
+      client.incrementGeneration();
         bStressRequestCount.fetch_add(1, boost::memory_order_relaxed);
       }
     }
@@ -459,7 +459,7 @@ BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) {
 BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) {
   // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
   // disconnect.
-	  BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients");
+    BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients");
 
   boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())
       ->setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/test/concurrency/Tests.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/concurrency/Tests.cpp b/lib/cpp/test/concurrency/Tests.cpp
index 0d81d7e..33af392 100644
--- a/lib/cpp/test/concurrency/Tests.cpp
+++ b/lib/cpp/test/concurrency/Tests.cpp
@@ -45,25 +45,38 @@ int main(int argc, char** argv) {
 
     std::cout << "ThreadFactory tests..." << std::endl;
 
-    size_t count = 1000;
-    size_t floodLoops = 1;
-    size_t floodCount = 100000;
+    int reapLoops = 20;
+    int reapCount = 1000;
+    size_t floodLoops = 3;
+    size_t floodCount = 20000;
 
-    std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
+    std::cout << "\t\tThreadFactory reap N threads test: N = " << reapLoops << "x" << reapCount << std::endl;
 
-    assert(threadFactoryTests.reapNThreads(count));
+    if (!threadFactoryTests.reapNThreads(reapLoops, reapCount)) {
+      std::cerr << "\t\ttThreadFactory reap N threads FAILED" << std::endl;
+      return 1;
+    }
 
-    std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl;
+    std::cout << "\t\tThreadFactory flood N threads test: N = " << floodLoops << "x" << floodCount << std::endl;
 
-    assert(threadFactoryTests.floodNTest(floodLoops, floodCount));
+    if (!threadFactoryTests.floodNTest(floodLoops, floodCount)) {
+      std::cerr << "\t\ttThreadFactory flood N threads FAILED" << std::endl;
+      return 1;
+    }
 
     std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
 
-    assert(threadFactoryTests.synchStartTest());
+    if (!threadFactoryTests.synchStartTest()) {
+      std::cerr << "\t\ttThreadFactory synchronous start FAILED" << std::endl;
+      return 1;
+    }
 
     std::cout << "\t\tThreadFactory monitor timeout test" << std::endl;
 
-    assert(threadFactoryTests.monitorTimeoutTest());
+    if (!threadFactoryTests.monitorTimeoutTest()) {
+      std::cerr << "\t\ttThreadFactory monitor timeout FAILED" << std::endl;
+      return 1;
+    }
   }
 
   if (runAll || args[0].compare("util") == 0) {
@@ -97,7 +110,10 @@ int main(int argc, char** argv) {
 
     TimerManagerTests timerManagerTests;
 
-    assert(timerManagerTests.test00());
+    if (!timerManagerTests.test00()) {
+      std::cerr << "\t\tTimerManager tests FAILED" << std::endl;
+      return 1;
+    }
   }
 
   if (runAll || args[0].compare("thread-manager") == 0) {
@@ -105,24 +121,34 @@ int main(int argc, char** argv) {
     std::cout << "ThreadManager tests..." << std::endl;
 
     {
-
       size_t workerCount = 100;
+      size_t taskCount = 50000;
+      int64_t delay = 10LL;
 
-      size_t taskCount = 100000;
+      ThreadManagerTests threadManagerTests;
 
-      int64_t delay = 10LL;
+      std::cout << "\t\tThreadManager api test:" << std::endl;
+
+      if (!threadManagerTests.apiTest()) {
+        std::cerr << "\t\tThreadManager apiTest FAILED" << std::endl;
+        return 1;
+      }
 
       std::cout << "\t\tThreadManager load test: worker count: " << workerCount
                 << " task count: " << taskCount << " delay: " << delay << std::endl;
 
-      ThreadManagerTests threadManagerTests;
-
-      assert(threadManagerTests.loadTest(taskCount, delay, workerCount));
+      if (!threadManagerTests.loadTest(taskCount, delay, workerCount)) {
+        std::cerr << "\t\tThreadManager loadTest FAILED" << std::endl;
+        return 1;
+      }
 
       std::cout << "\t\tThreadManager block test: worker count: " << workerCount
                 << " delay: " << delay << std::endl;
 
-      assert(threadManagerTests.blockTest(delay, workerCount));
+      if (!threadManagerTests.blockTest(delay, workerCount)) {
+        std::cerr << "\t\tThreadManager blockTest FAILED" << std::endl;
+        return 1;
+      }
     }
   }
 
@@ -134,13 +160,13 @@ int main(int argc, char** argv) {
 
       size_t minWorkerCount = 2;
 
-      size_t maxWorkerCount = 512;
+      size_t maxWorkerCount = 64;
 
       size_t tasksPerWorker = 1000;
 
-      int64_t delay = 10LL;
+      int64_t delay = 5LL;
 
-      for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount *= 2) {
+      for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount *= 4) {
 
         size_t taskCount = workerCount * tasksPerWorker;
 
@@ -149,8 +175,15 @@ int main(int argc, char** argv) {
 
         ThreadManagerTests threadManagerTests;
 
-        threadManagerTests.loadTest(taskCount, delay, workerCount);
+        if (!threadManagerTests.loadTest(taskCount, delay, workerCount))
+        {
+          std::cerr << "\t\tThreadManager loadTest FAILED" << std::endl;
+          return 1;
+        }
       }
     }
   }
+
+  std::cout << "ALL TESTS PASSED" << std::endl;
+  return 0;
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/test/concurrency/ThreadFactoryTests.h
----------------------------------------------------------------------
diff --git a/lib/cpp/test/concurrency/ThreadFactoryTests.h b/lib/cpp/test/concurrency/ThreadFactoryTests.h
index 3ad14ca..4fc688c 100644
--- a/lib/cpp/test/concurrency/ThreadFactoryTests.h
+++ b/lib/cpp/test/concurrency/ThreadFactoryTests.h
@@ -43,36 +43,6 @@ using namespace apache::thrift::concurrency;
 class ThreadFactoryTests {
 
 public:
-  static const double TEST_TOLERANCE;
-
-  class Task : public Runnable {
-
-  public:
-    Task() {}
-
-    void run() { std::cout << "\t\t\tHello World" << std::endl; }
-  };
-
-  /**
-   * Hello world test
-   */
-  bool helloWorldTest() {
-
-    PlatformThreadFactory threadFactory = PlatformThreadFactory();
-
-    shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
-
-    shared_ptr<Thread> thread = threadFactory.newThread(task);
-
-    thread->start();
-
-    thread->join();
-
-    std::cout << "\t\t\tSuccess!" << std::endl;
-
-    return true;
-  }
-
   /**
    * Reap N threads
    */
@@ -244,15 +214,22 @@ public:
     return true;
   }
 
-  /** See how accurate monitor timeout is. */
+  /**
+   * The only guarantee a monitor timeout can give you is that
+   * it will take "at least" as long as the timeout, no less.
+   * There is absolutely no guarantee around regaining execution
+   * near the timeout.  On a busy system (like inside a third party
+   * CI environment) it could take quite a bit longer than the
+   * requested timeout, and that's ok.
+   */
 
-  bool monitorTimeoutTest(size_t count = 1000, int64_t timeout = 10) {
+  bool monitorTimeoutTest(int64_t count = 1000, int64_t timeout = 2) {
 
     Monitor monitor;
 
     int64_t startTime = Util::currentTime();
 
-    for (size_t ix = 0; ix < count; ix++) {
+    for (int64_t ix = 0; ix < count; ix++) {
       {
         Synchronized s(monitor);
         try {
@@ -264,18 +241,11 @@ public:
 
     int64_t endTime = Util::currentTime();
 
-    double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout);
-
-    if (error < 0.0) {
-
-      error *= 1.0;
-    }
-
-    bool success = error < ThreadFactoryTests::TEST_TOLERANCE;
+  bool success = (endTime - startTime) >= (count * timeout);
 
     std::cout << "\t\t\t" << (success ? "Success" : "Failure")
-              << "! expected time: " << count * timeout
-              << "ms elapsed time: " << endTime - startTime << "ms error%: " << error * 100.0
+              << ": minimum required time to elapse " << count * timeout
+              << "ms; actual elapsed time " << endTime - startTime << "ms"
               << std::endl;
 
     return success;
@@ -285,17 +255,15 @@ public:
   public:
     FloodTask(const size_t id) : _id(id) {}
     ~FloodTask() {
-      if (_id % 1000 == 0) {
+      if (_id % 10000 == 0) {
         std::cout << "\t\tthread " << _id << " done" << std::endl;
       }
     }
 
     void run() {
-      if (_id % 1000 == 0) {
+      if (_id % 10000 == 0) {
         std::cout << "\t\tthread " << _id << " started" << std::endl;
       }
-
-      THRIFT_SLEEP_USEC(1);
     }
     const size_t _id;
   };
@@ -321,8 +289,6 @@ public:
 
           thread->start();
 
-          THRIFT_SLEEP_USEC(1);
-
         } catch (TException& e) {
 
           std::cout << "\t\t\tfailed to start  " << lix* count + tix << " thread " << e.what()
@@ -341,7 +307,6 @@ public:
   }
 };
 
-const double ThreadFactoryTests::TEST_TOLERANCE = .20;
 }
 }
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/test/concurrency/ThreadManagerTests.h
----------------------------------------------------------------------
diff --git a/lib/cpp/test/concurrency/ThreadManagerTests.h b/lib/cpp/test/concurrency/ThreadManagerTests.h
index b196813..b5925ac 100644
--- a/lib/cpp/test/concurrency/ThreadManagerTests.h
+++ b/lib/cpp/test/concurrency/ThreadManagerTests.h
@@ -24,9 +24,9 @@
 #include <thrift/concurrency/Util.h>
 
 #include <assert.h>
+#include <deque>
 #include <set>
 #include <iostream>
-#include <set>
 #include <stdint.h>
 
 namespace apache {
@@ -36,9 +36,26 @@ namespace test {
 
 using namespace apache::thrift::concurrency;
 
-class ThreadManagerTests {
+static std::deque<boost::shared_ptr<Runnable> > m_expired;
+static void expiredNotifier(boost::shared_ptr<Runnable> runnable)
+{
+  m_expired.push_back(runnable);
+}
 
-  static const double TEST_TOLERANCE;
+static void sleep_(int64_t millisec) {
+  Monitor _sleep;
+  Synchronized s(_sleep);
+
+  try {
+    _sleep.wait(millisec);
+  } catch (TimedOutException&) {
+    ;
+  } catch (...) {
+    assert(0);
+  }
+}
+
+class ThreadManagerTests {
 
 public:
   class Task : public Runnable {
@@ -51,17 +68,7 @@ public:
 
       _startTime = Util::currentTime();
 
-      {
-        Synchronized s(_sleep);
-
-        try {
-          _sleep.wait(_timeout);
-        } catch (TimedOutException&) {
-          ;
-        } catch (...) {
-          assert(0);
-        }
-      }
+      sleep_(_timeout);
 
       _endTime = Util::currentTime();
 
@@ -73,9 +80,7 @@ public:
         // std::cout << "Thread " << _count << " completed " << std::endl;
 
         _count--;
-
-        if (_count == 0) {
-
+        if (_count % 10000 == 0) {
           _monitor.notify();
         }
       }
@@ -130,11 +135,13 @@ public:
       threadManager->add(*ix);
     }
 
+    std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl;
+
     {
       Synchronized s(monitor);
 
       while (activeCount > 0) {
-
+        std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl;
         monitor.wait();
       }
     }
@@ -179,23 +186,15 @@ public:
 
     averageTime /= count;
 
-    std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime
-              << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
+    std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime
+              << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
               << "ms" << std::endl;
 
-    double expectedTime = (double(count + (workerCount - 1)) / workerCount) * timeout;
-
-    double error = ((time01 - time00) - expectedTime) / expectedTime;
-
-    if (error < 0) {
-      error *= -1.0;
-    }
-
-    bool success = error < TEST_TOLERANCE;
+    bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount;
 
     std::cout << "\t\t\t" << (success ? "Success" : "Failure")
-              << "! expected time: " << expectedTime << "ms elapsed time: " << time01 - time00
-              << "ms error%: " << error * 100.0 << std::endl;
+              << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00
+              << "ms" << std::endl;
 
     return success;
   }
@@ -203,30 +202,36 @@ public:
   class BlockTask : public Runnable {
 
   public:
-    BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count)
-      : _monitor(monitor), _bmonitor(bmonitor), _count(count) {}
+    BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count)
+      : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {}
 
     void run() {
       {
-        Synchronized s(_bmonitor);
-
-        _bmonitor.wait();
+        Synchronized s(_entryMonitor);
+        _entered = true;
+        _entryMonitor.notify();
       }
 
       {
-        Synchronized s(_monitor);
-
-        _count--;
-
-        if (_count == 0) {
+        Synchronized s(_blockMonitor);
+        while (_blocked) {
+          _blockMonitor.wait();
+        }
+      }
 
-          _monitor.notify();
+      {
+        Synchronized s(_doneMonitor);
+        if (--_count == 0) {
+          _doneMonitor.notify();
         }
       }
     }
 
-    Monitor& _monitor;
-    Monitor& _bmonitor;
+    Monitor& _entryMonitor;
+    bool _entered;
+    Monitor& _blockMonitor;
+    bool& _blocked;
+    Monitor& _doneMonitor;
     size_t& _count;
   };
 
@@ -240,8 +245,10 @@ public:
 
     try {
 
-      Monitor bmonitor;
-      Monitor monitor;
+      Monitor entryMonitor;   // not used by this test
+      Monitor blockMonitor;
+      bool blocked[] = {true, true, true};
+      Monitor doneMonitor;
 
       size_t pendingTaskMaxCount = workerCount;
 
@@ -260,21 +267,22 @@ public:
 
       threadManager->start();
 
-      std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+      std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+      tasks.reserve(workerCount + pendingTaskMaxCount);
 
       for (size_t ix = 0; ix < workerCount; ix++) {
 
-        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(
-            new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[0])));
+        tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
+            new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0])));
       }
 
       for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
 
-        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(
-            new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[1])));
+        tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
+            new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1])));
       }
 
-      for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin();
+      for (std::vector<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin();
            ix != tasks.end();
            ix++) {
         threadManager->add(*ix);
@@ -285,7 +293,7 @@ public:
       }
 
       shared_ptr<ThreadManagerTests::BlockTask> extraTask(
-          new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
+          new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2]));
 
       try {
         threadManager->add(extraTask, 1);
@@ -309,16 +317,15 @@ public:
                 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
 
       {
-        Synchronized s(bmonitor);
-
-        bmonitor.notifyAll();
+        Synchronized s(blockMonitor);
+        blocked[0] = false;
+        blockMonitor.notifyAll();
       }
 
       {
-        Synchronized s(monitor);
-
+        Synchronized s(doneMonitor);
         while (activeCounts[0] != 0) {
-          monitor.wait();
+          doneMonitor.wait();
         }
       }
 
@@ -341,37 +348,37 @@ public:
       // Wake up tasks that were pending before and wait for them to complete
 
       {
-        Synchronized s(bmonitor);
-
-        bmonitor.notifyAll();
+        Synchronized s(blockMonitor);
+        blocked[1] = false;
+        blockMonitor.notifyAll();
       }
 
       {
-        Synchronized s(monitor);
-
+        Synchronized s(doneMonitor);
         while (activeCounts[1] != 0) {
-          monitor.wait();
+          doneMonitor.wait();
         }
       }
 
       // Wake up the extra task and wait for it to complete
 
       {
-        Synchronized s(bmonitor);
-
-        bmonitor.notifyAll();
+        Synchronized s(blockMonitor);
+        blocked[2] = false;
+        blockMonitor.notifyAll();
       }
 
       {
-        Synchronized s(monitor);
-
+        Synchronized s(doneMonitor);
         while (activeCounts[2] != 0) {
-          monitor.wait();
+          doneMonitor.wait();
         }
       }
 
+      threadManager->stop();
+
       if (!(success = (threadManager->totalTaskCount() == 0))) {
-        throw TException("Unexpected pending task count");
+        throw TException("Unexpected total task count");
       }
 
     } catch (TException& e) {
@@ -381,9 +388,295 @@ public:
     std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
     return success;
   }
+
+
+  bool apiTest() {
+
+    // prove currentTime has milliseconds granularity since many other things depend on it
+    int64_t a = Util::currentTime();
+    sleep_(100);
+    int64_t b = Util::currentTime();
+    if (b - a < 50 || b - a > 150) {
+      std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
+      return false;
+    }
+
+#if !USE_BOOST_THREAD && !USE_STD_THREAD
+    // test once with a detached thread factory and once with a joinable thread factory
+
+    shared_ptr<PosixThreadFactory> threadFactory
+        = shared_ptr<PosixThreadFactory>(new PosixThreadFactory(false));
+
+    std::cout << "\t\t\tapiTest with joinable thread factory" << std::endl;
+    if (!apiTestWithThreadFactory(threadFactory)) {
+      return false;
+    }
+
+    threadFactory.reset(new PosixThreadFactory(true));
+    std::cout << "\t\t\tapiTest with detached thread factory" << std::endl;
+    return apiTestWithThreadFactory(threadFactory);
+#else
+    return apiTestWithThreadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
+#endif
+
+  }
+
+  bool apiTestWithThreadFactory(shared_ptr<PlatformThreadFactory> threadFactory)
+  {
+    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1);
+    threadManager->threadFactory(threadFactory);
+
+#if !USE_BOOST_THREAD && !USE_STD_THREAD
+    threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+
+    // verify we cannot change the thread factory to one with the opposite detached setting
+    shared_ptr<PlatformThreadFactory> threadFactory2
+        = shared_ptr<PosixThreadFactory>(new PlatformThreadFactory(
+          PosixThreadFactory::ROUND_ROBIN,
+          PosixThreadFactory::NORMAL,
+          1,
+          !threadFactory->isDetached()));
+    try {
+      threadManager->threadFactory(threadFactory2);
+      // if the call succeeded we changed the thread factory to one that had the opposite setting for "isDetached()".
+      // this is bad, because the thread manager checks with the thread factory to see if it should join threads
+      // as they are leaving - so the detached status of new threads cannot change while there are existing threads.
+      std::cerr << "\t\t\tShould not be able to change thread factory detached disposition" << std::endl;
+      return false;
+    }
+    catch (InvalidArgumentException& ex) {
+      /* expected */
+    }
+#endif
+
+    std::cout << "\t\t\t\tstarting.. " << std::endl;
+
+    threadManager->start();
+    threadManager->setExpireCallback(expiredNotifier); // apache::thrift::stdcxx::bind(&ThreadManagerTests::expiredNotifier, this));
+
+#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } }
+
+    EXPECT(threadManager->workerCount(), 1);
+    EXPECT(threadManager->idleWorkerCount(), 1);
+    EXPECT(threadManager->pendingTaskCount(), 0);
+
+    std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl;
+
+    threadManager->addWorker();
+
+    EXPECT(threadManager->workerCount(), 2);
+    EXPECT(threadManager->idleWorkerCount(), 2);
+    EXPECT(threadManager->pendingTaskCount(), 0);
+
+    std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl;
+
+    threadManager->removeWorker();
+
+    EXPECT(threadManager->workerCount(), 1);
+    EXPECT(threadManager->idleWorkerCount(), 1);
+    EXPECT(threadManager->pendingTaskCount(), 0);
+
+    std::cout << "\t\t\t\tremove 1st worker.. " << std::endl;
+
+    threadManager->removeWorker();
+
+    EXPECT(threadManager->workerCount(), 0);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 0);
+
+    std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
+
+    // We're going to throw a blocking task into the mix
+    Monitor entryMonitor;   // signaled when task is running
+    Monitor blockMonitor;   // to be signaled to unblock the task
+    bool blocked(true);     // set to false before notifying
+    Monitor doneMonitor;    // signaled when count reaches zero
+    size_t activeCount = 1;
+    shared_ptr<ThreadManagerTests::BlockTask> blockingTask(
+      new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount));
+    threadManager->add(blockingTask);
+
+    EXPECT(threadManager->workerCount(), 0);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 1);
+
+    std::cout << "\t\t\t\tadd other task.. " << std::endl;
+
+    shared_ptr<ThreadManagerTests::Task> otherTask(
+      new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
+
+    threadManager->add(otherTask);
+
+    EXPECT(threadManager->workerCount(), 0);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 2);
+
+    std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl;
+
+    threadManager->remove(blockingTask);
+
+    EXPECT(threadManager->workerCount(), 0);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 1);
+
+    std::cout << "\t\t\t\tremove next pending task.." << std::endl;
+
+    shared_ptr<Runnable> nextTask = threadManager->removeNextPending();
+    if (nextTask != otherTask) {
+      std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl;
+      return false;
+    }
+
+    EXPECT(threadManager->workerCount(), 0);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 0);
+
+    std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl;
+
+    nextTask = threadManager->removeNextPending();
+    if (nextTask) {
+      std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl;
+      return false;
+    }
+
+    std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl;
+
+    shared_ptr<ThreadManagerTests::Task> expiredTask(
+      new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
+
+    threadManager->add(expiredTask, 0, 1);
+    threadManager->add(blockingTask);       // add one that hasn't expired to make sure it gets skipped
+    threadManager->add(expiredTask, 0, 1);  // add a second expired to ensure removeExpiredTasks removes both
+
+    sleep_(50);  // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond
+
+    EXPECT(threadManager->workerCount(), 0);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 3);
+    EXPECT(threadManager->expiredTaskCount(), 0);
+
+    std::cout << "\t\t\t\tremove expired tasks.." << std::endl;
+
+    if (!m_expired.empty()) {
+      std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl;
+      return false;
+    }
+
+    threadManager->removeExpiredTasks();
+
+    if (m_expired.size() != 2) {
+      std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
+      return false;
+    }
+
+    if (m_expired.front() != expiredTask) {
+      std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl;
+      return false;
+    }
+    m_expired.pop_front();
+
+    if (m_expired.front() != expiredTask) {
+      std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl;
+      return false;
+    }
+
+    m_expired.clear();
+
+    threadManager->remove(blockingTask);
+
+    EXPECT(threadManager->workerCount(), 0);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 0);
+    EXPECT(threadManager->expiredTaskCount(), 2);
+
+    std::cout << "\t\t\t\tadd expired task (again).." << std::endl;
+
+    threadManager->add(expiredTask, 0, 1);  // expires in 1ms
+    sleep_(50);  // make sure enough time elapses for it to expire - the shortest expiration time is 1ms
+
+    std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl;
+
+    threadManager->addWorker();
+    sleep_(100);  // make sure it has time to spin up and expire the task
+
+    if (m_expired.empty()) {
+      std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
+      return false;
+    }
+
+    if (m_expired.front() != expiredTask) {
+      std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl;
+      return false;
+    }
+
+    m_expired.clear();
+
+    EXPECT(threadManager->workerCount(), 1);
+    EXPECT(threadManager->idleWorkerCount(), 1);
+    EXPECT(threadManager->pendingTaskCount(), 0);
+    EXPECT(threadManager->expiredTaskCount(), 3);
+
+    std::cout << "\t\t\t\ttry to remove too many workers" << std::endl;
+    try {
+      threadManager->removeWorker(2);
+      std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl;
+      return false;
+    } catch (const InvalidArgumentException&) {
+      /* expected */
+    }
+
+    std::cout << "\t\t\t\tremove worker.. " << std::endl;
+
+    threadManager->removeWorker();
+
+    EXPECT(threadManager->workerCount(), 0);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 0);
+    EXPECT(threadManager->expiredTaskCount(), 3);
+
+    std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
+
+    threadManager->add(blockingTask);
+
+    EXPECT(threadManager->workerCount(), 0);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 1);
+
+    std::cout << "\t\t\t\tadd worker.. " << std::endl;
+
+    threadManager->addWorker();
+    {
+      Synchronized s(entryMonitor);
+      while (!blockingTask->_entered) {
+        entryMonitor.wait();
+      }
+    }
+
+    EXPECT(threadManager->workerCount(), 1);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 0);
+
+    std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl;
+
+    {
+      Synchronized s(blockMonitor);
+      blocked = false;
+      blockMonitor.notifyAll();
+    }
+    threadManager->removeWorker();
+
+    EXPECT(threadManager->workerCount(), 0);
+    EXPECT(threadManager->idleWorkerCount(), 0);
+    EXPECT(threadManager->pendingTaskCount(), 0);
+
+    std::cout << "\t\t\t\tcleanup.. " << std::endl;
+
+    blockingTask.reset();
+    threadManager.reset();
+    return true;
+  }
 };
 
-const double ThreadManagerTests::TEST_TOLERANCE = .20;
 }
 }
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/test/concurrency/TimerManagerTests.h
----------------------------------------------------------------------
diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h
index c6fa4cf..32d3935 100644
--- a/lib/cpp/test/concurrency/TimerManagerTests.h
+++ b/lib/cpp/test/concurrency/TimerManagerTests.h
@@ -34,8 +34,6 @@ using namespace apache::thrift::concurrency;
 
 class TimerManagerTests {
 
-  static const double TEST_TOLERANCE;
-
 public:
   class Task : public Runnable {
   public:
@@ -52,25 +50,11 @@ public:
     void run() {
 
       _endTime = Util::currentTime();
-
-      // Figure out error percentage
-
-      int64_t delta = _endTime - _startTime;
-
-      delta = delta > _timeout ? delta - _timeout : _timeout - delta;
-
-      double error = double(delta) / _timeout;
-
-      if (error < TEST_TOLERANCE) {
-        _success = true;
-      }
-
-      _done = true;
-
-      std::cout << "\t\t\tTimerManagerTests::Task[" << this << "] done" << std::endl; // debug
+      _success = (_endTime - _startTime) >= _timeout;
 
       {
         Synchronized s(_monitor);
+        _done = true;
         _monitor.notifyAll();
       }
     }
@@ -147,7 +131,6 @@ public:
   Monitor _monitor;
 };
 
-const double TimerManagerTests::TEST_TOLERANCE = .20;
 }
 }
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/lib/cpp/test/processor/ProcessorTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/processor/ProcessorTest.cpp b/lib/cpp/test/processor/ProcessorTest.cpp
index 0066657..a4e984c 100644
--- a/lib/cpp/test/processor/ProcessorTest.cpp
+++ b/lib/cpp/test/processor/ProcessorTest.cpp
@@ -299,7 +299,7 @@ void checkNoEvents(const boost::shared_ptr<EventLog>& log) {
  */
 uint32_t checkNewConnEvents(const boost::shared_ptr<EventLog>& log) {
   // Check for an ET_CONN_CREATED event
-  Event event = log->waitForEvent();
+  Event event = log->waitForEvent(2500);
   BOOST_CHECK_EQUAL(EventLog::ET_CONN_CREATED, event.type);
 
   // Some servers call the processContext() hook immediately.

http://git-wip-us.apache.org/repos/asf/thrift/blob/df89913b/test/cpp/src/TestServer.cpp
----------------------------------------------------------------------
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index 4808d89..b437860 100644
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -746,7 +746,7 @@ int main(int argc, char** argv) {
 
   if (server.get() != NULL) {
     if (protocol_type == "header") {
-      // Tell the server to use the same protocol for input / output 
+      // Tell the server to use the same protocol for input / output
       // if using header
       server->setOutputProtocolFactory(boost::shared_ptr<TProtocolFactory>());
     }