You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/03/09 06:19:32 UTC
svn commit: r920667 -
/incubator/thrift/trunk/lib/cpp/src/concurrency/ThreadManager.cpp
Author: dreiss
Date: Tue Mar 9 05:19:32 2010
New Revision: 920667
URL: http://svn.apache.org/viewvc?rev=920667&view=rev
Log:
cpp: TNonBlockingServer: Use separate monitor for max queue
We were using the same monitor for max queue size and empty queue, this
meant the notifies might be going to the wrong place.
This change significantly reduces the time spent in futex calls in
loaded servers.
Modified:
incubator/thrift/trunk/lib/cpp/src/concurrency/ThreadManager.cpp
Modified: incubator/thrift/trunk/lib/cpp/src/concurrency/ThreadManager.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/concurrency/ThreadManager.cpp?rev=920667&r1=920666&r2=920667&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/concurrency/ThreadManager.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/concurrency/ThreadManager.cpp Tue Mar 9 05:19:32 2010
@@ -54,7 +54,9 @@ class ThreadManager::Impl : public Threa
workerMaxCount_(0),
idleCount_(0),
pendingTaskCountMax_(0),
- state_(ThreadManager::UNINITIALIZED) {}
+ state_(ThreadManager::UNINITIALIZED),
+ monitor_(&mutex_),
+ maxMonitor_(&mutex_) {}
~Impl() { stop(); }
@@ -133,7 +135,9 @@ private:
friend class ThreadManager::Task;
std::queue<shared_ptr<Task> > tasks_;
+ Mutex mutex_;
Monitor monitor_;
+ Monitor maxMonitor_;
Monitor workerMonitor_;
friend class ThreadManager::Worker;
@@ -245,7 +249,7 @@ class ThreadManager::Worker: public Runn
* the manager will see it.
*/
{
- Synchronized s(manager_->monitor_);
+ Guard g(manager_->mutex_);
active = isActive();
while (active && manager_->tasks_.empty()) {
@@ -269,7 +273,7 @@ class ThreadManager::Worker: public Runn
thread that might be blocked on add. */
if (manager_->pendingTaskCountMax_ != 0 &&
manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
- manager_->monitor_.notify();
+ manager_->maxMonitor_.notify();
}
}
} else {
@@ -432,7 +436,7 @@ void ThreadManager::Impl::removeWorker(s
}
void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
- Synchronized s(monitor_);
+ Guard g(mutex_);
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException();
@@ -441,7 +445,8 @@ void ThreadManager::Impl::removeWorker(s
if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
if (canSleep() && timeout >= 0) {
while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
- monitor_.wait(timeout);
+ // This is thread safe because the mutex is shared between monitors.
+ maxMonitor_.wait(timeout);
}
} else {
throw TooManyPendingTasksException();