Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/18 16:28:05 UTC

svn commit: r1147898 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/decaf/util/concurrent/ test/decaf/util/concurrent/

Author: tabish
Date: Mon Jul 18 14:28:03 2011
New Revision: 1147898

URL: http://svn.apache.org/viewvc?rev=1147898&view=rev
Log:
A nearly complete implementation of the ThreadPoolExecutor class and some additions to the concurrent package.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Jul 18 14:28:03 2011
@@ -451,6 +451,7 @@ cc_sources = \
     decaf/util/concurrent/Lock.cpp \
     decaf/util/concurrent/Mutex.cpp \
     decaf/util/concurrent/RejectedExecutionHandler.cpp \
+    decaf/util/concurrent/RunnableFuture.cpp \
     decaf/util/concurrent/Semaphore.cpp \
     decaf/util/concurrent/SynchronousQueue.cpp \
     decaf/util/concurrent/ThreadFactory.cpp \
@@ -1047,6 +1048,7 @@ h_sources = \
     decaf/util/concurrent/Mutex.h \
     decaf/util/concurrent/RejectedExecutionException.h \
     decaf/util/concurrent/RejectedExecutionHandler.h \
+    decaf/util/concurrent/RunnableFuture.h \
     decaf/util/concurrent/Semaphore.h \
     decaf/util/concurrent/Synchronizable.h \
     decaf/util/concurrent/SynchronousQueue.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h Mon Jul 18 14:28:03 2011
@@ -90,18 +90,37 @@ namespace concurrent {
         virtual ~Executor() {}
 
         /**
+         * Equivalent to calling the two-parameter execute method with true as the
+         * second argument; the Executor takes ownership of the supplied Runnable.
+         *
+         * @param command
+         *      The runnable task to be executed.
+         *
+         * @throws RejectedExecutionException if this task cannot be
+         *         accepted for execution.
+         *
+         * @throws NullPointerException if command is null
+         */
+
+        virtual void execute(decaf::lang::Runnable* command) = 0;
+
+        /**
          * Executes the given command at some time in the future.  The command
          * may execute in a new thread, in a pooled thread, or in the calling
          * thread, at the discretion of the <tt>Executor</tt> implementation.
          *
-         * @param command the runnable task
+         * @param command
+         *      The runnable task to be executed.
+         * @param takeOwnership
+         *      Indicates if the Executor should assume ownership of the task and
+         *      delete the pointer once the task has completed.
          *
          * @throws RejectedExecutionException if this task cannot be
          *         accepted for execution.
          *
          * @throws NullPointerException if command is null
          */
-        virtual void execute( decaf::lang::Runnable* command ) = 0;
+        virtual void execute(decaf::lang::Runnable* command, bool takeOwnership) = 0;
 
     };
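
A minimal usage sketch of the new two-argument execute overload (the PrintTask
type and the submitTasks helper are hypothetical and not part of this commit;
only the Executor interface shown above is assumed):

    #include <decaf/lang/Runnable.h>
    #include <decaf/util/concurrent/Executor.h>

    using decaf::lang::Runnable;
    using decaf::util::concurrent::Executor;

    class PrintTask : public Runnable {     // hypothetical task type
    public:
        virtual ~PrintTask() {}
        virtual void run() {
            // perform the unit of work here
        }
    };

    void submitTasks(Executor* pool, PrintTask* longLived) {
        // The pool takes ownership and deletes the task once it has run;
        // this is equivalent to the single-argument execute(command) overload.
        pool->execute(new PrintTask(), true);

        // The caller keeps ownership: longLived must remain alive until the
        // task completes and must be deleted by the caller afterwards.
        pool->execute(longLived, false);
    }

Nothing in the sketch relies on ThreadPoolExecutor specifics; any Executor
implementation that honours the ownership flag behaves the same way.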
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h Mon Jul 18 14:28:03 2011
@@ -26,7 +26,7 @@ namespace decaf {
 namespace util {
 namespace concurrent {
 
-    class FutureType {
+    class DECAF_API FutureType {
     public:
 
         virtual ~FutureType() {}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp?rev=1147898&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp Mon Jul 18 14:28:03 2011
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RunnableFuture.h"

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h?rev=1147898&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h Mon Jul 18 14:28:03 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_RUNNABLEFUTURE_H_
+#define _DECAF_UTIL_CONCURRENT_RUNNABLEFUTURE_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/Future.h>
+
+namespace decaf {
+namespace util {
+namespace concurrent {
+
+    /**
+     * A Runnable version of the Future type.  When the run method has completed
+     * successfully the Future will be considered complete and its get method will
+     * return the produced result.
+     *
+     * @since 1.0
+     */
+    template<typename T>
+    class RunnableFuture : public Future<T>, public decaf::lang::Runnable {
+    public:
+
+        virtual ~RunnableFuture() {}
+
+    };
+
+}}}
+
+#endif /* _DECAF_UTIL_CONCURRENT_RUNNABLEFUTURE_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h
------------------------------------------------------------------------------
    svn:eol-style = native
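
To show how the interface composes Runnable and Future, a minimal sketch; the
runAndGet helper is hypothetical and assumes only the run() and get() members
that the class comment above describes, with a concrete RunnableFuture
implementation supplied by the caller:

    #include <decaf/util/concurrent/RunnableFuture.h>

    // Runs the task on the calling thread, then reads the produced result.
    // With a ThreadPoolExecutor the same object could instead be handed to
    // execute() and its result collected later from another thread.
    template<typename T>
    T runAndGet(decaf::util::concurrent::RunnableFuture<T>* task) {
        task->run();         // completing run() successfully completes the Future
        return task->get();  // returns the result produced by the task
    }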

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Mon Jul 18 14:28:03 2011
@@ -21,13 +21,16 @@
 #include <decaf/util/LinkedList.h>
 #include <decaf/util/Timer.h>
 #include <decaf/util/TimerTask.h>
-#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/Future.h>
+#include <decaf/util/concurrent/locks/ReentrantLock.h>
+#include <decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h>
 #include <decaf/util/concurrent/CountDownLatch.h>
 #include <decaf/util/concurrent/atomic/AtomicInteger.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/util/concurrent/RejectedExecutionException.h>
 #include <decaf/util/concurrent/RejectedExecutionHandler.h>
 #include <decaf/util/concurrent/Executors.h>
+#include <decaf/lang/Throwable.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/lang/Math.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
@@ -43,35 +46,250 @@ using namespace decaf::lang::exceptions;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
 using namespace decaf::util::concurrent::atomic;
+using namespace decaf::util::concurrent::locks;
 
 ////////////////////////////////////////////////////////////////////////////////
 namespace decaf{
 namespace util{
 namespace concurrent{
 
-    class Worker;
-
+    /**
+     * The main pool control state, ctl, is an atomic integer packing
+     * two conceptual fields
+     *   workerCount, indicating the effective number of threads
+     *   runState,    indicating whether running, shutting down etc
+     *
+     * In order to pack them into one int, we limit workerCount to
+     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
+     * billion) otherwise representable. If this is ever an issue in
+     * the future, the variable can be changed to be an AtomicLong,
+     * and the shift/mask constants below adjusted. But until the need
+     * arises, this code is a bit faster and simpler using an int.
+     *
+     * The workerCount is the number of workers that have been
+     * permitted to start and not permitted to stop.  The value may be
+     * transiently different from the actual number of live threads,
+     * for example when a ThreadFactory fails to create a thread when
+     * asked, and when exiting threads are still performing
+     * bookkeeping before terminating. The user-visible pool size is
+     * reported as the current size of the workers set.
+     *
+     * The runState provides the main lifecycle control, taking on values:
+     *
+     *   RUNNING:  Accept new tasks and process queued tasks
+     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
+     *   STOP:     Don't accept new tasks, don't process queued tasks,
+     *             and interrupt in-progress tasks
+     *   TIDYING:  All tasks have terminated, workerCount is zero,
+     *             the thread transitioning to state TIDYING
+     *             will run the terminated() hook method
+     *   TERMINATED: terminated() has completed
+     *
+     * The numerical order among these values matters, to allow
+     * ordered comparisons. The runState monotonically increases over
+     * time, but need not hit each state. The transitions are:
+     *
+     * RUNNING -> SHUTDOWN
+     *    On invocation of shutdown(), perhaps implicitly in finalize()
+     * (RUNNING or SHUTDOWN) -> STOP
+     *    On invocation of shutdownNow()
+     * SHUTDOWN -> TIDYING
+     *    When both queue and pool are empty
+     * STOP -> TIDYING
+     *    When pool is empty
+     * TIDYING -> TERMINATED
+     *    When the terminated() hook method has completed
+     *
+     * Threads waiting in awaitTermination() will return when the
+     * state reaches TERMINATED.
+     *
+     * Detecting the transition from SHUTDOWN to TIDYING is less
+     * straightforward than you'd like because the queue may become
+     * empty after non-empty and vice versa during SHUTDOWN state, but
+     * we can only terminate if, after seeing that it is empty, we see
+     * that workerCount is 0 (which sometimes entails a recheck -- see
+     * below).
+     */
     class ExecutorKernel {
+    private:
+
+        /**
+         * The Worker class does a small amount of bookkeeping and provides a locking point
+         * for the kernel to access the running task.
+         */
+        class Worker : public AbstractQueuedSynchronizer, public Runnable {
+        private:
+
+            Pointer<Thread> thread;
+            Runnable* firstTask;
+            decaf::util::concurrent::ExecutorKernel* kernel;
+            long long completedTasks;
+
+            friend class ExecutorKernel;
+
+        private:
+
+            Worker( const Worker& );
+            Worker& operator= ( const Worker& );
+
+         public:
+
+            Worker(ExecutorKernel* kernel, Runnable* task) :
+                AbstractQueuedSynchronizer(), Runnable(), thread(), firstTask(task), kernel(kernel), completedTasks(0) {
+
+                if( kernel == NULL ) {
+                    throw IllegalArgumentException( __FILE__, __LINE__,
+                        "ThreadPoolExecutor Worker requires non-NULL pointer to parent ExecutorKernel");
+                }
+
+                this->thread.reset(kernel->factory->newThread(this));
+            }
+
+            ~Worker() {}
+
+            void run() {
+                // Delegate the running of this task to the Kernel so that all the logic
+                // for task execution and cleanup is contained in one place.
+                this->kernel->runWorker(this);
+            }
+
+            virtual void lock() {
+                acquire(1);
+            }
+
+            virtual bool tryLock() {
+                return tryAcquire(1);
+            }
+
+            virtual void unlock() {
+                release(1);
+            }
+
+            virtual bool isLocked() {
+                return isHeldExclusively();
+            }
+
+         protected:
+
+            virtual bool isHeldExclusively() {
+                return getState() == 1;
+            }
+
+            virtual bool tryAcquire(int unused DECAF_UNUSED) {
+                if (compareAndSetState(0, 1)) {
+                    setExclusiveOwnerThread(Thread::currentThread());
+                    return true;
+                }
+                return false;
+            }
+
+            virtual bool tryRelease(int unused DECAF_UNUSED) {
+                this->setExclusiveOwnerThread(NULL);
+                this->setState(0);
+                return true;
+            }
+        };
+
+        /**
+         * TimerTask implementation used to clean up Worker objects that have terminated
+         * for some reason.  Since they can't delete themselves the cleanup is delegated
+         * to the Timer's thread.
+         */
+        class WorkerKiller : public TimerTask {
+        private:
+
+            ExecutorKernel* kernel;
+
+        public:
+
+            WorkerKiller(ExecutorKernel* kernel) : kernel(kernel) {
+            }
+
+            virtual ~WorkerKiller() {}
+
+            virtual void run() {
+                kernel->mainLock.lock();
+                try{
+                    Pointer< Iterator<Worker*> > iter( kernel->deadWorkers.iterator() );
+                    while(iter->hasNext()) {
+                        delete iter->next();
+                        iter->remove();
+                    }
+                } catch(...) {}
+                kernel->mainLock.unlock();
+            }
+        };
+
     public:
 
+        static const int COUNT_BITS;
+        static const int CAPACITY;
+
+        // runState is stored in the high-order bits
+        static const int RUNNING;
+        static const int SHUTDOWN;
+        static const int STOP;
+        static const int TIDYING;
+        static const int TERMINATED;
+
+        static const bool ONLY_ONE;
+
+        AtomicInteger ctl;
+
         ThreadPoolExecutor* parent;
 
+        /**
+         * List containing all worker threads in pool. Accessed only when holding mainLock.
+         */
         LinkedList<Worker*> workers;
+
+        /**
+         * List to hold Worker objects that have terminated for some reason. Usually this is
+         * because of a call to setMaximumPoolSize or setCorePoolSize but can also occur
+         * because of an exception from a task that the worker was running.
+         */
         LinkedList<Worker*> deadWorkers;
-        Timer cleanupTimer;
 
-        AtomicBoolean stopping;
-        AtomicBoolean stopped;
-        AtomicBoolean terminated;
-        AtomicInteger freeThreads;
+        /**
+         * Timer used to periodically clean up the dead worker objects.  They must be cleaned
+         * up on a separate thread because the Worker generally adds itself to the deadWorkers
+         * list from the context of its run method and cannot delete itself.
+         */
+        Timer cleanupTimer;
 
         int maxPoolSize;
         int corePoolSize;
         long long keepAliveTime;
         bool coreThreadsCanTimeout;
+
+        /**
+         * The queue used for holding tasks and handing off to worker threads.
+         * We do not require that workQueue.poll() returning NULL necessarily
+         * means that workQueue.isEmpty(), so rely solely on isEmpty to see if
+         * the queue is empty (which we must do for example when deciding whether
+         * to transition from SHUTDOWN to TIDYING).  This accommodates special-
+         * purpose queues such as DelayQueues for which poll() is allowed to
+         * return NULL even if it may later return non-NULL when delays expire.
+         */
         Pointer< BlockingQueue<decaf::lang::Runnable*> > workQueue;
-        Mutex mainLock;
-        CountDownLatch termination;
+
+        /**
+         * Lock held on access to workers set and related bookkeeping. While we could
+         * use a concurrent set of some sort, it turns out to be generally preferable
+         * to use a lock. Among the reasons is that this serializes interruptIdleWorkers,
+         * which avoids unnecessary interrupt storms, especially during shutdown.
+         * Otherwise exiting threads would concurrently interrupt those that have not
+         * yet interrupted. It also simplifies some of the associated statistics
+         * bookkeeping of largestPoolSize etc. We also hold mainLock on shutdown and
+         * shutdownNow, for the sake of ensuring workers set is stable while separately
+         * interrupting.
+         */
+        ReentrantLock mainLock;
+
+        /**
+         * Wait condition to support awaitTermination
+         */
+        Pointer<Condition> termination;
 
         long long completedTasks;
         int largestPoolSize;
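
The bit-packing of the ctl field described in the comment above can be made
concrete with a small standalone sketch; the constants mirror the values
defined near the end of this file, but every name here is local to the example:

    #include <cassert>

    // The three high-order bits hold the run state, the low 29 bits the worker count.
    static const int COUNT_BITS = 32 - 3;                 // Integer::SIZE - 3
    static const int CAPACITY   = (1 << COUNT_BITS) - 1;  // worker-count mask, (2^29)-1

    static const int RUNNING  = -1 << COUNT_BITS;         // negative, so RUNNING < SHUTDOWN
    static const int SHUTDOWN =  0 << COUNT_BITS;

    static int ctlOf(int rs, int wc) { return rs | wc; }        // pack state and count
    static int runStateOf(int c)     { return c & ~CAPACITY; }  // unpack the state
    static int workerCountOf(int c)  { return c & CAPACITY; }   // unpack the count

    int main() {
        int c = ctlOf(RUNNING, 5);          // a RUNNING pool with five workers
        assert(runStateOf(c) == RUNNING);
        assert(workerCountOf(c) == 5);
        assert(c < SHUTDOWN);               // the isRunning(c) test used by the kernel
        return 0;
    }

Because the state values are ordered (RUNNING < SHUTDOWN < STOP < TIDYING <
TERMINATED), the runStateAtLeast and runStateLessThan helpers can compare the
packed value directly.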
@@ -84,155 +302,949 @@ namespace concurrent{
         ExecutorKernel(ThreadPoolExecutor* parent,
                        int corePoolSize, int maxPoolSize, long long keepAliveTime,
                        BlockingQueue<decaf::lang::Runnable*>* workQueue,
-                       ThreadFactory* threadFactory, RejectedExecutionHandler* handler);
+                       ThreadFactory* threadFactory, RejectedExecutionHandler* handler) :
+           ctl(ctlOf(RUNNING, 0)),
+           parent(parent),
+           workers(),
+           deadWorkers(),
+           cleanupTimer(),
+           maxPoolSize(maxPoolSize),
+           corePoolSize(corePoolSize),
+           keepAliveTime(keepAliveTime),
+           coreThreadsCanTimeout(false),
+           workQueue(),
+           mainLock(),
+           termination(),
+           completedTasks(0),
+           largestPoolSize(0),
+           factory(),
+           rejectionHandler() {
+
+           if(corePoolSize < 0 || maxPoolSize <= 0 ||
+              maxPoolSize < corePoolSize || keepAliveTime < 0) {
+
+               throw IllegalArgumentException(__FILE__, __LINE__, "Argument out of range.");
+           }
+
+           if(workQueue == NULL || threadFactory == NULL || handler == NULL) {
+               throw NullPointerException(__FILE__, __LINE__, "Required parameter was NULL");
+           }
+
+           this->cleanupTimer.scheduleAtFixedRate(
+               new WorkerKiller(this), TimeUnit::SECONDS.toMillis(10), TimeUnit::SECONDS.toMillis(10));
+
+           this->workQueue.reset(workQueue);
+           this->factory.reset(threadFactory);
+           this->rejectionHandler.reset(handler);
+           this->termination.reset(this->mainLock.newCondition());
+        }
+
+        ~ExecutorKernel() {
+            try{
+
+                // Turn off the cleanup timer first so that it doesn't fire while
+                // we transition all the remaining workers into the dead workers
+                // queue, which can lead to lock contention.
+                this->cleanupTimer.cancel();
+
+                this->shutdown();
+                this->awaitTermination();
+
+                // Ensure dead Worker Threads are destroyed, the Timer might not have
+                // run recently.
+                Pointer< Iterator<Worker*> > iter(this->deadWorkers.iterator());
+                while(iter->hasNext()) {
+                    Worker* worker = iter->next();
+                    worker->thread->join();
+                    delete worker;
+                }
+            }
+            DECAF_CATCH_NOTHROW(Exception)
+            DECAF_CATCHALL_NOTHROW()
+        }
+
+        // Packing and unpacking ctl
+        static int runStateOf(int c) {
+            return c & ~CAPACITY;
+        }
 
-        ~ExecutorKernel();
+        static int workerCountOf(int c) {
+            return c & CAPACITY;
+        }
 
-        void onTaskStarted(Worker* thread);
+        static int ctlOf(int rs, int wc) {
+            return rs | wc;
+        }
 
-        void onTaskCompleted(Worker* thread);
+        int getPoolSize() {
+            int result = 0;
+            mainLock.lock();
+            try {
+                // Remove rare and surprising possibility of
+                // isTerminated() && getPoolSize() > 0
+                result = runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
+            mainLock.unlock();
+            return result;
+        }
 
-        void onTaskException(Worker* thread, lang::Exception& ex);
+        int getActiveCount() {
+            int n = 0;
+            mainLock.lock();
+            try {
+                Pointer< Iterator<Worker*> > iter(workers.iterator());
+                while(iter->hasNext()) {
+                    Worker* worker = iter->next();
+                    if (worker->isLocked()) {
+                        ++n;
+                    }
+                }
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
+            mainLock.unlock();
+            return n;
+        }
 
-        void enQueueTask(Runnable* task);
+        int getLargestPoolSize() {
+            int result = 0;
+            mainLock.lock();
+            try {
+                result = largestPoolSize;
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
+            mainLock.unlock();
+            return result;
+        }
 
-        Runnable* deQueueTask();
+        long long getTaskCount() {
+            long long n = 0;
+            mainLock.lock();
+            try {
+                n = completedTasks;
+                Pointer< Iterator<Worker*> > iter(workers.iterator());
+                while(iter->hasNext()) {
+                    Worker* worker = iter->next();
+                    n += worker->completedTasks;
+                    if (worker->isLocked()) {
+                        ++n;
+                    }
+                }
+                n += workQueue->size();
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
+            mainLock.unlock();
+            return n;
+        }
 
-        bool addWorker();
+        long long getCompletedTaskCount() {
+            long long n = 0;
+            mainLock.lock();
+            try {
+                n = completedTasks;
+                Pointer< Iterator<Worker*> > iter(workers.iterator());
+                while(iter->hasNext()) {
+                    Worker* worker = iter->next();
+                    n += worker->completedTasks;
+                }
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
+            mainLock.unlock();
+            return n;
+        }
 
-        int addAllWorkers();
+        /**
+         * Transitions to TERMINATED state if either (SHUTDOWN and pool
+         * and queue empty) or (STOP and pool empty).  If otherwise
+         * eligible to terminate but workerCount is nonzero, interrupts an
+         * idle worker to ensure that shutdown signals propagate. This
+         * method must be called following any action that might make
+         * termination possible -- reducing worker count or removing tasks
+         * from the queue during shutdown. The method is non-private to
+         * allow access from ScheduledThreadPoolExecutor.
+         */
+        void tryTerminate() {
+            for (;;) {
+                int c = ctl.get();
+                if (isRunning(c) ||
+                    runStateAtLeast(c, TIDYING) ||
+                    (runStateOf(c) == SHUTDOWN && !workQueue->isEmpty())) {
+                    return;
+                }
 
-        bool isStoppedOrStopping();
+                if (workerCountOf(c) != 0) { // Eligible to terminate
+                    interruptIdleWorkers(ONLY_ONE);
+                    return;
+                }
 
-        void shutdown();
+                mainLock.lock();
+                try {
+                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
+                        try {
+                            this->parent->terminated();
+                        } catch(Exception& ex) {
+                            ctl.set(ctlOf(TERMINATED, 0));
+                            termination->signalAll();
+                            mainLock.unlock();
+                            throw;
+                        }
 
-        void shutdownNow(ArrayList<Runnable*>& unexecutedTasks);
+                        ctl.set(ctlOf(TERMINATED, 0));
+                        termination->signalAll();
+                        mainLock.unlock();
+                        return;
+                    }
+                } catch(Exception& ex) {
+                    mainLock.unlock();
+                    throw;
+                }
+                mainLock.unlock();
+                // else retry on failed CAS
+            }
+        }
 
-        bool awaitTermination(long long timeout, const TimeUnit& unit);
+        /**
+         * Force an interrupt of all threads even if they are currently active.
+         */
+        void interruptWorkers() {
+            mainLock.lock();
+            try {
+                Pointer< Iterator<Worker*> > iter(this->workers.iterator());
+                while(iter->hasNext()) {
+                    iter->next()->thread->interrupt();
+                }
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
+            mainLock.unlock();
+        }
 
-        void handleWorkerExit(Worker* worker);
+        /**
+         * Interrupts threads that might be waiting for tasks (as indicated by not
+         * being locked) so they can check for termination or configuration changes.
+         *
+         * @param onlyOne
+         *      If true, interrupt at most one worker. This is called only from
+         *      tryTerminate when termination is otherwise enabled but there are
+         *      still other workers.  In this case, at most one waiting worker is
+         *      interrupted to propagate shutdown signals in case all threads are
+         *      currently waiting.  Interrupting any arbitrary thread ensures that
+         *      newly arriving workers since shutdown began will also eventually exit.
+         *      To guarantee eventual termination, it suffices to always interrupt
+         *      only one idle worker, but shutdown() interrupts all idle workers so
+         *      that redundant workers exit promptly, not waiting for a straggler
+         *      task to finish.
+         */
+        void interruptIdleWorkers(bool onlyOne) {
+            mainLock.lock();
+            try {
+                Pointer< Iterator<Worker*> > iter(this->workers.iterator());
+                while(iter->hasNext()) {
+                    Worker* worker = iter->next();
+                    Pointer<Thread> thread = worker->thread;
+                    if (!thread->isInterrupted() && worker->tryLock()) {
+                        try {
+                            thread->interrupt();
+                        } catch(Exception& ex) {
+                            worker->unlock();
+                        }
 
-        void tryTerminate();
+                        worker->unlock();
+                    }
 
-        void drainQueue(ArrayList<Runnable*>& unexecutedTasks);
+                    if (onlyOne) {
+                        break;
+                    }
+                }
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
 
-        void interruptIdleWorkers();
-        void interruptIdleWorkers(bool onlyOne);
-    };
+            mainLock.unlock();
+        }
 
-    class Worker : public lang::Thread {
-    private:
+        /**
+         * Common form of interruptIdleWorkers, to avoid having to remember what
+         * the boolean argument means.
+         */
+        void interruptIdleWorkers() {
+            this->interruptIdleWorkers(false);
+        }
+
+        /**
+         * Ensures that unless the pool is stopping, the current thread does not have
+         * its interrupt set. This requires a double-check of state in case the interrupt
+         * was cleared concurrently with a shutdownNow -- if so, the interrupt is re-enabled.
+         */
+        void clearInterruptsForTaskRun() {
+            if (this->runStateLessThan(ctl.get(), STOP) && Thread::interrupted() &&
+                this->runStateAtLeast(ctl.get(), STOP)) {
 
-        bool busy;
-        bool done;
-        decaf::util::concurrent::ExecutorKernel* kernel;
+                Thread::currentThread()->interrupt();
+            }
+        }
 
-    private:
+        /**
+         * State check needed by ScheduledThreadPoolExecutor to enable running
+         * tasks during shutdown.
+         *
+         * @param shutdownOK
+         *      true if this method should also return true when the state is SHUTDOWN.
+         */
+        bool isRunningOrShutdown(bool shutdownOK) {
+            int rs = this->runStateOf(ctl.get());
+            return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
+        }
+
+        /**
+         * Main worker run loop.  Repeatedly gets tasks from queue and
+         * executes them, while coping with a number of issues:
+         *
+         * 1. We may start out with an initial task, in which case we
+         * don't need to get the first one. Otherwise, as long as pool is
+         * running, we get tasks from getTask. If it returns null then the
+         * worker exits due to changed pool state or configuration
+         * parameters.  Other exits result from exception throws in
+         * external code, in which case completedAbruptly holds, which
+         * usually leads processWorkerExit to replace this thread.
+         *
+         * 2. Before running any task, the lock is acquired to prevent
+         * other pool interrupts while the task is executing, and
+         * clearInterruptsForTaskRun called to ensure that unless pool is
+         * stopping, this thread does not have its interrupt set.
+         *
+         * 3. Each task run is preceded by a call to beforeExecute, which
+         * might throw an exception, in which case we cause thread to die
+         * (breaking loop with completedAbruptly true) without processing
+         * the task.
+         *
+         * 4. Assuming beforeExecute completes normally, we run the task,
+         * gathering any of its thrown exceptions to send to
+         * afterExecute. We separately handle RuntimeException, Error
+         * (both of which the specs guarantee that we trap) and arbitrary
+         * Throwables.  Because we cannot rethrow Throwables within
+         * Runnable.run, we wrap them within Errors on the way out (to the
+         * thread's UncaughtExceptionHandler).  Any thrown exception also
+         * conservatively causes thread to die.
+         *
+         * 5. After task.run completes, we call afterExecute, which may
+         * also throw an exception, which will also cause thread to
+         * die. According to JLS Sec 14.20, this exception is the one that
+         * will be in effect even if task.run throws.
+         *
+         * The net effect of the exception mechanics is that afterExecute
+         * and the thread's UncaughtExceptionHandler have as accurate
+         * information as we can provide about any problems encountered by
+         * user code.
+         *
+         * @param w the worker
+         */
+        void runWorker(Worker* w) {
+            Runnable* task = w->firstTask;
+            w->firstTask = NULL;
+            bool completedAbruptly = true;
+            try {
+                while (task != NULL || (task = getTask()) != NULL) {
+                    w->lock();
+                    clearInterruptsForTaskRun();
+                    try {
+                        this->parent->beforeExecute(w->thread.get(), task);
+                        try {
+                            task->run();
+                        } catch (RuntimeException& re) {
+                            this->parent->afterExecute(task, &re);
+                            throw;
+                        } catch (Exception& e) {
+                            this->parent->afterExecute(task, &e);
+                            throw;
+                        }
+
+                        this->parent->afterExecute(task, NULL);
+
+                    } catch(Exception& ex) {
+                        delete task;
+                        task = NULL;
+                        w->completedTasks++;
+                        w->unlock();
+                        throw;
+                    }
 
-        Worker( const Worker& );
-        Worker& operator= ( const Worker& );
+                    delete task;
+                    task = NULL;
+                    w->completedTasks++;
+                    w->unlock();
+                }
+
+                completedAbruptly = false;
+            } catch(Exception& ex) {
+                completedAbruptly = true;
+            }
+
+            processWorkerExit(w, completedAbruptly);
+        }
+
+        void execute(Runnable* task, bool takeOwnership DECAF_UNUSED) {
+
+            if (task == NULL) {
+                throw NullPointerException(__FILE__, __LINE__, "Runnable task cannot be NULL");
+            }
+
+            /*
+             * Proceed in 3 steps:
+             *
+             * 1. If fewer than corePoolSize threads are running, try to
+             * start a new thread with the given command as its first
+             * task.  The call to addWorker atomically checks runState and
+             * workerCount, and so prevents false alarms that would add
+             * threads when it shouldn't, by returning false.
+             *
+             * 2. If a task can be successfully queued, then we still need
+             * to double-check whether we should have added a thread
+             * (because existing ones died since last checking) or that
+             * the pool shut down since entry into this method. So we
+             * recheck state and if necessary roll back the enqueuing if
+             * stopped, or start a new thread if there are none.
+             *
+             * 3. If we cannot queue task, then we try to add a new
+             * thread.  If it fails, we know we are shut down or saturated
+             * and so reject the task.
+             */
+            int c = ctl.get();
+            if (workerCountOf(c) < corePoolSize) {
+                if (addWorker(task, true)) {
+                    return;
+                }
+                c = ctl.get();
+            }
 
-     public:
+            if (isRunning(c) && workQueue->offer(task)) {
+                int recheck = ctl.get();
+                if (!isRunning(recheck) && this->remove(task)) {
+                    this->rejectionHandler->rejectedExecution(task, this->parent);
+                } else if (workerCountOf(recheck) == 0) {
+                    addWorker(NULL, false);
+                }
+            } else if (!addWorker(task, false)) {
+                this->rejectionHandler->rejectedExecution(task, this->parent);
+            }
+        }
 
-        Worker(decaf::util::concurrent::ExecutorKernel* kernel) :
-            Thread(), busy(false), done(false), kernel(kernel) {
+        void shutdown() {
+            mainLock.lock();
+            try {
+                advanceRunState(SHUTDOWN);
+                interruptIdleWorkers();
+                this->parent->onShutdown();
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
+            mainLock.unlock();
+            tryTerminate();
+        }
 
-            if( kernel == NULL ) {
-                throw IllegalArgumentException( __FILE__, __LINE__,
-                    "ThreadPoolExecutor Worker requires non-NULL pointer to parent ExecutorKernel");
+        void shutdownNow(ArrayList<Runnable*>& unexecutedTasks) {
+            mainLock.lock();
+            try {
+                advanceRunState(STOP);
+                interruptWorkers();
+                drainQueue(unexecutedTasks);
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
             }
+            mainLock.unlock();
+            tryTerminate();
+        }
+
+        bool isShutdown() {
+            return !isRunning(ctl.get());
         }
 
-        ~Worker() {}
+        bool isTerminating() {
+            int c = ctl.get();
+            return !isRunning(c) && runStateLessThan(c, TERMINATED);
+        }
 
-        void run() {
+        bool isTerminated() {
+            return runStateAtLeast(ctl.get(), TERMINATED);
+        }
 
+        bool awaitTermination() {
+            mainLock.lock();
             try {
 
-                while(!this->done) {
+                for (;;) {
 
-                    // Blocks until there something to be done
-                    Runnable* task = this->kernel->deQueueTask();
+                    if (runStateAtLeast(ctl.get(), TERMINATED)) {
+                        mainLock.unlock();
+                        return true;
+                    }
 
-                    if(this->done) {
+                    this->termination->await();
+                }
 
-                        if(task != NULL) {
-                            delete task;
-                        }
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
+            mainLock.unlock();
+        }
 
-                        break;
+        bool awaitTermination(long long timeout, const TimeUnit& unit) {
+            long long nanos = unit.toNanos(timeout);
+            mainLock.lock();
+            try {
+
+                for (;;) {
+
+                    if (runStateAtLeast(ctl.get(), TERMINATED)) {
+                        mainLock.unlock();
+                        return true;
                     }
 
-                    if(!task) {
-                        throw Exception( __FILE__, __LINE__,
-                            "Worker - Retrieved NULL task from Kernel.");
+                    if (nanos <= 0) {
+                        mainLock.unlock();
+                        return false;
                     }
 
-                    this->busy = true;
-                    this->kernel->onTaskStarted(this);
+                    nanos = this->termination->awaitNanos(nanos);
+                }
+
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
+            mainLock.unlock();
+        }
 
-                    try{
-                        task->run();
-                    } catch(...) {}
+        void setCorePoolSize(int corePoolSize) {
 
-                    delete task;
+            int delta = corePoolSize - this->corePoolSize;
+
+            this->corePoolSize = corePoolSize;
 
-                    this->kernel->onTaskCompleted(this);
-                    this->busy = false;
+            if (workerCountOf(ctl.get()) > corePoolSize) {
+                interruptIdleWorkers();
+            } else if (delta > 0) {
+                // We don't really know how many new threads are "needed".
+                // As a heuristic, prestart enough new workers (up to new
+                // core size) to handle the current number of tasks in
+                // queue, but stop if queue becomes empty while doing so.
+                int k = Math::min(delta, workQueue->size());
+                while (k-- > 0 && addWorker(NULL, true)) {
+                    if (workQueue->isEmpty()) {
+                        break;
+                    }
                 }
             }
-            catch( Exception& ex )
-            {
-                ex.setMark( __FILE__, __LINE__ );
-                this->busy = false;
-                this->kernel->onTaskException(this, ex);
+        }
+
+        void setMaximumPoolSize(int maximumPoolSize) {
+            if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
+                throw IllegalArgumentException();
             }
-            catch(...)
-            {
-                Exception ex(__FILE__, __LINE__, "Worker - Caught Unknown Exception");
-                this->busy = false;
-                this->kernel->onTaskException(this, ex);
+
+            this->maxPoolSize = maximumPoolSize;
+
+            if (workerCountOf(ctl.get()) > maximumPoolSize) {
+                interruptIdleWorkers();
             }
+        }
 
-            this->kernel->handleWorkerExit(this);
+        bool prestartCoreThread() {
+            return workerCountOf(ctl.get()) < corePoolSize && addWorker(NULL, true);
         }
 
-        void stop() {
-            this->done = true;
+        int prestartAllCoreThreads() {
+            int n = 0;
+            while (addWorker(NULL, true)) {
+                ++n;
+            }
+            return n;
         }
 
-        bool isBusy() {
-            return this->busy;
+        void allowCoreThreadTimeOut(bool value) {
+            if (value && keepAliveTime <= 0) {
+                throw IllegalArgumentException(__FILE__, __LINE__,
+                    "Core threads must have nonzero keep alive times");
+            }
+
+            if (value != this->coreThreadsCanTimeout) {
+                this->coreThreadsCanTimeout = value;
+                if (value) {
+                    interruptIdleWorkers();
+                }
+            }
         }
 
-    };
+        void setKeepAliveTime(long long time, const TimeUnit& unit) {
+            if (time < 0) {
+                throw IllegalArgumentException();
+            }
 
-    class WorkerKiller : public TimerTask {
-    private:
+            if (time == 0 && this->coreThreadsCanTimeout) {
+                throw IllegalArgumentException(__FILE__, __LINE__,
+                    "Core threads must have nonzero keep alive times");
+            }
 
-        ExecutorKernel* kernel;
+            long long keepAliveTime = unit.toNanos(time);
+            long long delta = keepAliveTime - this->keepAliveTime;
 
-    public:
+            this->keepAliveTime = keepAliveTime;
 
-        WorkerKiller(ExecutorKernel* kernel) : kernel(kernel) {
+            if (delta < 0) {
+                interruptIdleWorkers();
+            }
         }
 
-        virtual ~WorkerKiller() {}
+        void purge() {
+            Pointer< BlockingQueue<Runnable*> > q = workQueue;
+            try {
 
-        virtual void run() {
-            try{
-                synchronized(&kernel->mainLock) {
-                    Pointer< Iterator<Worker*> > iter( kernel->deadWorkers.iterator() );
-                    while(iter->hasNext()) {
-                        delete iter->next();
+                Pointer< Iterator<Runnable*> > iter(q->iterator());
+                while (iter->hasNext()) {
+                    Runnable* r = iter->next();
+                    FutureType* future = dynamic_cast<FutureType*>(r);
+                    if (future != NULL && future->isCancelled()) {
                         iter->remove();
                     }
                 }
-            } catch(...) {}
+
+            } catch (ConcurrentModificationException& ex) {
+                // Take slow path if we encounter interference during traversal.
+                // Make copy for traversal and call remove for cancelled entries.
+                // The slow path is more likely to be O(N*N).
+                std::vector<Runnable*> array = q->toArray();
+                std::vector<Runnable*>::const_iterator iter = array.begin();
+                for(; iter != array.end(); ++iter) {
+                    Runnable* r = *iter;
+                    FutureType* future = dynamic_cast<FutureType*>(r);
+                    if (future != NULL && future->isCancelled()) {
+                        q->remove(r);
+                    }
+                }
+            }
+
+            tryTerminate(); // In case SHUTDOWN and now empty
+        }
+
+        bool remove(Runnable* task) {
+            bool removed = this->workQueue->remove(task);
+            this->tryTerminate();
+            return removed;
+        }
+
+    private:
+
+        static bool runStateLessThan(int c, int s) {
+            return c < s;
+        }
+
+        static bool runStateAtLeast(int c, int s) {
+            return c >= s;
+        }
+
+        static bool isRunning(int c) {
+            return c < SHUTDOWN;
+        }
+
+    private:
+
+        /**
+         * Drains the task queue into a new list, normally using drainTo. But if
+         * the queue is a DelayQueue or any other kind of queue for which poll or
+         * drainTo may fail to remove some elements, it deletes them one by one.
+         *
+         * @param unexecutedTasks
+         *      Reference to an ArrayList where the tasks are to be moved.
+         */
+        void drainQueue(ArrayList<Runnable*>& unexecutedTasks) {
+
+            // Some Queue implementations can fail in poll and drainTo, so we check
+            // after attempting to drain the Queue, and if it is not empty we remove
+            // the remaining tasks one by one.
+
+            this->workQueue->drainTo(unexecutedTasks);
+            if (!this->workQueue->isEmpty()) {
+
+                std::vector<Runnable*> tasks = this->workQueue->toArray();
+                std::vector<Runnable*>::iterator iter = tasks.begin();
+
+                for (; iter != tasks.end(); ++iter) {
+
+                    if (this->workQueue->remove(*iter)) {
+                        unexecutedTasks.add(*iter);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Transitions runState to given target, or leaves it alone if already at
+         * least the given target.
+         *
+         * @param targetState the desired state, either SHUTDOWN or STOP
+         *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
+         */
+        void advanceRunState(int targetState) {
+            for (;;) {
+                int c = ctl.get();
+                if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
+                    break;
+            }
+        }
+
+        /**
+         * Checks if a new worker can be added with respect to current pool state
+         * and the given bound (either core or maximum). If so, the worker count
+         * is adjusted accordingly, and, if possible, a new worker is created and
+         * started running firstTask as its first task. This method returns false
+         * if the pool is stopped or eligible to shut down. It also returns false
+         * if the thread factory fails to create a thread when asked, which requires
+         * a backout of workerCount, and a recheck for termination, in case the
+         * existence of this worker was holding up termination.
+         *
+         * @param firstTask
+         *      The task the new thread should run first (or null if none).
+         *      Workers are created with an initial first task (in method execute())
+         *      to bypass queuing when there are fewer than corePoolSize threads
+         *      (in which case we always start one), or when the queue is full
+         *      (in which case we must bypass queue). Initially idle threads are
+         *      usually created via prestartCoreThread or to replace other dying workers.
+         *
+         * @param core
+         *      If true use corePoolSize as bound, else maximumPoolSize.
+         *
+         * @return true if successful
+         */
+        bool addWorker(Runnable* firstTask, bool core) {
+            retry:
+            for (;;) {
+                int c = ctl.get();
+                int rs = this->runStateOf(c);
+
+                // Check if queue empty only if necessary.
+                if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == NULL && !workQueue->isEmpty())) {
+                    return false;
+                }
+
+                for (;;) {
+                    int wc = this->workerCountOf(c);
+                    if (wc >= CAPACITY || wc >= (core ? this->corePoolSize : this->maxPoolSize)) {
+                        return false;
+                    }
+                    if (compareAndIncrementWorkerCount(c)) {
+                        goto success;
+                    }
+                    c = ctl.get();  // Re-read ctl
+                    if (runStateOf(c) != rs) {
+                        goto retry;
+                    }
+                    // else CAS failed due to workerCount change; retry inner loop
+                }
+            }
+
+            success:
+
+            Pointer<Worker> w(new Worker(this, firstTask));
+            Pointer<Thread> t = w->thread;
+
+            mainLock.lock();
+            try {
+                // Recheck while holding lock. Back out on ThreadFactory failure or if
+                // shut down before lock acquired.
+                int c = ctl.get();
+                int rs = runStateOf(c);
+
+                if (t == NULL || (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == NULL))) {
+                    decrementWorkerCount();
+                    tryTerminate();
+                    return false;
+                }
+
+                workers.add(w.release());
+
+                int s = workers.size();
+                if (s > largestPoolSize) {
+                    largestPoolSize = s;
+                }
+
+            } catch(Exception& ex) {
+                mainLock.unlock();
+                throw;
+            }
+            mainLock.unlock();
+
+            t->start();
+            // It is possible (but unlikely) for a thread to have been added to
+            // workers, but not yet started, during transition to STOP, which
+            // could result in a rare missed interrupt, because Thread::interrupt
+            // is not guaranteed to have any effect on a not-yet-started Thread
+            // (see Thread#interrupt).
+            if (runStateOf(ctl.get()) == STOP && !t->isInterrupted()) {
+                t->interrupt();
+            }
+
+            return true;
+        }
+
+        /**
+         * Performs cleanup and bookkeeping for a dying worker. Called only from
+         * worker threads. Unless completedAbruptly is set, assumes that workerCount
+         * has already been adjusted to account for exit.  This method removes
+         * thread from worker set, and possibly terminates the pool or replaces the
+         * worker if either it exited due to user task exception or if fewer than
+         * corePoolSize workers are running or queue is non-empty but there are no
+         * workers.
+         *
+         * @param w
+         *      The worker that has completed or exited.
+         * @param completedAbruptly
+         *      Indicates if the worker died due to user exception.
+         */
+        void processWorkerExit(Worker* w, bool completedAbruptly) {
+
+            if (completedAbruptly) { // If abrupt, then workerCount wasn't adjusted
+                decrementWorkerCount();
+            }
+
+            mainLock.lock();
+            try {
+                this->completedTasks += w->completedTasks;
+                this->workers.remove(w);
+                this->deadWorkers.add(w);
+            } catch(...) {
+            }
+            mainLock.unlock();
+
+            tryTerminate();
+
+            int c = ctl.get();
+            if (runStateLessThan(c, STOP)) {
+                if (!completedAbruptly) {
+                    int min = this->coreThreadsCanTimeout ? 0 : corePoolSize;
+                    if (min == 0 && ! workQueue->isEmpty()) {
+                        min = 1;
+                    }
+                    if (workerCountOf(c) >= min) {
+                        return; // replacement not needed
+                    }
+                }
+                addWorker(NULL, false);
+            }
+        }
+
+        /**
+         * Performs blocking or timed wait for a task, depending on current configuration
+         * settings, or returns null if this worker must exit because of any of:
+         *
+         *  1. There are more than maximumPoolSize workers (due to
+         *     a call to setMaximumPoolSize).
+         *  2. The pool is stopped.
+         *  3. The pool is shutdown and the queue is empty.
+         *  4. This worker timed out waiting for a task, and timed-out
+         *     workers are subject to termination (that is,
+         *     {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
+         *     both before and after the timed wait.
+         *
+         * @return task, or NULL if the worker must exit, in which case
+         *         workerCount is decremented
+         */
+        Runnable* getTask() {
+            bool timedOut = false; // Did the last poll() time out?
+
+            retry:
+            for (;;) {
+                int c = ctl.get();
+                int rs = runStateOf(c);
+
+                // Check if queue empty only if necessary.
+                if (rs >= SHUTDOWN && (rs >= STOP || workQueue->isEmpty())) {
+                    decrementWorkerCount();
+                    return NULL;
+                }
+
+                bool timed;
+
+                for (;;) {
+                    int wc = workerCountOf(c);
+                    timed = this->coreThreadsCanTimeout || wc > this->corePoolSize;
+
+                    if (wc <= this->maxPoolSize && ! (timedOut && timed)) {
+                        break;
+                    }
+                    if (compareAndDecrementWorkerCount(c)) {
+                        return NULL;
+                    }
+                    c = ctl.get();  // Re-read ctl
+                    if (runStateOf(c) != rs) {
+                        goto retry;
+                    }
+                    // else CAS failed due to workerCount change; retry inner loop
+                }
+
+                try {
+                    Runnable* r = NULL;
+                    if (timed) {
+                        workQueue->poll(r, keepAliveTime, TimeUnit::NANOSECONDS);
+                    } else {
+                        r = workQueue->take();
+                    }
+
+                    if (r != NULL) {
+                        return r;
+                    }
+
+                    timedOut = true;
+                } catch (InterruptedException& retry) {
+                    timedOut = false;
+                }
+            }
+        }
+
+        /**
+         * Attempts to CAS-increment the workerCount field of ctl.
+         */
+        bool compareAndIncrementWorkerCount(int expect) {
+            return ctl.compareAndSet(expect, expect + 1);
+        }
+
+        /**
+         * Attempts to CAS-decrement the workerCount field of ctl.
+         */
+        bool compareAndDecrementWorkerCount(int expect) {
+            return ctl.compareAndSet(expect, expect - 1);
+        }
+
+        /**
+         * Decrements the workerCount field of ctl. This is called on abrupt
+         * termination of a thread (see processWorkerExit) and when a worker
+         * must exit from getTask; other decrements use compareAndDecrementWorkerCount.
+         */
+        void decrementWorkerCount() {
+            do {} while (!compareAndDecrementWorkerCount(ctl.get()));
         }
 
     };
 
+    const bool ExecutorKernel::ONLY_ONE = true;
+
+    const int ExecutorKernel::COUNT_BITS = Integer::SIZE - 3;
+    const int ExecutorKernel::CAPACITY   = (1 << COUNT_BITS) - 1;
+
+    // runState is stored in the high-order bits
+    const int ExecutorKernel::RUNNING    = -1 << ExecutorKernel::COUNT_BITS;
+    const int ExecutorKernel::SHUTDOWN   =  0 << ExecutorKernel::COUNT_BITS;
+    const int ExecutorKernel::STOP       =  1 << ExecutorKernel::COUNT_BITS;
+    const int ExecutorKernel::TIDYING    =  2 << ExecutorKernel::COUNT_BITS;
+    const int ExecutorKernel::TERMINATED =  3 << ExecutorKernel::COUNT_BITS;
+
 }}}
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -371,7 +1383,26 @@ ThreadPoolExecutor::~ThreadPoolExecutor(
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ThreadPoolExecutor::execute(Runnable* task ) {
+void ThreadPoolExecutor::execute(Runnable* task) {
+
+    try{
+
+        if( task == NULL ) {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "ThreadPoolExecutor::execute - Supplied Runnable pointer was NULL.");
+        }
+
+        this->kernel->execute(task, true);
+    }
+    DECAF_CATCH_RETHROW( RejectedExecutionException )
+    DECAF_CATCH_RETHROW( NullPointerException )
+    DECAF_CATCH_RETHROW( Exception )
+    DECAF_CATCHALL_THROW( Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::execute(Runnable* task, bool takeOwnership) {
 
     try{
 
@@ -381,7 +1412,7 @@ void ThreadPoolExecutor::execute(Runnabl
                 "ThreadPoolExecutor::execute - Supplied Runnable pointer was NULL.");
         }
 
-        this->kernel->enQueueTask(task);
+        this->kernel->execute(task, takeOwnership);
     }
     DECAF_CATCH_RETHROW( RejectedExecutionException )
     DECAF_CATCH_RETHROW( NullPointerException )
@@ -424,12 +1455,7 @@ bool ThreadPoolExecutor::awaitTerminatio
 
 ////////////////////////////////////////////////////////////////////////////////
 int ThreadPoolExecutor::getPoolSize() const {
-    int result = 0;
-    synchronized(&this->kernel->mainLock) {
-        result = this->kernel->workers.size();
-    }
-
-    return result;
+    return this->kernel->getPoolSize();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -444,25 +1470,7 @@ void ThreadPoolExecutor::setCorePoolSize
         throw IllegalArgumentException(__FILE__, __LINE__, "Pool size given was negative.");
     }
 
-    synchronized(&this->kernel->mainLock) {
-
-        //int delta = poolSize - this->kernel->corePoolSize;
-        this->kernel->corePoolSize = poolSize;
-
-        if (this->kernel->workers.size() > poolSize) {
-            // TODO - Once Threads are interruptible wake them up so some can terminate.
-        } else {
-
-            // TODO - Create new threads up to the new pool size, unless we are out
-            //        of work or run out while creating.
-//            int target = Math::min(delta, this->kernel->workQueue->size());
-//            while (target-- > 0 && addWorker(NULL, true)) {
-//                if (this->kernel->workQueue->isEmpty()) {
-//                    break;
-//                }
-//            }
-        }
-    }
+    this->kernel->setCorePoolSize(poolSize);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -473,53 +1481,31 @@ int ThreadPoolExecutor::getMaximumPoolSi
 ////////////////////////////////////////////////////////////////////////////////
 void ThreadPoolExecutor::setMaximumPoolSize(int maxSize) {
 
-    if (maxSize < 0 || maxSize < this->kernel->corePoolSize) {
-        throw IllegalArgumentException(__FILE__, __LINE__, "Size given was invalid.");
+    if (maxSize < 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Maximum Pool size given was negative.");
     }
 
-    this->kernel->maxPoolSize = maxSize;
-
-    if (this->kernel->workers.size() > maxSize) {
-        // TODO - Wake idle worker threads when able to.
-    }
+    this->kernel->setMaximumPoolSize(maxSize);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 long long ThreadPoolExecutor::getTaskCount() const {
-    return this->kernel->workQueue->size();
+    return this->kernel->getTaskCount();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int ThreadPoolExecutor::getActiveCount() const {
-
-    int result = 0;
-    synchronized(&this->kernel->mainLock) {
-        if(!this->kernel->terminated.get()) {
-            result = this->kernel->workers.size() - this->kernel->freeThreads.get();
-        }
-    }
-
-    return result;
+    return this->kernel->getActiveCount();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 long long ThreadPoolExecutor::getCompletedTaskCount() const {
-    long long result = 0;
-    synchronized(&this->kernel->mainLock) {
-        result = this->kernel->completedTasks;
-    }
-
-    return result;
+    return this->kernel->getCompletedTaskCount();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int ThreadPoolExecutor::getLargestPoolSize() const {
-    int result = 0;
-    synchronized(&this->kernel->mainLock) {
-        result = this->kernel->largestPoolSize;
-    }
-
-    return result;
+    return this->kernel->getLargestPoolSize();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -565,33 +1551,22 @@ BlockingQueue<Runnable*>* ThreadPoolExec
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ThreadPoolExecutor::isShutdown() const {
-    return this->kernel->stopped.get();
+    return this->kernel->isShutdown();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ThreadPoolExecutor::isTerminated() const {
-    return this->kernel->terminated.get();
+    return this->kernel->isTerminated();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ThreadPoolExecutor::isTerminating() const {
-    return this->kernel->isStoppedOrStopping() && !this->kernel->terminated.get();
+    return this->kernel->isTerminating();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ThreadPoolExecutor::allowCoreThreadTimeout(bool value) {
-
-    if (value == true && this->kernel->keepAliveTime == 0) {
-        throw IllegalArgumentException(__FILE__, __LINE__,
-            "Keep Alive Time must be set to a non-zero value to enable this option.");
-    }
-
-    if (value != this->kernel->coreThreadsCanTimeout) {
-        this->kernel->coreThreadsCanTimeout = value;
-        if (value == true) {
-            // TODO - When Threads are interruptible wake works so they can check timeout.
-        }
-    }
+    this->kernel->allowCoreThreadTimeOut(value);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -601,22 +1576,7 @@ long long ThreadPoolExecutor::getKeepAli
 
 ////////////////////////////////////////////////////////////////////////////////
 void ThreadPoolExecutor::setKeepAliveTime(long long timeout, const TimeUnit& unit) {
-
-    if (timeout < 0) {
-        throw IllegalArgumentException(__FILE__, __LINE__, "Timeout value cannot be negative.");
-    }
-
-    if (this->kernel->coreThreadsCanTimeout == true && unit.toMillis(timeout) == 0) {
-        throw IllegalArgumentException(__FILE__, __LINE__,
-            "Keep Alive Time must be set to a non-zero value when allowCoreThreadsTimeout is enabled.");
-    }
-
-    long keepAliveTime = unit.toMillis(timeout);
-    long delta = keepAliveTime - this->kernel->keepAliveTime;
-    this->kernel->keepAliveTime = keepAliveTime;
-    if (delta < 0) {
-        // TODO - When Threads are interruptible wake works so they can check timeout.
-    }
+    this->kernel->setKeepAliveTime(timeout, unit);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -626,23 +1586,22 @@ bool ThreadPoolExecutor::allowsCoreThrea
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ThreadPoolExecutor::prestartCoreThread() {
-    return this->kernel->addWorker();
+    return this->kernel->prestartCoreThread();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int ThreadPoolExecutor::prestartAllCoreThreads() {
-    return this->kernel->addAllWorkers();
+    return this->kernel->prestartAllCoreThreads();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ThreadPoolExecutor::remove(decaf::lang::Runnable* task) {
-    bool removed = this->kernel->workQueue->remove(task);
-    this->kernel->tryTerminate();
-    return removed;
+    return this->kernel->remove(task);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ThreadPoolExecutor::purge() {
+    this->kernel->purge();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -658,392 +1617,5 @@ void ThreadPoolExecutor::terminated() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ExecutorKernel::ExecutorKernel(ThreadPoolExecutor* parent, int corePoolSize,
-                               int maxPoolSize, long long keepAliveTime,
-                               BlockingQueue<decaf::lang::Runnable*>* workQueue,
-                               ThreadFactory* threadFactory, RejectedExecutionHandler* handler) :
-    parent(parent),
-    workers(),
-    deadWorkers(),
-    cleanupTimer(),
-    stopping(false),
-    stopped(false),
-    terminated(false),
-    freeThreads(0),
-    maxPoolSize(maxPoolSize),
-    corePoolSize(corePoolSize),
-    keepAliveTime(keepAliveTime),
-    coreThreadsCanTimeout(false),
-    workQueue(),
-    mainLock(),
-    termination(1),
-    completedTasks(0),
-    largestPoolSize(0),
-    factory(),
-    rejectionHandler() {
-
-    if(corePoolSize < 0 || maxPoolSize <= 0 ||
-       maxPoolSize < corePoolSize || keepAliveTime < 0) {
-
-        throw IllegalArgumentException(__FILE__, __LINE__, "Argument out of range.");
-    }
-
-    if(workQueue == NULL || threadFactory == NULL || handler == NULL) {
-        throw NullPointerException(__FILE__, __LINE__, "Required parameter was NULL");
-    }
-
-    this->cleanupTimer.scheduleAtFixedRate(
-        new WorkerKiller(this), TimeUnit::SECONDS.toMillis(10), TimeUnit::SECONDS.toMillis(10));
-
-    this->workQueue.reset(workQueue);
-    this->factory.reset(threadFactory);
-    this->rejectionHandler.reset(handler);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ExecutorKernel::~ExecutorKernel() {
-    try{
-
-        this->shutdown();
-        this->tryTerminate();
-
-        this->termination.await();
-
-        this->cleanupTimer.cancel();
-
-        // Ensure dead Worker Threads are destroyed, the Timer might not have
-        // run recently.
-        Pointer< Iterator<Worker*> > iter(this->deadWorkers.iterator());
-        while(iter->hasNext()) {
-            Worker* worker = iter->next();
-            worker->join();
-            delete worker;
-        }
-    }
-    DECAF_CATCH_NOTHROW(Exception)
-    DECAF_CATCHALL_NOTHROW()
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::onTaskStarted(Worker* thread DECAF_UNUSED) {
-
-    try{
-
-        synchronized( &mainLock ) {
-
-            freeThreads.decrementAndGet();
-
-            // Now that this callback has decremented the free threads counter
-            // lets check if there is any outstanding work to be done and no
-            // threads to handle it.  This could happen if the QueueTask
-            // method was called successively without any of the PooledThreads
-            // having a chance to wake up and service the queue.  This would
-            // cause the number of Task to exceed the number of free threads
-            // once the Threads got a chance to wake up and service the queue
-            if( freeThreads.get() == 0 && !workQueue->isEmpty() ) {
-                addWorker();
-            }
-
-            // TODO - Get actual values for these.
-            this->parent->beforeExecute(thread, NULL);
-        }
-    }
-    DECAF_CATCH_RETHROW( Exception )
-    DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::onTaskCompleted(Worker* thread DECAF_UNUSED) {
-
-    try {
-        synchronized(&mainLock) {
-            freeThreads.incrementAndGet();
-            completedTasks++;
-
-            // TODO - Get actual values for these.
-            this->parent->afterExecute(NULL, NULL);
-        }
-    }
-    DECAF_CATCH_RETHROW( Exception )
-    DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::onTaskException(Worker* thread DECAF_UNUSED, lang::Exception& ex DECAF_UNUSED) {
-
-    try{
-    }
-    DECAF_CATCH_RETHROW( Exception )
-    DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::handleWorkerExit(Worker* worker) {
-
-    synchronized( &this->mainLock ) {
-
-        this->workers.remove(worker);
-        this->deadWorkers.add(worker);
-
-        if(this->workers.isEmpty()) {
-            this->parent->terminated();
-            this->terminated = true;
-            this->termination.countDown();
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::enQueueTask(Runnable* task) {
-
-    try{
-
-        synchronized( &this->mainLock ) {
-
-            // If there's nobody open to do work, then create some more
-            // threads to handle the work.
-            if( this->freeThreads.get() == 0 ) {
-                addWorker();
-            }
-
-            // queue the new work.
-            if(isStoppedOrStopping() || !this->workQueue->offer(task)) {
-                this->rejectionHandler->rejectedExecution(task, this->parent);
-            }
-        }
-    }
-    DECAF_CATCH_RETHROW( RejectedExecutionException )
-    DECAF_CATCH_RETHROW( Exception )
-    DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Runnable* ExecutorKernel::deQueueTask() {
-
-    try{
-
-        Runnable* task = NULL;
-
-        while(true) {
-
-            // TODO - Threads aren't interruptible yet, so spin wait.
-            if(workQueue->poll(task, 10, TimeUnit::MILLISECONDS)) {
-                break;
-            }
-
-//            try {
-//                if ((task = workQueue->take()) != NULL) {
-//                    break;
-//                }
-//            } catch(InterruptedException& ex) {
-//            }
-
-            if(isStoppedOrStopping() && workQueue->isEmpty()) {
-                break;
-            }
-        }
-
-        if(isStoppedOrStopping() && task == NULL) {
-            return NULL;
-        }
-
-        if( task == NULL ) {
-           throw lang::Exception(__FILE__, __LINE__,
-               "deQueueTask: Got empty Runnable while not in shutdown.");
-        }
-
-        return task;
-    }
-    DECAF_CATCH_RETHROW( Exception )
-    DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ExecutorKernel::addWorker() {
-
-    try{
-
-        if( this->isStoppedOrStopping() ) {
-            return false;
-        }
-
-        if( this->workers.size() >= this->maxPoolSize ) {
-            return false;
-        }
-
-        synchronized( &mainLock ) {
-            Worker* newWorker = new Worker(this);
-            this->workers.add(newWorker);
-            freeThreads.incrementAndGet();
-            newWorker->start();
-            this->largestPoolSize++;
-        }
-
-        return true;
-    }
-    DECAF_CATCH_RETHROW( Exception )
-    DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-int ExecutorKernel::addAllWorkers() {
-
-    try{
-
-        if( this->isStoppedOrStopping() ) {
-            return 0;
-        }
-
-        if( this->workers.size() >= this->maxPoolSize ) {
-            return 0;
-        }
-
-        int delta = 0;
-
-        synchronized( &mainLock ) {
-
-            delta = this->maxPoolSize - this->workers.size();
-
-            for(int i = 0; i < delta; ++i) {
-                Worker* newWorker = new Worker(this);
-                this->workers.add(newWorker);
-                freeThreads.incrementAndGet();
-                newWorker->start();
-                this->largestPoolSize++;
-            }
-        }
-
-        return delta;
-    }
-    DECAF_CATCH_RETHROW( Exception )
-    DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ExecutorKernel::isStoppedOrStopping() {
-    if(this->stopped.get() || this->stopping.get()) {
-        return true;
-    }
-
-    return false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::shutdown() {
-
-    if(isStoppedOrStopping()) {
-        return;
-    }
-
-    if(this->stopping.compareAndSet(false, true)) {
-
-        synchronized(&mainLock) {
-
-            // TODO - When threads are interruptible, we need to interrupt the Queue.
-            //synchronized( workQueue.get() ) {
-            //    // Signal the Queue so that all waiters are notified
-            //    workQueue->notifyAll();
-            //}
-        }
-
-        this->tryTerminate();
-        this->stopped.set(true);
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::shutdownNow(ArrayList<Runnable*>& unexecutedTasks) {
-
-    if(isStoppedOrStopping()) {
-        return;
-    }
-
-    if(this->stopping.compareAndSet(false, true)) {
-
-        synchronized(&mainLock) {
-
-            Pointer< Iterator<Worker*> > iter(this->workers.iterator());
-
-            while(iter->hasNext()) {
-                iter->next()->stop();
-            }
-
-            // TODO - When threads are interruptible, we need to interrupt the Queue.
-            //synchronized( workQueue.get() ) {
-            //    // Signal the Queue so that all waiters are notified
-            //    workQueue->notifyAll();
-            //}
-
-            this->drainQueue(unexecutedTasks);
-        }
-
-        this->tryTerminate();
-        this->stopped.set(true);
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::drainQueue(ArrayList<Runnable*>& unexecutedTasks) {
-
-    // Some Queue implementations can fail in poll and drainTo so we check
-    // after attempting to drain the Queue and if its not empty we remove
-    // the tasks one by one.
-
-    this->workQueue->drainTo(unexecutedTasks);
-    if (!this->workQueue->isEmpty()) {
-
-        std::vector<Runnable*> tasks = this->workQueue->toArray();
-        std::vector<Runnable*>::iterator iter = tasks.begin();
-
-        for (; iter != tasks.end(); ++iter) {
-
-            if (this->workQueue->remove(*iter)) {
-                unexecutedTasks.add(*iter);
-            }
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ExecutorKernel::awaitTermination(long long timeout, const TimeUnit& unit) {
-
-    if (this->terminated.get() == true) {
-        return true;
-    }
-
-    return this->termination.await(timeout, unit);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::tryTerminate() {
-
-    if (!this->isStoppedOrStopping() || (this->isStoppedOrStopping() && !this->workQueue->isEmpty())) {
-        return;
-    }
-
-    if (this->workers.size() > 0) {
-        // TODO - Once they are interruptible wake a worker.
-        return;
-    }
-
-    synchronized(&this->mainLock) {
-        try {
-            this->parent->terminated();
-        } catch(...) {}
-
-        this->terminated.set(true);
-        this->termination.countDown();
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::interruptIdleWorkers() {
-    this->interruptIdleWorkers(false);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::interruptIdleWorkers(bool onlyOne DECAF_UNUSED) {
-
-    synchronized(&this->mainLock) {
-    }
+void ThreadPoolExecutor::onShutdown() {
 }
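
The run-state constants defined at the end of the kernel above pack the pool's
run state and its current worker count into a single atomic int (ctl): the
state lives in the high-order bits and the count in the low-order bits covered
by the CAPACITY mask. The helpers runStateOf, workerCountOf and runStateLessThan
used throughout the new kernel code decompose that value; their bodies are not
shown in this diff, so the following is only a sketch modeled on
java.util.concurrent.ThreadPoolExecutor:

    // Sketch (assumed, not taken from this commit): unpacking the ctl word.
    int runStateOf(int c)            { return c & ~CAPACITY; }  // RUNNING, SHUTDOWN, STOP, ...
    int workerCountOf(int c)         { return c & CAPACITY; }   // number of live worker threads
    bool runStateLessThan(int c, int s) { return c < s; }       // works because state is in the sign-carrying high bits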

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h Mon Jul 18 14:28:03 2011
@@ -212,6 +212,8 @@ namespace concurrent{
 
         virtual void execute(decaf::lang::Runnable* task);
 
+        virtual void execute(decaf::lang::Runnable* task, bool takeOwnership);
+
         virtual void shutdown();
 
         virtual ArrayList<decaf::lang::Runnable*> shutdownNow();
@@ -493,6 +495,13 @@ namespace concurrent{
          */
         virtual void terminated();
 
+    protected:
+
+        /**
+         * Used by some Decaf ThreadPoolExecutor extensions to correctly handle the shutdown case.
+         */
+        virtual void onShutdown();
+
     public:  // RejectedExecutionHandler implementations.
 
         /**
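
The two-argument execute overload and the protected onShutdown hook are the
visible API additions in this header. A short usage sketch follows; the
ownership semantics are inferred from the takeOwnership parameter name and
from the one-argument overload, which forwards with true, so the lifetime
comments below are assumptions rather than documented guarantees (the class
names, include paths and delays in the sketch are illustrative):

    #include <decaf/lang/Runnable.h>
    #include <decaf/util/concurrent/ThreadPoolExecutor.h>
    #include <decaf/util/concurrent/LinkedBlockingQueue.h>
    #include <decaf/util/concurrent/TimeUnit.h>

    using namespace decaf::lang;
    using namespace decaf::util::concurrent;

    class PrintTask : public Runnable {
    public:
        virtual void run() { /* do the work for one task */ }
    };

    // A Decaf extension can override the new protected hook to run its own
    // cleanup when the pool is shut down.
    class LoggingExecutor : public ThreadPoolExecutor {
    public:
        LoggingExecutor() : ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS,
                                               new LinkedBlockingQueue<Runnable*>()) {}
    protected:
        virtual void onShutdown() {
            // extension-specific shutdown handling goes here
        }
    };

    void example() {
        LoggingExecutor pool;
        pool.execute(new PrintTask());     // assumed: pool owns and deletes the task
        PrintTask reusable;
        pool.execute(&reusable, false);    // caller keeps ownership of 'reusable'
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit::SECONDS);  // finish before 'reusable' leaves scope
    }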

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h Mon Jul 18 14:28:03 2011
@@ -130,6 +130,47 @@ namespace concurrent {
             }
         };
 
+        class SmallInterruptedRunnable : public decaf::lang::Runnable {
+        private:
+
+            ExecutorsTestSupport* parent;
+
+        public:
+
+            SmallInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable(), parent(parent) {
+            }
+
+            virtual ~SmallInterruptedRunnable() {}
+
+            virtual void run() {
+                try {
+                    Thread::sleep(SMALL_DELAY_MS);
+                    parent->threadShouldThrow();
+                } catch(decaf::lang::Exception& e) {
+                }
+            }
+        };
+
+        class SmallPossiblyInterruptedRunnable : public decaf::lang::Runnable {
+        private:
+
+            ExecutorsTestSupport* parent;
+
+        public:
+
+            SmallPossiblyInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable(), parent(parent) {
+            }
+
+            virtual ~SmallPossiblyInterruptedRunnable() {}
+
+            virtual void run() {
+                try {
+                    Thread::sleep(SMALL_DELAY_MS);
+                } catch(decaf::lang::exceptions::InterruptedException& e) {
+                }
+            }
+        };
+
         class MediumRunnable : public decaf::lang::Runnable {
         private:
 
@@ -151,6 +192,47 @@ namespace concurrent {
             }
         };
 
+        class MediumInterruptedRunnable : public decaf::lang::Runnable {
+        private:
+
+            ExecutorsTestSupport* parent;
+
+        public:
+
+            MediumInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable(), parent(parent) {
+            }
+
+            virtual ~MediumInterruptedRunnable() {}
+
+            virtual void run() {
+                try {
+                    Thread::sleep(MEDIUM_DELAY_MS);
+                    parent->threadShouldThrow();
+                } catch(decaf::lang::Exception& e) {
+                }
+            }
+        };
+
+        class MediumPossiblyInterruptedRunnable : public decaf::lang::Runnable {
+        private:
+
+            ExecutorsTestSupport* parent;
+
+        public:
+
+            MediumPossiblyInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable(), parent(parent) {
+            }
+
+            virtual ~MediumPossiblyInterruptedRunnable() {}
+
+            virtual void run() {
+                try {
+                    Thread::sleep(MEDIUM_DELAY_MS);
+                } catch(decaf::lang::exceptions::InterruptedException& e) {
+                }
+            }
+        };
+
         class LongRunnable : public decaf::lang::Runnable {
         private:
 
@@ -172,6 +254,48 @@ namespace concurrent {
             }
         };
 
+        class LongInterruptedRunnable : public decaf::lang::Runnable {
+        private:
+
+            ExecutorsTestSupport* parent;
+
+        public:
+
+            LongInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable(), parent(parent) {
+            }
+
+            virtual ~LongInterruptedRunnable() {}
+
+            virtual void run() {
+                try {
+                    Thread::sleep(LONG_DELAY_MS);
+                    parent->threadShouldThrow();
+                } catch(decaf::lang::Exception& e) {
+                }
+            }
+        };
+
+        class LongPossiblyInterruptedRunnable : public decaf::lang::Runnable {
+        private:
+
+            ExecutorsTestSupport* parent;
+
+        public:
+
+            LongPossiblyInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable(), parent(parent) {
+            }
+
+            virtual ~LongPossiblyInterruptedRunnable() {}
+
+            virtual void run() {
+                try {
+                    Thread::sleep(LONG_DELAY_MS);
+                } catch(decaf::lang::exceptions::InterruptedException& e) {
+                }
+            }
+        };
+
+
         class SimpleThreadFactory : public ThreadFactory {
         public:
 

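The difference between the two new families of test Runnable above is intent:
the Interrupted variants treat an uninterrupted sleep as a test failure (they
fall through to parent->threadShouldThrow()), while the Possibly variants
simply tolerate an interruption and return. That is why the shutdown-oriented
tests in ThreadPoolExecutorTest below switch to the Possibly variants. A brief
sketch of the intended usage inside a test method (the surrounding scaffolding
and cleanup are illustrative, not part of this commit):

    // A task submitted before shutdownNow() may or may not be interrupted,
    // so neither outcome should be reported as a failure.
    ThreadPoolExecutor p1(1, 1, LONG_DELAY_MS, TimeUnit::MILLISECONDS,
                          new LinkedBlockingQueue<Runnable*>());
    p1.execute(new MediumPossiblyInterruptedRunnable(this));
    ArrayList<Runnable*> unexecuted = p1.shutdownNow();  // may interrupt the running worker
    p1.awaitTermination(LONG_DELAY_MS, TimeUnit::MILLISECONDS);
    // cleanup of the 'unexecuted' tasks omitted for brevity
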
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp Mon Jul 18 14:28:03 2011
@@ -239,7 +239,7 @@ void ThreadPoolExecutorTest::testAwaitTe
 ///////////////////////////////////////////////////////////////////////////////
 void ThreadPoolExecutorTest::testMoreTasksThanMaxPoolSize() {
 
-    ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+    ThreadPoolExecutor pool(3, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
 
     CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3);
 
@@ -559,7 +559,7 @@ void ThreadPoolExecutorTest::testGetQueu
     ThreadPoolExecutor p1(1, 1, LONG_DELAY_MS, TimeUnit::MILLISECONDS, q);
     Runnable* tasks[5];
     for (int i = 0; i < 5; i++){
-        tasks[i] = new MediumRunnable(this);
+        tasks[i] = new MediumPossiblyInterruptedRunnable(this);
         p1.execute(tasks[i]);
     }
     try {
@@ -583,7 +583,7 @@ void ThreadPoolExecutorTest::testRemove(
     ThreadPoolExecutor p1(1, 1, LONG_DELAY_MS, TimeUnit::MILLISECONDS, q);
     Runnable* tasks[5];
     for (int i = 0; i < 5; i++){
-        tasks[i] = new MediumRunnable(this);
+        tasks[i] = new MediumPossiblyInterruptedRunnable(this);
         p1.execute(tasks[i]);
     }
 
@@ -619,7 +619,7 @@ void ThreadPoolExecutorTest::testShutDow
     try {
 
         for (int i = 0; i < 5; i++) {
-            p1.execute(new MediumRunnable(this));
+            p1.execute(new MediumPossiblyInterruptedRunnable(this));
         }
     }
     catch(...) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h Mon Jul 18 14:28:03 2011
@@ -53,7 +53,7 @@ namespace concurrent{
         CPPUNIT_TEST( testGetLargestPoolSize );
         CPPUNIT_TEST( testGetMaximumPoolSize );
         CPPUNIT_TEST( testGetPoolSize );
-        //CPPUNIT_TEST( testGetTaskCount );
+        CPPUNIT_TEST( testGetTaskCount );
         CPPUNIT_TEST( testIsShutdown );
         CPPUNIT_TEST( testIsTerminated );
         CPPUNIT_TEST( testIsTerminating );