You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/06 00:33:19 UTC

svn commit: r1089273 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/decaf/util/concurrent/ test/decaf/util/concurrent/

Author: tabish
Date: Tue Apr  5 22:33:19 2011
New Revision: 1089273

URL: http://svn.apache.org/viewvc?rev=1089273&view=rev
Log:
More work on ThreadPoolExecutor, resolves the deadlock issue that was showing up occasionally and fixes several memory leaks.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1089273&r1=1089272&r2=1089273&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Tue Apr  5 22:33:19 2011
@@ -15,7 +15,12 @@
  * limitations under the License.
  */
 #include <decaf/util/concurrent/ThreadPoolExecutor.h>
-#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/Timer.h>
+#include <decaf/util/TimerTask.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/util/concurrent/RejectedExecutionException.h>
@@ -32,6 +37,7 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
 
 ////////////////////////////////////////////////////////////////////////////////
 namespace decaf{
@@ -44,15 +50,20 @@ namespace concurrent{
     public:
 
         LinkedList<Worker*> workers;
+        LinkedList<Worker*> deadWorkers;
+        Timer cleanupTimer;
+
         AtomicBoolean stopping;
         AtomicBoolean stopped;
-        std::size_t freeThreads;
+        AtomicBoolean terminated;
+        AtomicInteger freeThreads;
 
         int maxPoolSize;
         int corePoolSize;
         long long keepAliveTime;
         Pointer< BlockingQueue<decaf::lang::Runnable*> > workQueue;
         Mutex mainLock;
+        CountDownLatch termination;
 
     public:
 
@@ -77,6 +88,10 @@ namespace concurrent{
 
         void shutdown();
 
+        bool awaitTermination(long long timeout, const TimeUnit& unit);
+
+        void handleWorkerExit(Worker* worker);
+
     };
 
     class Worker : public lang::Thread {
@@ -124,9 +139,13 @@ namespace concurrent{
 
                     this->busy = true;
                     this->kernel->onTaskStarted(this);
+
                     try{
                         task->run();
                     } catch(...) {}
+
+                    delete task;
+
                     this->kernel->onTaskCompleted(this);
                     this->busy = false;
                 }
@@ -143,6 +162,8 @@ namespace concurrent{
                 this->busy = false;
                 this->kernel->onTaskException(this, ex);
             }
+
+            this->kernel->handleWorkerExit(this);
         }
 
         void stop() {
@@ -155,6 +176,32 @@ namespace concurrent{
 
     };
 
+    class WorkerKiller : public TimerTask {
+    private:
+
+        ExecutorKernel* kernel;
+
+    public:
+
+        WorkerKiller(ExecutorKernel* kernel) : kernel(kernel) {
+        }
+
+        virtual ~WorkerKiller() {}
+
+        virtual void run() {
+            try{
+                synchronized(&kernel->mainLock) {
+                    Pointer< Iterator<Worker*> > iter( kernel->deadWorkers.iterator() );
+                    while(iter->hasNext()) {
+                        delete iter->next();
+                        iter->remove();
+                    }
+                }
+            } catch(...) {}
+        }
+
+    };
+
 }}}
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -202,6 +249,16 @@ void ThreadPoolExecutor::shutdown() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::awaitTermination(long long timeout, const TimeUnit& unit) {
+
+    try{
+        return this->kernel->awaitTermination(timeout, unit);
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 int ThreadPoolExecutor::getPoolSize() const {
     return (int)this->kernel->workers.size();
 }
@@ -222,17 +279,35 @@ long long ThreadPoolExecutor::getTaskCou
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::isShutdown() const {
+    return this->kernel->stopped.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::isTerminated() const {
+    return this->kernel->terminated.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::terminated() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
 ExecutorKernel::ExecutorKernel(int corePoolSize, int maxPoolSize, long long keepAliveTime,
                                BlockingQueue<decaf::lang::Runnable*>* workQueue) :
     workers(),
+    deadWorkers(),
+    cleanupTimer(),
     stopping(false),
     stopped(false),
+    terminated(false),
     freeThreads(0),
     maxPoolSize(maxPoolSize),
     corePoolSize(corePoolSize),
     keepAliveTime(keepAliveTime),
     workQueue(workQueue),
-    mainLock() {
+    mainLock(),
+    termination(1) {
 
     if(corePoolSize < 0 || maxPoolSize <= 0 ||
        maxPoolSize < corePoolSize || keepAliveTime < 0) {
@@ -243,12 +318,28 @@ ExecutorKernel::ExecutorKernel(int coreP
     if(workQueue == NULL) {
         throw NullPointerException(__FILE__, __LINE__, "BlockingQueue pointer was null");
     }
+
+    this->cleanupTimer.scheduleAtFixedRate(
+        new WorkerKiller(this), TimeUnit::SECONDS.toMillis(10), TimeUnit::SECONDS.toMillis(10));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 ExecutorKernel::~ExecutorKernel() {
     try{
         this->shutdown();
+
+        this->termination.await();
+
+        this->cleanupTimer.cancel();
+
+        // Ensure dead Worker Threads are destroyed, the Timer might not have
+        // run recently.
+        Pointer< Iterator<Worker*> > iter(this->deadWorkers.iterator());
+        while(iter->hasNext()) {
+            Worker* worker = iter->next();
+            worker->join();
+            delete worker;
+        }
     }
     DECAF_CATCH_NOTHROW(Exception)
     DECAF_CATCHALL_NOTHROW()
@@ -261,7 +352,7 @@ void ExecutorKernel::onTaskStarted(Worke
 
         synchronized( &mainLock ) {
 
-            freeThreads--;
+            freeThreads.decrementAndGet();
 
             // Now that this callback has decremented the free threads counter
             // lets check if there is any outstanding work to be done and no
@@ -270,7 +361,7 @@ void ExecutorKernel::onTaskStarted(Worke
             // having a chance to wake up and service the queue.  This would
             // cause the number of Task to exceed the number of free threads
             // once the Threads got a chance to wake up and service the queue
-            if( freeThreads == 0 && !workQueue->isEmpty() ) {
+            if( freeThreads.get() == 0 && !workQueue->isEmpty() ) {
                 AllocateThread();
             }
         }
@@ -283,8 +374,8 @@ void ExecutorKernel::onTaskStarted(Worke
 void ExecutorKernel::onTaskCompleted(Worker* thread DECAF_UNUSED) {
 
     try {
-        synchronized( &mainLock ) {
-            freeThreads++;
+        synchronized(&mainLock) {
+            freeThreads.incrementAndGet();
         }
     }
     DECAF_CATCH_RETHROW( lang::Exception )
@@ -292,16 +383,30 @@ void ExecutorKernel::onTaskCompleted(Wor
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::onTaskException(Worker* thread, lang::Exception& ex DECAF_UNUSED) {
+void ExecutorKernel::onTaskException(Worker* thread DECAF_UNUSED, lang::Exception& ex DECAF_UNUSED) {
 
     try{
+    }
+    DECAF_CATCH_RETHROW( Exception )
+    DECAF_CATCHALL_THROW( Exception )
+}
 
-        synchronized( &mainLock ) {
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorKernel::handleWorkerExit(Worker* worker) {
 
+    synchronized( &this->mainLock ) {
+
+        this->workers.remove(worker);
+        this->deadWorkers.add(worker);
+
+        if(this->workers.isEmpty()) {
+
+            // TODO - Notify ThreadPoolExecutor to call terminated()
+
+            this->terminated = true;
+            this->termination.countDown();
         }
     }
-    DECAF_CATCH_RETHROW( Exception )
-    DECAF_CATCHALL_THROW( Exception )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -313,7 +418,7 @@ void ExecutorKernel::enQueueTask(Runnabl
 
             // If there's nobody open to do work, then create some more
             // threads to handle the work.
-            if( this->freeThreads == 0 ) {
+            if( this->freeThreads.get() == 0 ) {
                 AllocateThread();
             }
         }
@@ -378,7 +483,7 @@ void ExecutorKernel::AllocateThread() {
         synchronized( &mainLock ) {
             Worker* newWorker = new Worker(this);
             this->workers.add(newWorker);
-            freeThreads++;
+            freeThreads.incrementAndGet();
             newWorker->start();
         }
     }
@@ -417,16 +522,18 @@ void ExecutorKernel::shutdown() {
             //    // Signal the Queue so that all waiters are notified
             //    workQueue->notifyAll();
             //}
+        }
 
-            iter.reset(this->workers.iterator());
-            while(iter->hasNext()) {
-                Worker* worker = iter->next();
-                worker->join();
-                delete worker;
-            }
+        this->stopped.set(true);
+    }
+}
 
-            this->workers.clear();
-            this->stopped.set(true);
-        }
+////////////////////////////////////////////////////////////////////////////////
+bool ExecutorKernel::awaitTermination(long long timeout, const TimeUnit& unit) {
+
+    if (this->terminated.get() == true) {
+        return true;
     }
+
+    return this->termination.await(timeout, unit);
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h?rev=1089273&r1=1089272&r2=1089273&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h Tue Apr  5 22:33:19 2011
@@ -18,10 +18,9 @@
 #define _DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTOR_H_
 
 #include <decaf/lang/Runnable.h>
-#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/util/concurrent/ThreadFactory.h>
-#include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/BlockingQueue.h>
+#include <decaf/util/concurrent/TimeUnit.h>
 #include <decaf/util/LinkedList.h>
 #include <decaf/util/Config.h>
 
@@ -32,7 +31,6 @@ namespace util{
 namespace concurrent{
 
     using decaf::lang::Pointer;
-    using decaf::util::concurrent::atomic::AtomicBoolean;
 
     class ExecutorKernel;
 
@@ -120,6 +118,23 @@ namespace concurrent{
         virtual void shutdown();
 
         /**
+         * The caller will block until the executor has completed termination, meaning all tasks
+         * that were scheduled before shutdown have now completed and the executor is ready for
+         * deletion.  If the timeout period elapses before the executor reaches the terminated
+         * state, then this method returns false to indicate it has not terminated.
+         *
+         * @param timeout
+         *      The amount of time to wait before abandoning the wait for termination.
+         * @param unit
+         *      The unit of time that the timeout value represents.
+         *
+         * @return true if the executor terminated or false if the timeout expired.
+         *
+         * @throws InterruptedException if this call is interrupted while awaiting termination.
+         */
+        virtual bool awaitTermination(long long timeout, const decaf::util::concurrent::TimeUnit& unit);
+
+        /**
          * Returns the number of threads that currently exists for this Executor.
          *
          * @return the configured number of Threads in the Pool.
@@ -149,6 +164,29 @@ namespace concurrent{
          */
         virtual long long getTaskCount() const;
 
+        /**
+         * Returns whether this executor has been shut down or not.
+         *
+         * @return true if this executor has been shut down.
+         */
+        virtual bool isShutdown() const;
+
+        /**
+         * Returns whether all tasks have completed after this executor was shut down.
+         *
+         * @return true if all tasks have completed after a request to shut down was made.
+         */
+        virtual bool isTerminated() const;
+
+    protected:
+
+        /**
+         * Method invoked when the Executor has terminated; by default this method does
+         * nothing.  When overridden, the subclass should call superclass::terminated to
+         * ensure that all subclasses have their terminated method invoked.
+         */
+        virtual void terminated();
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp?rev=1089273&r1=1089272&r2=1089273&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp Tue Apr  5 22:33:19 2011
@@ -36,15 +36,15 @@ namespace {
     public:
 
         CountDownLatch* latch;
-        int value;
+        int* value;
 
-        MyTask(CountDownLatch* latch, int x) : Runnable(), latch(latch), value(x) {
+        MyTask(CountDownLatch* latch, int* x) : Runnable(), latch(latch), value(x) {
         }
 
         virtual ~MyTask() {}
 
         virtual void run() {
-            value += 100;
+            *value += 100;
             Thread::sleep(10);
             latch->countDown();
         }
@@ -99,23 +99,27 @@ void ThreadPoolExecutorTest::testSimpleT
 {
     CountDownLatch myLatch( 3 );
 
-    MyTask task1(&myLatch, 1);
-    MyTask task2(&myLatch, 2);
-    MyTask task3(&myLatch, 3);
+    int taskValue1 = 1;
+    int taskValue2 = 2;
+    int taskValue3 = 3;
+
+    MyTask* task1 = new MyTask(&myLatch, &taskValue1);
+    MyTask* task2 = new MyTask(&myLatch, &taskValue2);
+    MyTask* task3 = new MyTask(&myLatch, &taskValue3);
 
     ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
 
-    pool.execute(&task1);
-    pool.execute(&task2);
-    pool.execute(&task3);
+    pool.execute(task1);
+    pool.execute(task2);
+    pool.execute(task3);
 
     // Wait for them to finish, if we can't do this in 30 seconds then
     // there's probably something really wrong.
     CPPUNIT_ASSERT( myLatch.await( 30000 ) );
 
-    CPPUNIT_ASSERT( task1.value == 101 );
-    CPPUNIT_ASSERT( task2.value == 102 );
-    CPPUNIT_ASSERT( task3.value == 103 );
+    CPPUNIT_ASSERT( taskValue1 == 101 );
+    CPPUNIT_ASSERT( taskValue2 == 102 );
+    CPPUNIT_ASSERT( taskValue3 == 103 );
 
     CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3 );
 
@@ -123,25 +127,64 @@ void ThreadPoolExecutorTest::testSimpleT
 }
 
 ///////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutorTest::testAwaitTermination()
+{
+    CountDownLatch myLatch( 3 );
+
+    int taskValue1 = 1;
+    int taskValue2 = 2;
+    int taskValue3 = 3;
+
+    MyTask* task1 = new MyTask(&myLatch, &taskValue1);
+    MyTask* task2 = new MyTask(&myLatch, &taskValue2);
+    MyTask* task3 = new MyTask(&myLatch, &taskValue3);
+
+    ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+
+    pool.execute(task1);
+    pool.execute(task2);
+    pool.execute(task3);
+
+    // Wait for them to finish, if we can't do this in 30 seconds then
+    // there's probably something really wrong.
+    CPPUNIT_ASSERT( myLatch.await( 30000 ) );
+
+    CPPUNIT_ASSERT( taskValue1 == 101 );
+    CPPUNIT_ASSERT( taskValue2 == 102 );
+    CPPUNIT_ASSERT( taskValue3 == 103 );
+
+    CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3 );
+
+    CPPUNIT_ASSERT_EQUAL(false, pool.isShutdown());
+    CPPUNIT_ASSERT_EQUAL(false, pool.isTerminated());
+
+    pool.shutdown();
+    CPPUNIT_ASSERT_EQUAL(true, pool.isShutdown());
+    CPPUNIT_ASSERT(pool.awaitTermination(30, TimeUnit::SECONDS));
+
+    CPPUNIT_ASSERT_EQUAL(true, pool.isShutdown());
+    CPPUNIT_ASSERT_EQUAL(true, pool.isTerminated());
+}
+
+///////////////////////////////////////////////////////////////////////////////
 void ThreadPoolExecutorTest::testMoreTasksThanMaxPoolSize() {
 
     ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
-    Mutex myMutex;
 
     CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3);
 
     CountDownLatch startedLatch1(3);  // First three should go right away
     CountDownLatch startedLatch2(1);  // The fourth one goes after others finish
 
-    MyWaitingTask task1( &myMutex, &startedLatch1 );
-    MyWaitingTask task2( &myMutex, &startedLatch1 );
-    MyWaitingTask task3( &myMutex, &startedLatch1 );
-    MyWaitingTask task4( &myMutex, &startedLatch2 );
-
-    pool.execute( &task1 );
-    pool.execute( &task2 );
-    pool.execute( &task3 );
-    pool.execute( &task4 );
+    MyWaitingTask* task1 = new MyWaitingTask( &myMutex, &startedLatch1 );
+    MyWaitingTask* task2 = new MyWaitingTask( &myMutex, &startedLatch1 );
+    MyWaitingTask* task3 = new MyWaitingTask( &myMutex, &startedLatch1 );
+    MyWaitingTask* task4 = new MyWaitingTask( &myMutex, &startedLatch2 );
+
+    pool.execute(task1);
+    pool.execute(task2);
+    pool.execute(task3);
+    pool.execute(task4);
 
     // Wait 30 seconds, then we let it fail because something is
     // probably very wrong.
@@ -175,30 +218,34 @@ void ThreadPoolExecutorTest::testTasksTh
 {
     CountDownLatch myLatch( 3 );
 
-    MyTask task1(&myLatch, 1);
-    MyTask task2(&myLatch, 2);
-    MyTask task3(&myLatch, 3);
-
-    MyExceptionTask exTask1;
-    MyExceptionTask exTask2;
-    MyExceptionTask exTask3;
+    int taskValue1 = 1;
+    int taskValue2 = 2;
+    int taskValue3 = 3;
+
+    MyTask* task1 = new MyTask(&myLatch, &taskValue1);
+    MyTask* task2 = new MyTask(&myLatch, &taskValue2);
+    MyTask* task3 = new MyTask(&myLatch, &taskValue3);
+
+    MyExceptionTask* exTask1 = new MyExceptionTask;
+    MyExceptionTask* exTask2 = new MyExceptionTask;
+    MyExceptionTask* exTask3 = new MyExceptionTask;
 
     ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
 
-    pool.execute(&exTask1);
-    pool.execute(&task2);
-    pool.execute(&exTask2);
-    pool.execute(&task1);
-    pool.execute(&exTask3);
-    pool.execute(&task3);
+    pool.execute(exTask1);
+    pool.execute(task2);
+    pool.execute(exTask2);
+    pool.execute(task1);
+    pool.execute(exTask3);
+    pool.execute(task3);
 
     // Wait for them to finish, if we can't do this in 30 seconds then
     // there's probably something really wrong.
     CPPUNIT_ASSERT( myLatch.await( 30000 ) );
 
-    CPPUNIT_ASSERT( task1.value == 101 );
-    CPPUNIT_ASSERT( task2.value == 102 );
-    CPPUNIT_ASSERT( task3.value == 103 );
+    CPPUNIT_ASSERT( taskValue1 == 101 );
+    CPPUNIT_ASSERT( taskValue2 == 102 );
+    CPPUNIT_ASSERT( taskValue3 == 103 );
 
     CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3 );
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h?rev=1089273&r1=1089272&r2=1089273&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h Tue Apr  5 22:33:19 2011
@@ -33,13 +33,19 @@ namespace util{
 namespace concurrent{
 
     class ThreadPoolExecutorTest : public CppUnit::TestFixture {
+    private:
 
         CPPUNIT_TEST_SUITE( ThreadPoolExecutorTest );
         CPPUNIT_TEST( testSimpleTasks );
         CPPUNIT_TEST( testMoreTasksThanMaxPoolSize );
         CPPUNIT_TEST( testTasksThatThrow );
+        CPPUNIT_TEST( testAwaitTermination );
         CPPUNIT_TEST_SUITE_END();
 
+    private:
+
+        decaf::util::concurrent::Mutex myMutex;
+
     public:
 
         ThreadPoolExecutorTest() {}
@@ -48,6 +54,7 @@ namespace concurrent{
         void testSimpleTasks();
         void testMoreTasksThanMaxPoolSize();
         void testTasksThatThrow();
+        void testAwaitTermination();
 
     };