You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/06 00:33:19 UTC
svn commit: r1089273 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/decaf/util/concurrent/ test/decaf/util/concurrent/
Author: tabish
Date: Tue Apr 5 22:33:19 2011
New Revision: 1089273
URL: http://svn.apache.org/viewvc?rev=1089273&view=rev
Log:
More work on ThreadPoolExecutor, resolves the deadlock issue that was showing up occasionally and fixes several memory leaks.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1089273&r1=1089272&r2=1089273&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Tue Apr 5 22:33:19 2011
@@ -15,7 +15,12 @@
* limitations under the License.
*/
#include <decaf/util/concurrent/ThreadPoolExecutor.h>
-#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/Timer.h>
+#include <decaf/util/TimerTask.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/util/concurrent/RejectedExecutionException.h>
@@ -32,6 +37,7 @@ using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
////////////////////////////////////////////////////////////////////////////////
namespace decaf{
@@ -44,15 +50,20 @@ namespace concurrent{
public:
LinkedList<Worker*> workers;
+ LinkedList<Worker*> deadWorkers;
+ Timer cleanupTimer;
+
AtomicBoolean stopping;
AtomicBoolean stopped;
- std::size_t freeThreads;
+ AtomicBoolean terminated;
+ AtomicInteger freeThreads;
int maxPoolSize;
int corePoolSize;
long long keepAliveTime;
Pointer< BlockingQueue<decaf::lang::Runnable*> > workQueue;
Mutex mainLock;
+ CountDownLatch termination;
public:
@@ -77,6 +88,10 @@ namespace concurrent{
void shutdown();
+ bool awaitTermination(long long timeout, const TimeUnit& unit);
+
+ void handleWorkerExit(Worker* worker);
+
};
class Worker : public lang::Thread {
@@ -124,9 +139,13 @@ namespace concurrent{
this->busy = true;
this->kernel->onTaskStarted(this);
+
try{
task->run();
} catch(...) {}
+
+ delete task;
+
this->kernel->onTaskCompleted(this);
this->busy = false;
}
@@ -143,6 +162,8 @@ namespace concurrent{
this->busy = false;
this->kernel->onTaskException(this, ex);
}
+
+ this->kernel->handleWorkerExit(this);
}
void stop() {
@@ -155,6 +176,32 @@ namespace concurrent{
};
+ class WorkerKiller : public TimerTask {
+ private:
+
+ ExecutorKernel* kernel;
+
+ public:
+
+ WorkerKiller(ExecutorKernel* kernel) : kernel(kernel) {
+ }
+
+ virtual ~WorkerKiller() {}
+
+ virtual void run() {
+ try{
+ synchronized(&kernel->mainLock) {
+ Pointer< Iterator<Worker*> > iter( kernel->deadWorkers.iterator() );
+ while(iter->hasNext()) {
+ delete iter->next();
+ iter->remove();
+ }
+ }
+ } catch(...) {}
+ }
+
+ };
+
}}}
////////////////////////////////////////////////////////////////////////////////
@@ -202,6 +249,16 @@ void ThreadPoolExecutor::shutdown() {
}
////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::awaitTermination(long long timeout, const TimeUnit& unit) {
+
+ try{
+ return this->kernel->awaitTermination(timeout, unit);
+ }
+ DECAF_CATCH_RETHROW( lang::Exception )
+ DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
int ThreadPoolExecutor::getPoolSize() const {
return (int)this->kernel->workers.size();
}
@@ -222,17 +279,35 @@ long long ThreadPoolExecutor::getTaskCou
}
////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::isShutdown() const {
+ return this->kernel->stopped.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::isTerminated() const {
+ return this->kernel->terminated.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::terminated() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
ExecutorKernel::ExecutorKernel(int corePoolSize, int maxPoolSize, long long keepAliveTime,
BlockingQueue<decaf::lang::Runnable*>* workQueue) :
workers(),
+ deadWorkers(),
+ cleanupTimer(),
stopping(false),
stopped(false),
+ terminated(false),
freeThreads(0),
maxPoolSize(maxPoolSize),
corePoolSize(corePoolSize),
keepAliveTime(keepAliveTime),
workQueue(workQueue),
- mainLock() {
+ mainLock(),
+ termination(1) {
if(corePoolSize < 0 || maxPoolSize <= 0 ||
maxPoolSize < corePoolSize || keepAliveTime < 0) {
@@ -243,12 +318,28 @@ ExecutorKernel::ExecutorKernel(int coreP
if(workQueue == NULL) {
throw NullPointerException(__FILE__, __LINE__, "BlockingQueue pointer was null");
}
+
+ this->cleanupTimer.scheduleAtFixedRate(
+ new WorkerKiller(this), TimeUnit::SECONDS.toMillis(10), TimeUnit::SECONDS.toMillis(10));
}
////////////////////////////////////////////////////////////////////////////////
ExecutorKernel::~ExecutorKernel() {
try{
this->shutdown();
+
+ this->termination.await();
+
+ this->cleanupTimer.cancel();
+
+ // Ensure dead Worker Threads are destroyed, the Timer might not have
+ // run recently.
+ Pointer< Iterator<Worker*> > iter(this->deadWorkers.iterator());
+ while(iter->hasNext()) {
+ Worker* worker = iter->next();
+ worker->join();
+ delete worker;
+ }
}
DECAF_CATCH_NOTHROW(Exception)
DECAF_CATCHALL_NOTHROW()
@@ -261,7 +352,7 @@ void ExecutorKernel::onTaskStarted(Worke
synchronized( &mainLock ) {
- freeThreads--;
+ freeThreads.decrementAndGet();
// Now that this callback has decremented the free threads counter
// lets check if there is any outstanding work to be done and no
@@ -270,7 +361,7 @@ void ExecutorKernel::onTaskStarted(Worke
// having a chance to wake up and service the queue. This would
// cause the number of Task to exceed the number of free threads
// once the Threads got a chance to wake up and service the queue
- if( freeThreads == 0 && !workQueue->isEmpty() ) {
+ if( freeThreads.get() == 0 && !workQueue->isEmpty() ) {
AllocateThread();
}
}
@@ -283,8 +374,8 @@ void ExecutorKernel::onTaskStarted(Worke
void ExecutorKernel::onTaskCompleted(Worker* thread DECAF_UNUSED) {
try {
- synchronized( &mainLock ) {
- freeThreads++;
+ synchronized(&mainLock) {
+ freeThreads.incrementAndGet();
}
}
DECAF_CATCH_RETHROW( lang::Exception )
@@ -292,16 +383,30 @@ void ExecutorKernel::onTaskCompleted(Wor
}
////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::onTaskException(Worker* thread, lang::Exception& ex DECAF_UNUSED) {
+void ExecutorKernel::onTaskException(Worker* thread DECAF_UNUSED, lang::Exception& ex DECAF_UNUSED) {
try{
+ }
+ DECAF_CATCH_RETHROW( Exception )
+ DECAF_CATCHALL_THROW( Exception )
+}
- synchronized( &mainLock ) {
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorKernel::handleWorkerExit(Worker* worker) {
+ synchronized( &this->mainLock ) {
+
+ this->workers.remove(worker);
+ this->deadWorkers.add(worker);
+
+ if(this->workers.isEmpty()) {
+
+ // TODO - Notify ThreadPoolExecutor to call terminated()
+
+ this->terminated = true;
+ this->termination.countDown();
}
}
- DECAF_CATCH_RETHROW( Exception )
- DECAF_CATCHALL_THROW( Exception )
}
////////////////////////////////////////////////////////////////////////////////
@@ -313,7 +418,7 @@ void ExecutorKernel::enQueueTask(Runnabl
// If there's nobody open to do work, then create some more
// threads to handle the work.
- if( this->freeThreads == 0 ) {
+ if( this->freeThreads.get() == 0 ) {
AllocateThread();
}
}
@@ -378,7 +483,7 @@ void ExecutorKernel::AllocateThread() {
synchronized( &mainLock ) {
Worker* newWorker = new Worker(this);
this->workers.add(newWorker);
- freeThreads++;
+ freeThreads.incrementAndGet();
newWorker->start();
}
}
@@ -417,16 +522,18 @@ void ExecutorKernel::shutdown() {
// // Signal the Queue so that all waiters are notified
// workQueue->notifyAll();
//}
+ }
- iter.reset(this->workers.iterator());
- while(iter->hasNext()) {
- Worker* worker = iter->next();
- worker->join();
- delete worker;
- }
+ this->stopped.set(true);
+ }
+}
- this->workers.clear();
- this->stopped.set(true);
- }
+////////////////////////////////////////////////////////////////////////////////
+bool ExecutorKernel::awaitTermination(long long timeout, const TimeUnit& unit) {
+
+ if (this->terminated.get() == true) {
+ return true;
}
+
+ return this->termination.await(timeout, unit);
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h?rev=1089273&r1=1089272&r2=1089273&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h Tue Apr 5 22:33:19 2011
@@ -18,10 +18,9 @@
#define _DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTOR_H_
#include <decaf/lang/Runnable.h>
-#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/util/concurrent/ThreadFactory.h>
-#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/BlockingQueue.h>
+#include <decaf/util/concurrent/TimeUnit.h>
#include <decaf/util/LinkedList.h>
#include <decaf/util/Config.h>
@@ -32,7 +31,6 @@ namespace util{
namespace concurrent{
using decaf::lang::Pointer;
- using decaf::util::concurrent::atomic::AtomicBoolean;
class ExecutorKernel;
@@ -120,6 +118,23 @@ namespace concurrent{
virtual void shutdown();
/**
+ * The caller will block until the executor has completed termination, meaning all tasks
+ * that were scheduled before shutdown have now completed and the executor is ready for
+ * deletion. If the timeout period elapses before the executor reaches the terminated
+ * state then this method returns false to indicate it has not terminated.
+ *
+ * @param timeout
+ * The amount of time to wait before abandoning the wait for termination.
+ * @param unit
+ * The unit of time that the timeout value represents.
+ *
+ * @return true if the executor terminated or false if the timeout expired.
+ *
+ * @throws InterruptedException if this call is interrupted while awaiting termination.
+ */
+ virtual bool awaitTermination(long long timeout, const decaf::util::concurrent::TimeUnit& unit);
+
+ /**
* Returns the number of threads that currently exists for this Executor.
*
* @return the configured number of Threads in the Pool.
@@ -149,6 +164,29 @@ namespace concurrent{
*/
virtual long long getTaskCount() const;
+ /**
+ * Returns whether this executor has been shutdown or not.
+ *
+ * @return true if this executor has been shutdown.
+ */
+ virtual bool isShutdown() const;
+
+ /**
+ * Returns whether all tasks have completed after this executor was shut down.
+ *
+ * @return true if all tasks have completed after a request to shut down was made.
+ */
+ virtual bool isTerminated() const;
+
+ protected:
+
+ /**
+ * Method invoked when the Executor has terminated; by default this method does
+ * nothing. When overridden the subclass should call superclass::terminated to
+ * ensure that all subclasses have their terminated method invoked.
+ */
+ virtual void terminated();
+
};
}}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp?rev=1089273&r1=1089272&r2=1089273&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp Tue Apr 5 22:33:19 2011
@@ -36,15 +36,15 @@ namespace {
public:
CountDownLatch* latch;
- int value;
+ int* value;
- MyTask(CountDownLatch* latch, int x) : Runnable(), latch(latch), value(x) {
+ MyTask(CountDownLatch* latch, int* x) : Runnable(), latch(latch), value(x) {
}
virtual ~MyTask() {}
virtual void run() {
- value += 100;
+ *value += 100;
Thread::sleep(10);
latch->countDown();
}
@@ -99,23 +99,27 @@ void ThreadPoolExecutorTest::testSimpleT
{
CountDownLatch myLatch( 3 );
- MyTask task1(&myLatch, 1);
- MyTask task2(&myLatch, 2);
- MyTask task3(&myLatch, 3);
+ int taskValue1 = 1;
+ int taskValue2 = 2;
+ int taskValue3 = 3;
+
+ MyTask* task1 = new MyTask(&myLatch, &taskValue1);
+ MyTask* task2 = new MyTask(&myLatch, &taskValue2);
+ MyTask* task3 = new MyTask(&myLatch, &taskValue3);
ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
- pool.execute(&task1);
- pool.execute(&task2);
- pool.execute(&task3);
+ pool.execute(task1);
+ pool.execute(task2);
+ pool.execute(task3);
// Wait for them to finish, if we can't do this in 30 seconds then
// there's probably something really wrong.
CPPUNIT_ASSERT( myLatch.await( 30000 ) );
- CPPUNIT_ASSERT( task1.value == 101 );
- CPPUNIT_ASSERT( task2.value == 102 );
- CPPUNIT_ASSERT( task3.value == 103 );
+ CPPUNIT_ASSERT( taskValue1 == 101 );
+ CPPUNIT_ASSERT( taskValue2 == 102 );
+ CPPUNIT_ASSERT( taskValue3 == 103 );
CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3 );
@@ -123,25 +127,64 @@ void ThreadPoolExecutorTest::testSimpleT
}
///////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutorTest::testAwaitTermination()
+{
+ CountDownLatch myLatch( 3 );
+
+ int taskValue1 = 1;
+ int taskValue2 = 2;
+ int taskValue3 = 3;
+
+ MyTask* task1 = new MyTask(&myLatch, &taskValue1);
+ MyTask* task2 = new MyTask(&myLatch, &taskValue2);
+ MyTask* task3 = new MyTask(&myLatch, &taskValue3);
+
+ ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+
+ pool.execute(task1);
+ pool.execute(task2);
+ pool.execute(task3);
+
+ // Wait for them to finish, if we can't do this in 30 seconds then
+ // there's probably something really wrong.
+ CPPUNIT_ASSERT( myLatch.await( 30000 ) );
+
+ CPPUNIT_ASSERT( taskValue1 == 101 );
+ CPPUNIT_ASSERT( taskValue2 == 102 );
+ CPPUNIT_ASSERT( taskValue3 == 103 );
+
+ CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3 );
+
+ CPPUNIT_ASSERT_EQUAL(false, pool.isShutdown());
+ CPPUNIT_ASSERT_EQUAL(false, pool.isTerminated());
+
+ pool.shutdown();
+ CPPUNIT_ASSERT_EQUAL(true, pool.isShutdown());
+ CPPUNIT_ASSERT(pool.awaitTermination(30, TimeUnit::SECONDS));
+
+ CPPUNIT_ASSERT_EQUAL(true, pool.isShutdown());
+ CPPUNIT_ASSERT_EQUAL(true, pool.isTerminated());
+}
+
+///////////////////////////////////////////////////////////////////////////////
void ThreadPoolExecutorTest::testMoreTasksThanMaxPoolSize() {
ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
- Mutex myMutex;
CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3);
CountDownLatch startedLatch1(3); // First three should go right away
CountDownLatch startedLatch2(1); // The fourth one goes after others finish
- MyWaitingTask task1( &myMutex, &startedLatch1 );
- MyWaitingTask task2( &myMutex, &startedLatch1 );
- MyWaitingTask task3( &myMutex, &startedLatch1 );
- MyWaitingTask task4( &myMutex, &startedLatch2 );
-
- pool.execute( &task1 );
- pool.execute( &task2 );
- pool.execute( &task3 );
- pool.execute( &task4 );
+ MyWaitingTask* task1 = new MyWaitingTask( &myMutex, &startedLatch1 );
+ MyWaitingTask* task2 = new MyWaitingTask( &myMutex, &startedLatch1 );
+ MyWaitingTask* task3 = new MyWaitingTask( &myMutex, &startedLatch1 );
+ MyWaitingTask* task4 = new MyWaitingTask( &myMutex, &startedLatch2 );
+
+ pool.execute(task1);
+ pool.execute(task2);
+ pool.execute(task3);
+ pool.execute(task4);
// Wait 30 seconds, then we let it fail because something is
// probably very wrong.
@@ -175,30 +218,34 @@ void ThreadPoolExecutorTest::testTasksTh
{
CountDownLatch myLatch( 3 );
- MyTask task1(&myLatch, 1);
- MyTask task2(&myLatch, 2);
- MyTask task3(&myLatch, 3);
-
- MyExceptionTask exTask1;
- MyExceptionTask exTask2;
- MyExceptionTask exTask3;
+ int taskValue1 = 1;
+ int taskValue2 = 2;
+ int taskValue3 = 3;
+
+ MyTask* task1 = new MyTask(&myLatch, &taskValue1);
+ MyTask* task2 = new MyTask(&myLatch, &taskValue2);
+ MyTask* task3 = new MyTask(&myLatch, &taskValue3);
+
+ MyExceptionTask* exTask1 = new MyExceptionTask;
+ MyExceptionTask* exTask2 = new MyExceptionTask;
+ MyExceptionTask* exTask3 = new MyExceptionTask;
ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
- pool.execute(&exTask1);
- pool.execute(&task2);
- pool.execute(&exTask2);
- pool.execute(&task1);
- pool.execute(&exTask3);
- pool.execute(&task3);
+ pool.execute(exTask1);
+ pool.execute(task2);
+ pool.execute(exTask2);
+ pool.execute(task1);
+ pool.execute(exTask3);
+ pool.execute(task3);
// Wait for them to finish, if we can't do this in 30 seconds then
// there's probably something really wrong.
CPPUNIT_ASSERT( myLatch.await( 30000 ) );
- CPPUNIT_ASSERT( task1.value == 101 );
- CPPUNIT_ASSERT( task2.value == 102 );
- CPPUNIT_ASSERT( task3.value == 103 );
+ CPPUNIT_ASSERT( taskValue1 == 101 );
+ CPPUNIT_ASSERT( taskValue2 == 102 );
+ CPPUNIT_ASSERT( taskValue3 == 103 );
CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3 );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h?rev=1089273&r1=1089272&r2=1089273&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h Tue Apr 5 22:33:19 2011
@@ -33,13 +33,19 @@ namespace util{
namespace concurrent{
class ThreadPoolExecutorTest : public CppUnit::TestFixture {
+ private:
CPPUNIT_TEST_SUITE( ThreadPoolExecutorTest );
CPPUNIT_TEST( testSimpleTasks );
CPPUNIT_TEST( testMoreTasksThanMaxPoolSize );
CPPUNIT_TEST( testTasksThatThrow );
+ CPPUNIT_TEST( testAwaitTermination );
CPPUNIT_TEST_SUITE_END();
+ private:
+
+ decaf::util::concurrent::Mutex myMutex;
+
public:
ThreadPoolExecutorTest() {}
@@ -48,6 +54,7 @@ namespace concurrent{
void testSimpleTasks();
void testMoreTasksThanMaxPoolSize();
void testTasksThatThrow();
+ void testAwaitTermination();
};