You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/06 21:26:32 UTC
svn commit: r1089595 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/decaf/util/concurrent/ test/decaf/util/concurrent/
Author: tabish
Date: Wed Apr 6 19:26:32 2011
New Revision: 1089595
URL: http://svn.apache.org/viewvc?rev=1089595&view=rev
Log:
Additional improvements to ThreadPoolExecutor and added tests.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1089595&r1=1089594&r2=1089595&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Wed Apr 6 19:26:32 2011
@@ -49,6 +49,8 @@ namespace concurrent{
class ExecutorKernel {
public:
+ ThreadPoolExecutor* parent;
+
LinkedList<Worker*> workers;
LinkedList<Worker*> deadWorkers;
Timer cleanupTimer;
@@ -65,9 +67,13 @@ namespace concurrent{
Mutex mainLock;
CountDownLatch termination;
+ long long completedTasks;
+ int largestPoolSize;
+
public:
- ExecutorKernel(int corePoolSize, int maxPoolSize, long long keepAliveTime,
+ ExecutorKernel(ThreadPoolExecutor* parent,
+ int corePoolSize, int maxPoolSize, long long keepAliveTime,
BlockingQueue<decaf::lang::Runnable*>* workQueue);
~ExecutorKernel();
@@ -92,6 +98,8 @@ namespace concurrent{
void handleWorkerExit(Worker* worker);
+ void tryTerminate();
+
};
class Worker : public lang::Thread {
@@ -208,7 +216,7 @@ namespace concurrent{
ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
long long keepAliveTime, const TimeUnit& unit,
BlockingQueue<decaf::lang::Runnable*>* workQueue) :
- kernel(new ExecutorKernel(corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue)) {
+ kernel(new ExecutorKernel(this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue)) { // the kernel receives a back-pointer to this executor; keepAliveTime is converted to milliseconds before being handed over
}
////////////////////////////////////////////////////////////////////////////////
@@ -260,7 +268,12 @@ bool ThreadPoolExecutor::awaitTerminatio
////////////////////////////////////////////////////////////////////////////////
int ThreadPoolExecutor::getPoolSize() const { // reports how many workers the kernel currently holds
- return (int)this->kernel->workers.size();
+ int result = 0; // returned unchanged if the guarded read below does not run
+ synchronized(&this->kernel->mainLock) { // read the workers list only while holding the kernel's main lock
+ result = this->kernel->workers.size();
+ }
+
+ return result;
}
////////////////////////////////////////////////////////////////////////////////
@@ -279,6 +292,39 @@ long long ThreadPoolExecutor::getTaskCou
}
////////////////////////////////////////////////////////////////////////////////
+int ThreadPoolExecutor::getActiveCount() const {
+
+    // Remains zero when the kernel reports that it has already terminated.
+    int active = 0;
+
+    synchronized(&this->kernel->mainLock) {
+        if(!this->kernel->terminated.get()) {
+            // Size of the workers list minus the kernel's free-thread counter,
+            // both sampled under the main lock.
+            active = this->kernel->workers.size() - this->kernel->freeThreads.get();
+        }
+    }
+
+    return active;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ThreadPoolExecutor::getCompletedTaskCount() const {
+
+    long long completed = 0;
+
+    // Copy the kernel's completedTasks counter while holding its main lock.
+    synchronized(&this->kernel->mainLock) {
+        completed = this->kernel->completedTasks;
+    }
+
+    return completed;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ThreadPoolExecutor::getLargestPoolSize() const {
+
+    int largest = 0;
+
+    // Copy the kernel's largestPoolSize counter while holding its main lock.
+    synchronized(&this->kernel->mainLock) {
+        largest = this->kernel->largestPoolSize;
+    }
+
+    return largest;
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool ThreadPoolExecutor::isShutdown() const {
return this->kernel->stopped.get(); // snapshot of the kernel's stopped flag; mainLock is not taken for this read
}
@@ -293,8 +339,10 @@ void ThreadPoolExecutor::terminated() {
}
////////////////////////////////////////////////////////////////////////////////
-ExecutorKernel::ExecutorKernel(int corePoolSize, int maxPoolSize, long long keepAliveTime,
+ExecutorKernel::ExecutorKernel(ThreadPoolExecutor* parent, int corePoolSize,
+ int maxPoolSize, long long keepAliveTime,
BlockingQueue<decaf::lang::Runnable*>* workQueue) :
+ parent(parent),
workers(),
deadWorkers(),
cleanupTimer(),
@@ -307,7 +355,9 @@ ExecutorKernel::ExecutorKernel(int coreP
keepAliveTime(keepAliveTime),
workQueue(workQueue),
mainLock(),
- termination(1) {
+ termination(1),
+ completedTasks(0),
+ largestPoolSize(0) {
if(corePoolSize < 0 || maxPoolSize <= 0 ||
maxPoolSize < corePoolSize || keepAliveTime < 0) {
@@ -326,7 +376,9 @@ ExecutorKernel::ExecutorKernel(int coreP
////////////////////////////////////////////////////////////////////////////////
ExecutorKernel::~ExecutorKernel() {
try{
+
this->shutdown();
+ this->tryTerminate();
this->termination.await();
@@ -376,6 +428,7 @@ void ExecutorKernel::onTaskCompleted(Wor
try {
synchronized(&mainLock) {
freeThreads.incrementAndGet();
+ completedTasks++;
}
}
DECAF_CATCH_RETHROW( lang::Exception )
@@ -400,9 +453,7 @@ void ExecutorKernel::handleWorkerExit(Wo
this->deadWorkers.add(worker);
if(this->workers.isEmpty()) {
-
- // TODO - Notify ThreadPoolExecutor to call terminated()
-
+ this->parent->terminated();
this->terminated = true;
this->termination.countDown();
}
@@ -485,6 +536,7 @@ void ExecutorKernel::AllocateThread() {
this->workers.add(newWorker);
freeThreads.incrementAndGet();
newWorker->start();
+ this->largestPoolSize++;
}
}
DECAF_CATCH_RETHROW( lang::Exception )
@@ -537,3 +589,25 @@ bool ExecutorKernel::awaitTermination(lo
return this->termination.await(timeout, unit);
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorKernel::tryTerminate() {
+
+    // Completes termination of the executor when that is possible: shutdown
+    // must have begun, the work queue must be empty and no workers may remain.
+    // In that case the parent's terminated() hook is invoked, the terminated
+    // flag is set and the termination latch is released.  Otherwise this is a
+    // no-op.
+
+    // The stop state is sampled once so that the whole test is evaluated
+    // against a single value.
+    if (!this->isStoppedOrStopping() || !this->workQueue->isEmpty()) {
+        return;
+    }
+
+    synchronized(&this->mainLock) {
+
+        // The workers list is inspected only while holding mainLock, matching
+        // the way getPoolSize() reads it.
+        if (this->workers.size() > 0) {
+            // TODO - Once they are interruptible wake a worker.
+        } else if (!this->terminated.get()) {
+
+            // handleWorkerExit() may already have finished termination; the
+            // flag check keeps parent->terminated() from being invoked twice.
+            try {
+                this->parent->terminated();
+            } catch(...) {
+                // An exception from the hook must not prevent the latch below
+                // from being released.
+            }
+
+            this->terminated.set(true);
+            this->termination.countDown();
+        }
+    }
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h?rev=1089595&r1=1089594&r2=1089595&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h Wed Apr 6 19:26:32 2011
@@ -58,6 +58,7 @@ namespace concurrent{
private:
+ friend class ExecutorKernel;
ExecutorKernel* kernel;
public:
@@ -165,6 +166,30 @@ namespace concurrent{
virtual long long getTaskCount() const;
/**
+ * Returns an approximation of the number of threads that are currently running
+ * tasks for this executor. This value can change rapidly.
+ *
+ * @return the number of currently active threads.
+ */
+ virtual int getActiveCount() const;
+
+ /**
+ * Returns the approximate number of tasks that have been completed by this
+ * Executor. The value never decreases.
+ *
+ * @return the number of completed tasks since creation of the Executor.
+ */
+ virtual long long getCompletedTaskCount() const;
+
+ /**
+ * Returns the largest number of threads that have ever been in this
+ * Executor's thread pool at the same time.
+ *
+ * NOTE(review): the backing counter appears to be incremented on every
+ * thread allocation rather than tracked against the live pool size -
+ * confirm that it cannot overstate the peak.
+ *
+ * @return the largest number of threads ever to coexist in this executor.
+ */
+ virtual int getLargestPoolSize() const;
+
+ /**
* Returns whether this executor has been shutdown or not.
*
* @return true if this executor has been shutdown.
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp?rev=1089595&r1=1089594&r2=1089595&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp Wed Apr 6 19:26:32 2011
@@ -95,6 +95,17 @@ namespace {
}
///////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutorTest::testConstructor1() {
+
+    // Sizes handed to the constructor; the getters must report them back.
+    const int expectedCore = 1;
+    const int expectedMax = 3;
+
+    ThreadPoolExecutor pool(expectedCore, expectedMax, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+
+    CPPUNIT_ASSERT_EQUAL(expectedCore, pool.getCorePoolSize());
+    CPPUNIT_ASSERT_EQUAL(expectedMax, pool.getMaximumPoolSize());
+
+    // A freshly constructed pool is neither shut down nor terminated.
+    CPPUNIT_ASSERT_EQUAL(false, pool.isShutdown());
+    CPPUNIT_ASSERT_EQUAL(false, pool.isTerminated());
+}
+
+///////////////////////////////////////////////////////////////////////////////
void ThreadPoolExecutorTest::testSimpleTasks()
{
CountDownLatch myLatch( 3 );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h?rev=1089595&r1=1089594&r2=1089595&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h Wed Apr 6 19:26:32 2011
@@ -36,6 +36,7 @@ namespace concurrent{
private:
CPPUNIT_TEST_SUITE( ThreadPoolExecutorTest );
+ CPPUNIT_TEST( testConstructor1 );
CPPUNIT_TEST( testSimpleTasks );
CPPUNIT_TEST( testMoreTasksThanMaxPoolSize );
CPPUNIT_TEST( testTasksThatThrow );
@@ -51,6 +52,7 @@ namespace concurrent{
ThreadPoolExecutorTest() {}
virtual ~ThreadPoolExecutorTest() {}
+ void testConstructor1();
void testSimpleTasks();
void testMoreTasksThanMaxPoolSize();
void testTasksThatThrow();