You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/18 16:28:05 UTC
svn commit: r1147898 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/ main/decaf/util/concurrent/ test/decaf/util/concurrent/
Author: tabish
Date: Mon Jul 18 14:28:03 2011
New Revision: 1147898
URL: http://svn.apache.org/viewvc?rev=1147898&view=rev
Log:
A nearly complete implementation of the ThreadPoolExecutor class and some additions to the concurrent package.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h (with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Jul 18 14:28:03 2011
@@ -451,6 +451,7 @@ cc_sources = \
decaf/util/concurrent/Lock.cpp \
decaf/util/concurrent/Mutex.cpp \
decaf/util/concurrent/RejectedExecutionHandler.cpp \
+ decaf/util/concurrent/RunnableFuture.cpp \
decaf/util/concurrent/Semaphore.cpp \
decaf/util/concurrent/SynchronousQueue.cpp \
decaf/util/concurrent/ThreadFactory.cpp \
@@ -1047,6 +1048,7 @@ h_sources = \
decaf/util/concurrent/Mutex.h \
decaf/util/concurrent/RejectedExecutionException.h \
decaf/util/concurrent/RejectedExecutionHandler.h \
+ decaf/util/concurrent/RunnableFuture.h \
decaf/util/concurrent/Semaphore.h \
decaf/util/concurrent/Synchronizable.h \
decaf/util/concurrent/SynchronousQueue.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h Mon Jul 18 14:28:03 2011
@@ -90,18 +90,37 @@ namespace concurrent {
virtual ~Executor() {}
/**
+ * This method is the same as calling the two param execute method and passing
+ * true as the second argument.
+ *
+ * @param command
+ * The runnable task to be executed.
+ *
+ * @throws RejectedExecutionException if this task cannot be
+ * accepted for execution.
+ *
+ * @throws NullPointerException if command is null
+ */
+
+ virtual void execute(decaf::lang::Runnable* command) = 0;
+
+ /**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the <tt>Executor</tt> implementation.
*
- * @param command the runnable task
+ * @param command
+ * The runnable task to be executed.
+ * @param takeOwnership
+ * Indicates if the Executor should assume ownership of the task and
+ * delete the pointer once the task has completed.
*
* @throws RejectedExecutionException if this task cannot be
* accepted for execution.
*
* @throws NullPointerException if command is null
*/
- virtual void execute( decaf::lang::Runnable* command ) = 0;
+ virtual void execute(decaf::lang::Runnable* command, bool takeOwnership) = 0;
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h Mon Jul 18 14:28:03 2011
@@ -26,7 +26,7 @@ namespace decaf {
namespace util {
namespace concurrent {
- class FutureType {
+ class DECAF_API FutureType {
public:
virtual ~FutureType() {}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp?rev=1147898&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp Mon Jul 18 14:28:03 2011
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RunnableFuture.h"
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h?rev=1147898&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h Mon Jul 18 14:28:03 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_RUNNABLEFUTURE_H_
+#define _DECAF_UTIL_CONCURRENT_RUNNABLEFUTURE_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/Future.h>
+
+namespace decaf {
+namespace util {
+namespace concurrent {
+
+ /**
+ * A Runnable version of the Future type. When the run method has completed
+ * successfully the Future will be considered complete and its get method will
+ * return the produced result.
+ *
+ * @since 1.0
+ */
+ template<typename T>
+ class RunnableFuture : public Future<T>, public decaf::lang::Runnable {
+ public:
+
+ virtual ~RunnableFuture() {}
+
+ };
+
+}}}
+
+#endif /* _DECAF_UTIL_CONCURRENT_RUNNABLEFUTURE_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RunnableFuture.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Mon Jul 18 14:28:03 2011
@@ -21,13 +21,16 @@
#include <decaf/util/LinkedList.h>
#include <decaf/util/Timer.h>
#include <decaf/util/TimerTask.h>
-#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/Future.h>
+#include <decaf/util/concurrent/locks/ReentrantLock.h>
+#include <decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/util/concurrent/RejectedExecutionException.h>
#include <decaf/util/concurrent/RejectedExecutionHandler.h>
#include <decaf/util/concurrent/Executors.h>
+#include <decaf/lang/Throwable.h>
#include <decaf/lang/Pointer.h>
#include <decaf/lang/Math.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
@@ -43,35 +46,250 @@ using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;
+using namespace decaf::util::concurrent::locks;
////////////////////////////////////////////////////////////////////////////////
namespace decaf{
namespace util{
namespace concurrent{
- class Worker;
-
+ /**
+ * The main pool control state, ctl, is an atomic integer packing
+ * two conceptual fields
+ * workerCount, indicating the effective number of threads
+ * runState, indicating whether running, shutting down etc
+ *
+ * In order to pack them into one int, we limit workerCount to
+ * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
+ * billion) otherwise representable. If this is ever an issue in
+ * the future, the variable can be changed to be an AtomicLong,
+ * and the shift/mask constants below adjusted. But until the need
+ * arises, this code is a bit faster and simpler using an int.
+ *
+ * The workerCount is the number of workers that have been
+ * permitted to start and not permitted to stop. The value may be
+ * transiently different from the actual number of live threads,
+ * for example when a ThreadFactory fails to create a thread when
+ * asked, and when exiting threads are still performing
+ * bookkeeping before terminating. The user-visible pool size is
+ * reported as the current size of the workers set.
+ *
+ * The runState provides the main lifecyle control, taking on values:
+ *
+ * RUNNING: Accept new tasks and process queued tasks
+ * SHUTDOWN: Don't accept new tasks, but process queued tasks
+ * STOP: Don't accept new tasks, don't process queued tasks,
+ * and interrupt in-progress tasks
+ * TIDYING: All tasks have terminated, workerCount is zero,
+ * the thread transitioning to state TIDYING
+ * will run the terminated() hook method
+ * TERMINATED: terminated() has completed
+ *
+ * The numerical order among these values matters, to allow
+ * ordered comparisons. The runState monotonically increases over
+ * time, but need not hit each state. The transitions are:
+ *
+ * RUNNING -> SHUTDOWN
+ * On invocation of shutdown(), perhaps implicitly in finalize()
+ * (RUNNING or SHUTDOWN) -> STOP
+ * On invocation of shutdownNow()
+ * SHUTDOWN -> TIDYING
+ * When both queue and pool are empty
+ * STOP -> TIDYING
+ * When pool is empty
+ * TIDYING -> TERMINATED
+ * When the terminated() hook method has completed
+ *
+ * Threads waiting in awaitTermination() will return when the
+ * state reaches TERMINATED.
+ *
+ * Detecting the transition from SHUTDOWN to TIDYING is less
+ * straightforward than you'd like because the queue may become
+ * empty after non-empty and vice versa during SHUTDOWN state, but
+ * we can only terminate if, after seeing that it is empty, we see
+ * that workerCount is 0 (which sometimes entails a recheck -- see
+ * below).
+ */
class ExecutorKernel {
+ private:
+
+ /**
+ * The worker class does a small amount of Bookkeeping and provides a locking point
+ * for the kernel to access the running task.
+ */
+ class Worker : public AbstractQueuedSynchronizer, public Runnable {
+ private:
+
+ Pointer<Thread> thread;
+ Runnable* firstTask;
+ decaf::util::concurrent::ExecutorKernel* kernel;
+ long long completedTasks;
+
+ friend class ExecutorKernel;
+
+ private:
+
+ Worker( const Worker& );
+ Worker& operator= ( const Worker& );
+
+ public:
+
+ Worker(ExecutorKernel* kernel, Runnable* task) :
+ AbstractQueuedSynchronizer(), Runnable(), thread(), firstTask(task), kernel(kernel), completedTasks(0) {
+
+ if( kernel == NULL ) {
+ throw IllegalArgumentException( __FILE__, __LINE__,
+ "ThreadPoolExecutor Worker requires non-NULL pointer to parent ExecutorKernel");
+ }
+
+ this->thread.reset(kernel->factory->newThread(this));
+ }
+
+ ~Worker() {}
+
+ void run() {
+ // Delegate the running of this task to the Kernel so that all the logic
+ // for task execution and cleanup is contained in one place.
+ this->kernel->runWorker(this);
+ }
+
+ virtual void lock() {
+ acquire(1);
+ }
+
+ virtual bool tryLock() {
+ return tryAcquire(1);
+ }
+
+ virtual void unlock() {
+ release(1);
+ }
+
+ virtual bool isLocked() {
+ return isHeldExclusively();
+ }
+
+ protected:
+
+ virtual bool isHeldExclusively() {
+ return getState() == 1;
+ }
+
+ virtual bool tryAcquire(int unused DECAF_UNUSED) {
+ if (compareAndSetState(0, 1)) {
+ setExclusiveOwnerThread(Thread::currentThread());
+ return true;
+ }
+ return false;
+ }
+
+ virtual bool tryRelease(int unused DECAF_UNUSED) {
+ this->setExclusiveOwnerThread(NULL);
+ this->setState(0);
+ return true;
+ }
+ };
+
+ /**
+ * TimerTask implementation used to clean up Worker objects that have terminated
+ * for some reason. Since they can't delete themselves the cleanup is delegated
+ * to the Timer's thread.
+ */
+ class WorkerKiller : public TimerTask {
+ private:
+
+ ExecutorKernel* kernel;
+
+ public:
+
+ WorkerKiller(ExecutorKernel* kernel) : kernel(kernel) {
+ }
+
+ virtual ~WorkerKiller() {}
+
+ virtual void run() {
+ kernel->mainLock.lock();
+ try{
+ Pointer< Iterator<Worker*> > iter( kernel->deadWorkers.iterator() );
+ while(iter->hasNext()) {
+ delete iter->next();
+ iter->remove();
+ }
+ } catch(...) {}
+ kernel->mainLock.unlock();
+ }
+ };
+
public:
+ static const int COUNT_BITS;
+ static const int CAPACITY;
+
+ // runState is stored in the high-order bits
+ static const int RUNNING;
+ static const int SHUTDOWN;
+ static const int STOP;
+ static const int TIDYING;
+ static const int TERMINATED;
+
+ static const bool ONLY_ONE;
+
+ AtomicInteger ctl;
+
ThreadPoolExecutor* parent;
+ /**
+ * List containing all worker threads in pool. Accessed only when holding mainLock.
+ */
LinkedList<Worker*> workers;
+
+ /**
+ * List to hold Worker object that have terminated for some reason. Usually this is
+ * because of a call to setMaximumPoolSize or setCorePoolSize but can also occur
+ * because of an exception from a task that the worker was running.
+ */
LinkedList<Worker*> deadWorkers;
- Timer cleanupTimer;
- AtomicBoolean stopping;
- AtomicBoolean stopped;
- AtomicBoolean terminated;
- AtomicInteger freeThreads;
+ /**
+ * Timer used to periodically clean up the dead worker objects. They must be cleaned
+ * up on a separate thread because the Worker generally adds itself to the deadWorkers
+ * list from the context of its run method and cannot delete itself.
+ */
+ Timer cleanupTimer;
int maxPoolSize;
int corePoolSize;
long long keepAliveTime;
bool coreThreadsCanTimeout;
+
+ /**
+ * The queue used for holding tasks and handing off to worker threads.
+ * We do not require that workQueue.poll() returning NULL necessarily
+ * means that workQueue.isEmpty(), so rely solely on isEmpty to see if
+ * the queue is empty (which we must do for example when deciding whether
+ * to transition from SHUTDOWN to TIDYING). This accommodates special-
+ * purpose queues such as DelayQueues for which poll() is allowed to
+ * return NULL even if it may later return non-NULL when delays expire.
+ */
Pointer< BlockingQueue<decaf::lang::Runnable*> > workQueue;
- Mutex mainLock;
- CountDownLatch termination;
+
+ /**
+ * Lock held on access to workers set and related bookkeeping. While we could
+ * use a concurrent set of some sort, it turns out to be generally preferable
+ * to use a lock. Among the reasons is that this serializes interruptIdleWorkers,
+ * which avoids unnecessary interrupt storms, especially during shutdown.
+ * Otherwise exiting threads would concurrently interrupt those that have not
+ * yet interrupted. It also simplifies some of the associated statistics
+ * bookkeeping of largestPoolSize etc. We also hold mainLock on shutdown and
+ * shutdownNow, for the sake of ensuring workers set is stable while separately
+ * interrupting.
+ */
+ ReentrantLock mainLock;
+
+ /**
+ * Wait condition to support awaitTermination
+ */
+ Pointer<Condition> termination;
long long completedTasks;
int largestPoolSize;
@@ -84,155 +302,949 @@ namespace concurrent{
ExecutorKernel(ThreadPoolExecutor* parent,
int corePoolSize, int maxPoolSize, long long keepAliveTime,
BlockingQueue<decaf::lang::Runnable*>* workQueue,
- ThreadFactory* threadFactory, RejectedExecutionHandler* handler);
+ ThreadFactory* threadFactory, RejectedExecutionHandler* handler) :
+ ctl(ctlOf(RUNNING, 0)),
+ parent(parent),
+ workers(),
+ deadWorkers(),
+ cleanupTimer(),
+ maxPoolSize(maxPoolSize),
+ corePoolSize(corePoolSize),
+ keepAliveTime(keepAliveTime),
+ coreThreadsCanTimeout(false),
+ workQueue(),
+ mainLock(),
+ termination(),
+ completedTasks(0),
+ largestPoolSize(0),
+ factory(),
+ rejectionHandler() {
+
+ if(corePoolSize < 0 || maxPoolSize <= 0 ||
+ maxPoolSize < corePoolSize || keepAliveTime < 0) {
+
+ throw IllegalArgumentException(__FILE__, __LINE__, "Argument out of range.");
+ }
+
+ if(workQueue == NULL || threadFactory == NULL || handler == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Required parameter was NULL");
+ }
+
+ this->cleanupTimer.scheduleAtFixedRate(
+ new WorkerKiller(this), TimeUnit::SECONDS.toMillis(10), TimeUnit::SECONDS.toMillis(10));
+
+ this->workQueue.reset(workQueue);
+ this->factory.reset(threadFactory);
+ this->rejectionHandler.reset(handler);
+ this->termination.reset(this->mainLock.newCondition());
+ }
+
+ ~ExecutorKernel() {
+ try{
+
+ // Turn off the cleanup timer first so that it doesn't fire while
+ // we transition all the remaining workers into the dead workers
+ // queue while can lead to lock contention.
+ this->cleanupTimer.cancel();
+
+ this->shutdown();
+ this->awaitTermination();
+
+ // Ensure dead Worker Threads are destroyed, the Timer might not have
+ // run recently.
+ Pointer< Iterator<Worker*> > iter(this->deadWorkers.iterator());
+ while(iter->hasNext()) {
+ Worker* worker = iter->next();
+ worker->thread->join();
+ delete worker;
+ }
+ }
+ DECAF_CATCH_NOTHROW(Exception)
+ DECAF_CATCHALL_NOTHROW()
+ }
+
+ // Packing and unpacking ctl
+ static int runStateOf(int c) {
+ return c & ~CAPACITY;
+ }
- ~ExecutorKernel();
+ static int workerCountOf(int c) {
+ return c & CAPACITY;
+ }
- void onTaskStarted(Worker* thread);
+ static int ctlOf(int rs, int wc) {
+ return rs | wc;
+ }
- void onTaskCompleted(Worker* thread);
+ int getPoolSize() {
+ mainLock.lock();
+ try {
+ // Remove rare and surprising possibility of
+ // isTerminated() && getPoolSize() > 0
+ return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+ }
- void onTaskException(Worker* thread, lang::Exception& ex);
+ int getActiveCount() {
+ mainLock.lock();
+ try {
+ int n = 0;
+ Pointer< Iterator<Worker*> > iter(workers.iterator());
+ while(iter->hasNext()) {
+ Worker* worker = iter->next();
+ if (worker->isLocked()) {
+ ++n;
+ }
+ }
+ return n;
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+ }
- void enQueueTask(Runnable* task);
+ int getLargestPoolSize() {
+ mainLock.lock();
+ try {
+ return largestPoolSize;
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+ }
- Runnable* deQueueTask();
+ long long getTaskCount() {
+ mainLock.lock();
+ try {
+ long long n = completedTasks;
+ Pointer< Iterator<Worker*> > iter(workers.iterator());
+ while(iter->hasNext()) {
+ Worker* worker = iter->next();
+ n += worker->completedTasks;
+ if (worker->isLocked()) {
+ ++n;
+ }
+ }
+ return n + workQueue->size();
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+ }
- bool addWorker();
+ long long getCompletedTaskCount() {
+ mainLock.lock();
+ try {
+ long long n = completedTasks;
+ Pointer< Iterator<Worker*> > iter(workers.iterator());
+ while(iter->hasNext()) {
+ Worker* worker = iter->next();
+ n += worker->completedTasks;
+ }
+ return n;
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+ }
- int addAllWorkers();
+ /**
+ * Transitions to TERMINATED state if either (SHUTDOWN and pool
+ * and queue empty) or (STOP and pool empty). If otherwise
+ * eligible to terminate but workerCount is nonzero, interrupts an
+ * idle worker to ensure that shutdown signals propagate. This
+ * method must be called following any action that might make
+ * termination possible -- reducing worker count or removing tasks
+ * from the queue during shutdown. The method is non-private to
+ * allow access from ScheduledThreadPoolExecutor.
+ */
+ void tryTerminate() {
+ for (;;) {
+ int c = ctl.get();
+ if (isRunning(c) ||
+ runStateAtLeast(c, TIDYING) ||
+ (runStateOf(c) == SHUTDOWN && !workQueue->isEmpty())) {
+ return;
+ }
- bool isStoppedOrStopping();
+ if (workerCountOf(c) != 0) { // Eligible to terminate
+ interruptIdleWorkers(ONLY_ONE);
+ return;
+ }
- void shutdown();
+ mainLock.lock();
+ try {
+ if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
+ try {
+ this->parent->terminated();
+ } catch(Exception& ex) {
+ ctl.set(ctlOf(TERMINATED, 0));
+ termination->signalAll();
+ mainLock.unlock();
+ throw;
+ }
- void shutdownNow(ArrayList<Runnable*>& unexecutedTasks);
+ ctl.set(ctlOf(TERMINATED, 0));
+ termination->signalAll();
+ mainLock.unlock();
+ return;
+ }
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+ // else retry on failed CAS
+ }
+ }
- bool awaitTermination(long long timeout, const TimeUnit& unit);
+ /**
+ * Force an interrupt of all threads even if they are currently active.
+ */
+ void interruptWorkers() {
+ mainLock.lock();
+ try {
+ Pointer< Iterator<Worker*> > iter(this->workers.iterator());
+ while(iter->hasNext()) {
+ iter->next()->thread->interrupt();
+ }
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+ }
- void handleWorkerExit(Worker* worker);
+ /**
+ * Interrupts threads that might be waiting for tasks (as indicated by not
+ * being locked) so they can check for termination or configuration changes.
+ *
+ * @param onlyOne
+ * If true, interrupt at most one worker. This is called only from
+ * tryTerminate when termination is otherwise enabled but there are
+ * still other workers. In this case, at most one waiting worker is
+ * interrupted to propagate shutdown signals in case all threads are
+ * currently waiting. Interrupting any arbitrary thread ensures that
+ * newly arriving workers since shutdown began will also eventually exit.
+ * To guarantee eventual termination, it suffices to always interrupt
+ * only one idle worker, but shutdown() interrupts all idle workers so
+ * that redundant workers exit promptly, not waiting for a straggler
+ * task to finish.
+ */
+ void interruptIdleWorkers(bool onlyOne) {
+ mainLock.lock();
+ try {
+ Pointer< Iterator<Worker*> > iter(this->workers.iterator());
+ while(iter->hasNext()) {
+ Worker* worker = iter->next();
+ Pointer<Thread> thread = worker->thread;
+ if (!thread->isInterrupted() && worker->tryLock()) {
+ try {
+ thread->interrupt();
+ } catch(Exception& ex) {
+ worker->unlock();
+ }
- void tryTerminate();
+ worker->unlock();
+ }
- void drainQueue(ArrayList<Runnable*>& unexecutedTasks);
+ if (onlyOne) {
+ break;
+ }
+ }
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
- void interruptIdleWorkers();
- void interruptIdleWorkers(bool onlyOne);
- };
+ mainLock.unlock();
+ }
- class Worker : public lang::Thread {
- private:
+ /**
+ * Common form of interruptIdleWorkers, to avoid having to remember what
+ * the boolean argument means.
+ */
+ void interruptIdleWorkers() {
+ this->interruptIdleWorkers(false);
+ }
+
+ /**
+ * Ensures that unless the pool is stopping, the current thread does not have
+ * its interrupt set. This requires a double-check of state in case the interrupt
+ * was cleared concurrently with a shutdownNow -- if so, the interrupt is re-enabled.
+ */
+ void clearInterruptsForTaskRun() {
+ if (this->runStateLessThan(ctl.get(), STOP) && Thread::interrupted() &&
+ this->runStateAtLeast(ctl.get(), STOP)) {
- bool busy;
- bool done;
- decaf::util::concurrent::ExecutorKernel* kernel;
+ Thread::currentThread()->interrupt();
+ }
+ }
- private:
+ /**
+ * State check needed by ScheduledThreadPoolExecutor to enable running
+ * tasks during shutdown.
+ *
+ * @param shutdownOK
+ * true if should return true if SHUTDOWN
+ */
+ bool isRunningOrShutdown(bool shutdownOK) {
+ int rs = this->runStateOf(ctl.get());
+ return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
+ }
+
+ /**
+ * Main worker run loop. Repeatedly gets tasks from queue and
+ * executes them, while coping with a number of issues:
+ *
+ * 1. We may start out with an initial task, in which case we
+ * don't need to get the first one. Otherwise, as long as pool is
+ * running, we get tasks from getTask. If it returns null then the
+ * worker exits due to changed pool state or configuration
+ * parameters. Other exits result from exception throws in
+ * external code, in which case completedAbruptly holds, which
+ * usually leads processWorkerExit to replace this thread.
+ *
+ * 2. Before running any task, the lock is acquired to prevent
+ * other pool interrupts while the task is executing, and
+ * clearInterruptsForTaskRun called to ensure that unless pool is
+ * stopping, this thread does not have its interrupt set.
+ *
+ * 3. Each task run is preceded by a call to beforeExecute, which
+ * might throw an exception, in which case we cause thread to die
+ * (breaking loop with completedAbruptly true) without processing
+ * the task.
+ *
+ * 4. Assuming beforeExecute completes normally, we run the task,
+ * gathering any of its thrown exceptions to send to
+ * afterExecute. We separately handle RuntimeException, Error
+ * (both of which the specs guarantee that we trap) and arbitrary
+ * Throwables. Because we cannot rethrow Throwables within
+ * Runnable.run, we wrap them within Errors on the way out (to the
+ * thread's UncaughtExceptionHandler). Any thrown exception also
+ * conservatively causes thread to die.
+ *
+ * 5. After task.run completes, we call afterExecute, which may
+ * also throw an exception, which will also cause thread to
+ * die. According to JLS Sec 14.20, this exception is the one that
+ * will be in effect even if task.run throws.
+ *
+ * The net effect of the exception mechanics is that afterExecute
+ * and the thread's UncaughtExceptionHandler have as accurate
+ * information as we can provide about any problems encountered by
+ * user code.
+ *
+ * @param w the worker
+ */
+ void runWorker(Worker* w) {
+ Runnable* task = w->firstTask;
+ w->firstTask = NULL;
+ bool completedAbruptly = true;
+ try {
+ while (task != NULL || (task = getTask()) != NULL) {
+ w->lock();
+ clearInterruptsForTaskRun();
+ try {
+ this->parent->beforeExecute(w->thread.get(), task);
+ try {
+ task->run();
+ } catch (RuntimeException& re) {
+ this->parent->afterExecute(task, &re);
+ throw;
+ } catch (Exception& e) {
+ this->parent->afterExecute(task, &e);
+ throw;
+ }
+
+ this->parent->afterExecute(task, NULL);
+
+ } catch(Exception& ex) {
+ delete task;
+ task = NULL;
+ w->completedTasks++;
+ w->unlock();
+ throw;
+ }
- Worker( const Worker& );
- Worker& operator= ( const Worker& );
+ delete task;
+ task = NULL;
+ w->completedTasks++;
+ w->unlock();
+ }
+
+ completedAbruptly = false;
+ } catch(Exception& ex) {
+ completedAbruptly = true;
+ }
+
+ processWorkerExit(w, completedAbruptly);
+ }
+
+ void execute(Runnable* task, bool takeOwnership DECAF_UNUSED) {
+
+ if (task == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Runnable task cannot be NULL");
+ }
+
+ /*
+ * Proceed in 3 steps:
+ *
+ * 1. If fewer than corePoolSize threads are running, try to
+ * start a new thread with the given command as its first
+ * task. The call to addWorker atomically checks runState and
+ * workerCount, and so prevents false alarms that would add
+ * threads when it shouldn't, by returning false.
+ *
+ * 2. If a task can be successfully queued, then we still need
+ * to double-check whether we should have added a thread
+ * (because existing ones died since last checking) or that
+ * the pool shut down since entry into this method. So we
+ * recheck state and if necessary roll back the enqueuing if
+ * stopped, or start a new thread if there are none.
+ *
+ * 3. If we cannot queue task, then we try to add a new
+ * thread. If it fails, we know we are shut down or saturated
+ * and so reject the task.
+ */
+ int c = ctl.get();
+ if (workerCountOf(c) < corePoolSize) {
+ if (addWorker(task, true)) {
+ return;
+ }
+ c = ctl.get();
+ }
- public:
+ if (isRunning(c) && workQueue->offer(task)) {
+ int recheck = ctl.get();
+ if (!isRunning(recheck) && this->remove(task)) {
+ this->rejectionHandler->rejectedExecution(task, this->parent);
+ } else if (workerCountOf(recheck) == 0) {
+ addWorker(NULL, false);
+ }
+ } else if (!addWorker(task, false)) {
+ this->rejectionHandler->rejectedExecution(task, this->parent);
+ }
+ }
- Worker(decaf::util::concurrent::ExecutorKernel* kernel) :
- Thread(), busy(false), done(false), kernel(kernel) {
+ void shutdown() {
+ mainLock.lock();
+ try {
+ advanceRunState(SHUTDOWN);
+ interruptIdleWorkers();
+ this->parent->onShutdown();
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+ tryTerminate();
+ }
- if( kernel == NULL ) {
- throw IllegalArgumentException( __FILE__, __LINE__,
- "ThreadPoolExecutor Worker requires non-NULL pointer to parent ExecutorKernel");
+ void shutdownNow(ArrayList<Runnable*>& unexecutedTasks) {
+ mainLock.lock();
+ try {
+ advanceRunState(STOP);
+ interruptWorkers();
+ drainQueue(unexecutedTasks);
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
}
+ mainLock.unlock();
+ tryTerminate();
+ }
+
+ bool isShutdown() {
+ return !isRunning(ctl.get());
}
- ~Worker() {}
+ bool isTerminating() {
+ int c = ctl.get();
+ return !isRunning(c) && runStateLessThan(c, TERMINATED);
+ }
- void run() {
+ bool isTerminated() {
+ return runStateAtLeast(ctl.get(), TERMINATED);
+ }
+ bool awaitTermination() {
+ mainLock.lock();
try {
- while(!this->done) {
+ for (;;) {
- // Blocks until there something to be done
- Runnable* task = this->kernel->deQueueTask();
+ if (runStateAtLeast(ctl.get(), TERMINATED)) {
+ mainLock.unlock();
+ return true;
+ }
- if(this->done) {
+ this->termination->await();
+ }
- if(task != NULL) {
- delete task;
- }
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+ }
- break;
+ bool awaitTermination(long long timeout, const TimeUnit& unit) {
+ long long nanos = unit.toNanos(timeout);
+ mainLock.lock();
+ try {
+
+ for (;;) {
+
+ if (runStateAtLeast(ctl.get(), TERMINATED)) {
+ mainLock.unlock();
+ return true;
}
- if(!task) {
- throw Exception( __FILE__, __LINE__,
- "Worker - Retrieved NULL task from Kernel.");
+ if (nanos <= 0) {
+ mainLock.unlock();
+ return false;
}
- this->busy = true;
- this->kernel->onTaskStarted(this);
+ nanos = this->termination->awaitNanos(nanos);
+ }
+
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+ }
- try{
- task->run();
- } catch(...) {}
+ void setCorePoolSize(int corePoolSize) {
- delete task;
+ int delta = corePoolSize - this->corePoolSize;
+
+ this->corePoolSize = corePoolSize;
- this->kernel->onTaskCompleted(this);
- this->busy = false;
+ if (workerCountOf(ctl.get()) > corePoolSize) {
+ interruptIdleWorkers();
+ } else if (delta > 0) {
+ // We don't really know how many new threads are "needed".
+ // As a heuristic, prestart enough new workers (up to new
+ // core size) to handle the current number of tasks in
+ // queue, but stop if queue becomes empty while doing so.
+ int k = Math::min(delta, workQueue->size());
+ while (k-- > 0 && addWorker(NULL, true)) {
+ if (workQueue->isEmpty()) {
+ break;
+ }
}
}
- catch( Exception& ex )
- {
- ex.setMark( __FILE__, __LINE__ );
- this->busy = false;
- this->kernel->onTaskException(this, ex);
+ }
+
+ void setMaximumPoolSize(int maximumPoolSize) {
+ if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
+ throw IllegalArgumentException();
}
- catch(...)
- {
- Exception ex(__FILE__, __LINE__, "Worker - Caught Unknown Exception");
- this->busy = false;
- this->kernel->onTaskException(this, ex);
+
+ this->maxPoolSize = maximumPoolSize;
+
+ if (workerCountOf(ctl.get()) > maximumPoolSize) {
+ interruptIdleWorkers();
}
+ }
- this->kernel->handleWorkerExit(this);
+ bool prestartCoreThread() {
+ return workerCountOf(ctl.get()) < corePoolSize && addWorker(NULL, true);
}
- void stop() {
- this->done = true;
+ int prestartAllCoreThreads() {
+ int n = 0;
+ while (addWorker(NULL, true)) {
+ ++n;
+ }
+ return n;
}
- bool isBusy() {
- return this->busy;
+ void allowCoreThreadTimeOut(bool value) {
+ if (value && keepAliveTime <= 0) {
+ throw IllegalArgumentException(__FILE__, __LINE__,
+ "Core threads must have nonzero keep alive times");
+ }
+
+ if (value != this->coreThreadsCanTimeout) {
+ this->coreThreadsCanTimeout = value;
+ if (value) {
+ interruptIdleWorkers();
+ }
+ }
}
- };
+ void setKeepAliveTime(long long time, const TimeUnit& unit) {
+ if (time < 0) {
+ throw IllegalArgumentException();
+ }
- class WorkerKiller : public TimerTask {
- private:
+ if (time == 0 && this->coreThreadsCanTimeout) {
+ throw IllegalArgumentException(__FILE__, __LINE__,
+ "Core threads must have nonzero keep alive times");
+ }
- ExecutorKernel* kernel;
+ long long keepAliveTime = unit.toNanos(time);
+ long long delta = keepAliveTime - this->keepAliveTime;
- public:
+ this->keepAliveTime = keepAliveTime;
- WorkerKiller(ExecutorKernel* kernel) : kernel(kernel) {
+ if (delta < 0) {
+ interruptIdleWorkers();
+ }
}
- virtual ~WorkerKiller() {}
+ void purge() {
+ Pointer< BlockingQueue<Runnable*> > q = workQueue;
+ try {
- virtual void run() {
- try{
- synchronized(&kernel->mainLock) {
- Pointer< Iterator<Worker*> > iter( kernel->deadWorkers.iterator() );
- while(iter->hasNext()) {
- delete iter->next();
+ Pointer< Iterator<Runnable*> > iter(q->iterator());
+ while (iter->hasNext()) {
+ Runnable* r = iter->next();
+ FutureType* future = dynamic_cast<FutureType*>(r);
+ if (r != NULL && future->isCancelled()) {
iter->remove();
}
}
- } catch(...) {}
+
+ } catch (ConcurrentModificationException& ex) {
+ // Take slow path if we encounter interference during traversal.
+ // Make copy for traversal and call remove for cancelled entries.
+ // The slow path is more likely to be O(N*N).
+ std::vector<Runnable*> array = q->toArray();
+ std::vector<Runnable*>::const_iterator iter = array.begin();
+ for(; iter != array.end(); ++iter) {
+ Runnable* r = *iter;
+ FutureType* future = dynamic_cast<FutureType*>(r);
+ if (r != NULL && future->isCancelled()) {
+ q->remove(r);
+ }
+ }
+ }
+
+ tryTerminate(); // In case SHUTDOWN and now empty
+ }
+
+ bool remove(Runnable* task) {
+ bool removed = this->workQueue->remove(task);
+ this->tryTerminate();
+ return removed;
+ }
+
+ private:
+
+ static bool runStateLessThan(int c, int s) {
+ return c < s;
+ }
+
+ static bool runStateAtLeast(int c, int s) {
+ return c >= s;
+ }
+
+ static bool isRunning(int c) {
+ return c < SHUTDOWN;
+ }
+
+ private:
+
+ /**
+ * Drains the task queue into a new list, normally using drainTo. But if
+ * the queue is a DelayQueue or any other kind of queue for which poll or
+ * drainTo may fail to remove some elements, it deletes them one by one.
+ *
+ * @param unexecutedTasks
+ * Reference to an ArrayList where the tasks are to be moved.
+ */
+ void drainQueue(ArrayList<Runnable*>& unexecutedTasks) {
+
+ // Some Queue implementations can fail in poll and drainTo so we check
+ // after attempting to drain the Queue and if its not empty we remove
+ // the tasks one by one.
+
+ this->workQueue->drainTo(unexecutedTasks);
+ if (!this->workQueue->isEmpty()) {
+
+ std::vector<Runnable*> tasks = this->workQueue->toArray();
+ std::vector<Runnable*>::iterator iter = tasks.begin();
+
+ for (; iter != tasks.end(); ++iter) {
+
+ if (this->workQueue->remove(*iter)) {
+ unexecutedTasks.add(*iter);
+ }
+ }
+ }
+ }
+
+ /**
+ * Transitions runState to given target, or leaves it alone if already at
+ * least the given target.
+ *
+ * @param targetState the desired state, either SHUTDOWN or STOP
+ * (but not TIDYING or TERMINATED -- use tryTerminate for that)
+ */
+ void advanceRunState(int targetState) {
+ for (;;) {
+ int c = ctl.get();
+ if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
+ break;
+ }
+ }
+
+ /**
+ * Checks if a new worker can be added with respect to current pool state
+ * and the given bound (either core or maximum). If so, the worker count
+ * is adjusted accordingly, and, if possible, a new worker is created and
+ * started running firstTask as its first task. This method returns false
+ * if the pool is stopped or eligible to shut down. It also returns false
+ * if the thread factory fails to create a thread when asked, which requires
+ * a backout of workerCount, and a recheck for termination, in case the
+ * existence of this worker was holding up termination.
+ *
+ * @param firstTask
+ * The task the new thread should run first (or null if none).
+ * Workers are created with an initial first task (in method execute())
+ * to bypass queuing when there are fewer than corePoolSize threads
+ * (in which case we always start one), or when the queue is full
+ * (in which case we must bypass queue). Initially idle threads are
+ * usually created via prestartCoreThread or to replace other dying workers.
+ *
+ * @param core
+ * If true use corePoolSize as bound, else maximumPoolSize.
+ *
+ * @return true if successful
+ */
+ bool addWorker(Runnable* firstTask, bool core) {
+ retry:
+ for (;;) {
+ int c = ctl.get();
+ int rs = this->runStateOf(c);
+
+ // Check if queue empty only if necessary.
+ if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == NULL && !workQueue->isEmpty())) {
+ return false;
+ }
+
+ for (;;) {
+ int wc = this->workerCountOf(c);
+ if (wc >= CAPACITY || wc >= (core ? this->corePoolSize : this->maxPoolSize)) {
+ return false;
+ }
+ if (compareAndIncrementWorkerCount(c)) {
+ goto success;
+ }
+ c = ctl.get(); // Re-read ctl
+ if (runStateOf(c) != rs) {
+ goto retry;
+ }
+ // else CAS failed due to workerCount change; retry inner loop
+ }
+ }
+
+ success:
+
+ Pointer<Worker> w(new Worker(this, firstTask));
+ Pointer<Thread> t = w->thread;
+
+ mainLock.lock();
+ try {
+ // Recheck while holding lock. Back out on ThreadFactory failure or if
+ // shut down before lock acquired.
+ int c = ctl.get();
+ int rs = runStateOf(c);
+
+ if (t == NULL || (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == NULL))) {
+ decrementWorkerCount();
+ tryTerminate();
+ return false;
+ }
+
+ workers.add(w.release());
+
+ int s = workers.size();
+ if (s > largestPoolSize) {
+ largestPoolSize = s;
+ }
+
+ } catch(Exception& ex) {
+ mainLock.unlock();
+ throw;
+ }
+ mainLock.unlock();
+
+ t->start();
+ // It is possible (but unlikely) for a thread to have been added to
+ // workers, but not yet started, during transition to STOP, which
+ // could result in a rare missed interrupt, because Thread::interrupt
+ // is not guaranteed to have any effect on a non-yet-started Thread
+ // (see Thread#interrupt).
+ if (runStateOf(ctl.get()) == STOP && !t->isInterrupted()) {
+ t->interrupt();
+ }
+
+ return true;
+ }
+
+ /**
+ * Performs cleanup and bookkeeping for a dying worker. Called only from
+ * worker threads. Unless completedAbruptly is set, assumes that workerCount
+ * has already been adjusted to account for exit. This method removes
+ * thread from worker set, and possibly terminates the pool or replaces the
+ * worker if either it exited due to user task exception or if fewer than
+ * corePoolSize workers are running or queue is non-empty but there are no
+ * workers.
+ *
+ * @param w
+ * The worker that has completed or exited.
+ * @param completedAbruptly
+ * Indicates if the worker died due to user exception.
+ */
+ void processWorkerExit(Worker* w, bool completedAbruptly) {
+
+ if (completedAbruptly) { // If abrupt, then workerCount wasn't adjusted
+ decrementWorkerCount();
+ }
+
+ mainLock.lock();
+ try {
+ this->completedTasks += w->completedTasks;
+ this->workers.remove(w);
+ this->deadWorkers.add(w);
+ } catch(...) {
+ }
+ mainLock.unlock();
+
+ tryTerminate();
+
+ int c = ctl.get();
+ if (runStateLessThan(c, STOP)) {
+ if (!completedAbruptly) {
+ int min = this->coreThreadsCanTimeout ? 0 : corePoolSize;
+ if (min == 0 && ! workQueue->isEmpty()) {
+ min = 1;
+ }
+ if (workerCountOf(c) >= min) {
+ return; // replacement not needed
+ }
+ }
+ addWorker(NULL, false);
+ }
+ }
+
+ /**
+ * Performs blocking or timed wait for a task, depending on current configuration
+ * settings, or returns null if this worker must exit because of any of:
+ *
+ * 1. There are more than maximumPoolSize workers (due to
+ * a call to setMaximumPoolSize).
+ * 2. The pool is stopped.
+ * 3. The pool is shutdown and the queue is empty.
+ * 4. This worker timed out waiting for a task, and timed-out
+ * workers are subject to termination (that is,
+ * {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
+ * both before and after the timed wait.
+ *
+ * @return task, or NULL if the worker must exit, in which case
+ * workerCount is decremented
+ */
+ Runnable* getTask() {
+ bool timedOut = false; // Did the last poll() time out?
+
+ retry:
+ for (;;) {
+ int c = ctl.get();
+ int rs = runStateOf(c);
+
+ // Check if queue empty only if necessary.
+ if (rs >= SHUTDOWN && (rs >= STOP || workQueue->isEmpty())) {
+ decrementWorkerCount();
+ return NULL;
+ }
+
+ bool timed;
+
+ for (;;) {
+ int wc = workerCountOf(c);
+ timed = this->coreThreadsCanTimeout || wc > this->corePoolSize;
+
+ if (wc <= this->maxPoolSize && ! (timedOut && timed)) {
+ break;
+ }
+ if (compareAndDecrementWorkerCount(c)) {
+ return NULL;
+ }
+ c = ctl.get(); // Re-read ctl
+ if (runStateOf(c) != rs) {
+ goto retry;
+ }
+ // else CAS failed due to workerCount change; retry inner loop
+ }
+
+ try {
+ Runnable* r = NULL;
+ if (timed) {
+ workQueue->poll(r, keepAliveTime, TimeUnit::NANOSECONDS);
+ } else {
+ r = workQueue->take();
+ }
+
+ if (r != NULL) {
+ return r;
+ }
+
+ timedOut = true;
+ } catch (InterruptedException& retry) {
+ timedOut = false;
+ }
+ }
+ }
+
+ /**
+ * Attempt to CAS-increment the workerCount field of ctl.
+ */
+ bool compareAndIncrementWorkerCount(int expect) {
+ return ctl.compareAndSet(expect, expect + 1);
+ }
+
+ /**
+ * Attempt to CAS-decrement the workerCount field of ctl.
+ */
+ bool compareAndDecrementWorkerCount(int expect) {
+ return ctl.compareAndSet(expect, expect - 1);
+ }
+
+ /**
+ * Decrements the workerCount field of ctl. This is called only on
+ * abrupt termination of a thread (see processWorkerExit). Other
+ * decrements are performed within getTask.
+ */
+ void decrementWorkerCount() {
+ do {} while (!compareAndDecrementWorkerCount(ctl.get()));
}
};
+ const bool ExecutorKernel::ONLY_ONE = true;
+
+ const int ExecutorKernel::COUNT_BITS = Integer::SIZE - 3;
+ const int ExecutorKernel::CAPACITY = (1 << COUNT_BITS) - 1;
+
+ // runState is stored in the high-order bits
+ const int ExecutorKernel::RUNNING = -1 << ExecutorKernel::COUNT_BITS;
+ const int ExecutorKernel::SHUTDOWN = 0 << ExecutorKernel::COUNT_BITS;
+ const int ExecutorKernel::STOP = 1 << ExecutorKernel::COUNT_BITS;
+ const int ExecutorKernel::TIDYING = 2 << ExecutorKernel::COUNT_BITS;
+ const int ExecutorKernel::TERMINATED = 3 << ExecutorKernel::COUNT_BITS;
+
}}}
////////////////////////////////////////////////////////////////////////////////
@@ -371,7 +1383,26 @@ ThreadPoolExecutor::~ThreadPoolExecutor(
}
////////////////////////////////////////////////////////////////////////////////
-void ThreadPoolExecutor::execute(Runnable* task ) {
+void ThreadPoolExecutor::execute(Runnable* task) {
+
+ try{
+
+ if( task == NULL ) {
+ throw NullPointerException(
+ __FILE__, __LINE__,
+ "ThreadPoolExecutor::execute - Supplied Runnable pointer was NULL.");
+ }
+
+ this->kernel->execute(task, true);
+ }
+ DECAF_CATCH_RETHROW( RejectedExecutionException )
+ DECAF_CATCH_RETHROW( NullPointerException )
+ DECAF_CATCH_RETHROW( Exception )
+ DECAF_CATCHALL_THROW( Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::execute(Runnable* task, bool takeOwnership) {
try{
@@ -381,7 +1412,7 @@ void ThreadPoolExecutor::execute(Runnabl
"ThreadPoolExecutor::execute - Supplied Runnable pointer was NULL.");
}
- this->kernel->enQueueTask(task);
+ this->kernel->execute(task, takeOwnership);
}
DECAF_CATCH_RETHROW( RejectedExecutionException )
DECAF_CATCH_RETHROW( NullPointerException )
@@ -424,12 +1455,7 @@ bool ThreadPoolExecutor::awaitTerminatio
////////////////////////////////////////////////////////////////////////////////
int ThreadPoolExecutor::getPoolSize() const {
- int result = 0;
- synchronized(&this->kernel->mainLock) {
- result = this->kernel->workers.size();
- }
-
- return result;
+ return this->kernel->getPoolSize();
}
////////////////////////////////////////////////////////////////////////////////
@@ -444,25 +1470,7 @@ void ThreadPoolExecutor::setCorePoolSize
throw IllegalArgumentException(__FILE__, __LINE__, "Pool size given was negative.");
}
- synchronized(&this->kernel->mainLock) {
-
- //int delta = poolSize - this->kernel->corePoolSize;
- this->kernel->corePoolSize = poolSize;
-
- if (this->kernel->workers.size() > poolSize) {
- // TODO - Once Threads are interruptible wake them up so some can terminate.
- } else {
-
- // TODO - Create new threads up to the new pool size, unless we are out
- // of work or run out while creating.
-// int target = Math::min(delta, this->kernel->workQueue->size());
-// while (target-- > 0 && addWorker(NULL, true)) {
-// if (this->kernel->workQueue->isEmpty()) {
-// break;
-// }
-// }
- }
- }
+ this->kernel->setCorePoolSize(poolSize);
}
////////////////////////////////////////////////////////////////////////////////
@@ -473,53 +1481,31 @@ int ThreadPoolExecutor::getMaximumPoolSi
////////////////////////////////////////////////////////////////////////////////
void ThreadPoolExecutor::setMaximumPoolSize(int maxSize) {
- if (maxSize < 0 || maxSize < this->kernel->corePoolSize) {
- throw IllegalArgumentException(__FILE__, __LINE__, "Size given was invalid.");
+ if (maxSize < 0) {
+ throw IllegalArgumentException(__FILE__, __LINE__, "Maximum Pool size given was negative.");
}
- this->kernel->maxPoolSize = maxSize;
-
- if (this->kernel->workers.size() > maxSize) {
- // TODO - Wake idle worker threads when able to.
- }
+ this->kernel->setMaximumPoolSize(maxSize);
}
////////////////////////////////////////////////////////////////////////////////
long long ThreadPoolExecutor::getTaskCount() const {
- return this->kernel->workQueue->size();
+ return this->kernel->getTaskCount();
}
////////////////////////////////////////////////////////////////////////////////
int ThreadPoolExecutor::getActiveCount() const {
-
- int result = 0;
- synchronized(&this->kernel->mainLock) {
- if(!this->kernel->terminated.get()) {
- result = this->kernel->workers.size() - this->kernel->freeThreads.get();
- }
- }
-
- return result;
+ return this->kernel->getActiveCount();
}
////////////////////////////////////////////////////////////////////////////////
long long ThreadPoolExecutor::getCompletedTaskCount() const {
- long long result = 0;
- synchronized(&this->kernel->mainLock) {
- result = this->kernel->completedTasks;
- }
-
- return result;
+ return this->kernel->getCompletedTaskCount();
}
////////////////////////////////////////////////////////////////////////////////
int ThreadPoolExecutor::getLargestPoolSize() const {
- int result = 0;
- synchronized(&this->kernel->mainLock) {
- result = this->kernel->largestPoolSize;
- }
-
- return result;
+ return this->kernel->getLargestPoolSize();
}
////////////////////////////////////////////////////////////////////////////////
@@ -565,33 +1551,22 @@ BlockingQueue<Runnable*>* ThreadPoolExec
////////////////////////////////////////////////////////////////////////////////
bool ThreadPoolExecutor::isShutdown() const {
- return this->kernel->stopped.get();
+ return this->kernel->isShutdown();
}
////////////////////////////////////////////////////////////////////////////////
bool ThreadPoolExecutor::isTerminated() const {
- return this->kernel->terminated.get();
+ return this->kernel->isTerminated();
}
////////////////////////////////////////////////////////////////////////////////
bool ThreadPoolExecutor::isTerminating() const {
- return this->kernel->isStoppedOrStopping() && !this->kernel->terminated.get();
+ return this->kernel->isTerminating();
}
////////////////////////////////////////////////////////////////////////////////
void ThreadPoolExecutor::allowCoreThreadTimeout(bool value) {
-
- if (value == true && this->kernel->keepAliveTime == 0) {
- throw IllegalArgumentException(__FILE__, __LINE__,
- "Keep Alive Time must be set to a non-zero value to enable this option.");
- }
-
- if (value != this->kernel->coreThreadsCanTimeout) {
- this->kernel->coreThreadsCanTimeout = value;
- if (value == true) {
- // TODO - When Threads are interruptible wake works so they can check timeout.
- }
- }
+ this->kernel->allowCoreThreadTimeOut(value);
}
////////////////////////////////////////////////////////////////////////////////
@@ -601,22 +1576,7 @@ long long ThreadPoolExecutor::getKeepAli
////////////////////////////////////////////////////////////////////////////////
void ThreadPoolExecutor::setKeepAliveTime(long long timeout, const TimeUnit& unit) {
-
- if (timeout < 0) {
- throw IllegalArgumentException(__FILE__, __LINE__, "Timeout value cannot be negative.");
- }
-
- if (this->kernel->coreThreadsCanTimeout == true && unit.toMillis(timeout) == 0) {
- throw IllegalArgumentException(__FILE__, __LINE__,
- "Keep Alive Time must be set to a non-zero value when allowCoreThreadsTimeout is enabled.");
- }
-
- long keepAliveTime = unit.toMillis(timeout);
- long delta = keepAliveTime - this->kernel->keepAliveTime;
- this->kernel->keepAliveTime = keepAliveTime;
- if (delta < 0) {
- // TODO - When Threads are interruptible wake works so they can check timeout.
- }
+ this->kernel->setKeepAliveTime(timeout, unit);
}
////////////////////////////////////////////////////////////////////////////////
@@ -626,23 +1586,22 @@ bool ThreadPoolExecutor::allowsCoreThrea
////////////////////////////////////////////////////////////////////////////////
bool ThreadPoolExecutor::prestartCoreThread() {
- return this->kernel->addWorker();
+ return this->kernel->prestartCoreThread();
}
////////////////////////////////////////////////////////////////////////////////
int ThreadPoolExecutor::prestartAllCoreThreads() {
- return this->kernel->addAllWorkers();
+ return this->kernel->prestartAllCoreThreads();
}
////////////////////////////////////////////////////////////////////////////////
bool ThreadPoolExecutor::remove(decaf::lang::Runnable* task) {
- bool removed = this->kernel->workQueue->remove(task);
- this->kernel->tryTerminate();
- return removed;
+ return this->kernel->remove(task);
}
////////////////////////////////////////////////////////////////////////////////
void ThreadPoolExecutor::purge() {
+ this->kernel->purge();
}
////////////////////////////////////////////////////////////////////////////////
@@ -658,392 +1617,5 @@ void ThreadPoolExecutor::terminated() {
}
////////////////////////////////////////////////////////////////////////////////
-ExecutorKernel::ExecutorKernel(ThreadPoolExecutor* parent, int corePoolSize,
- int maxPoolSize, long long keepAliveTime,
- BlockingQueue<decaf::lang::Runnable*>* workQueue,
- ThreadFactory* threadFactory, RejectedExecutionHandler* handler) :
- parent(parent),
- workers(),
- deadWorkers(),
- cleanupTimer(),
- stopping(false),
- stopped(false),
- terminated(false),
- freeThreads(0),
- maxPoolSize(maxPoolSize),
- corePoolSize(corePoolSize),
- keepAliveTime(keepAliveTime),
- coreThreadsCanTimeout(false),
- workQueue(),
- mainLock(),
- termination(1),
- completedTasks(0),
- largestPoolSize(0),
- factory(),
- rejectionHandler() {
-
- if(corePoolSize < 0 || maxPoolSize <= 0 ||
- maxPoolSize < corePoolSize || keepAliveTime < 0) {
-
- throw IllegalArgumentException(__FILE__, __LINE__, "Argument out of range.");
- }
-
- if(workQueue == NULL || threadFactory == NULL || handler == NULL) {
- throw NullPointerException(__FILE__, __LINE__, "Required parameter was NULL");
- }
-
- this->cleanupTimer.scheduleAtFixedRate(
- new WorkerKiller(this), TimeUnit::SECONDS.toMillis(10), TimeUnit::SECONDS.toMillis(10));
-
- this->workQueue.reset(workQueue);
- this->factory.reset(threadFactory);
- this->rejectionHandler.reset(handler);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ExecutorKernel::~ExecutorKernel() {
- try{
-
- this->shutdown();
- this->tryTerminate();
-
- this->termination.await();
-
- this->cleanupTimer.cancel();
-
- // Ensure dead Worker Threads are destroyed, the Timer might not have
- // run recently.
- Pointer< Iterator<Worker*> > iter(this->deadWorkers.iterator());
- while(iter->hasNext()) {
- Worker* worker = iter->next();
- worker->join();
- delete worker;
- }
- }
- DECAF_CATCH_NOTHROW(Exception)
- DECAF_CATCHALL_NOTHROW()
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::onTaskStarted(Worker* thread DECAF_UNUSED) {
-
- try{
-
- synchronized( &mainLock ) {
-
- freeThreads.decrementAndGet();
-
- // Now that this callback has decremented the free threads counter
- // lets check if there is any outstanding work to be done and no
- // threads to handle it. This could happen if the QueueTask
- // method was called successively without any of the PooledThreads
- // having a chance to wake up and service the queue. This would
- // cause the number of Task to exceed the number of free threads
- // once the Threads got a chance to wake up and service the queue
- if( freeThreads.get() == 0 && !workQueue->isEmpty() ) {
- addWorker();
- }
-
- // TODO - Get actual values for these.
- this->parent->beforeExecute(thread, NULL);
- }
- }
- DECAF_CATCH_RETHROW( Exception )
- DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::onTaskCompleted(Worker* thread DECAF_UNUSED) {
-
- try {
- synchronized(&mainLock) {
- freeThreads.incrementAndGet();
- completedTasks++;
-
- // TODO - Get actual values for these.
- this->parent->afterExecute(NULL, NULL);
- }
- }
- DECAF_CATCH_RETHROW( Exception )
- DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::onTaskException(Worker* thread DECAF_UNUSED, lang::Exception& ex DECAF_UNUSED) {
-
- try{
- }
- DECAF_CATCH_RETHROW( Exception )
- DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::handleWorkerExit(Worker* worker) {
-
- synchronized( &this->mainLock ) {
-
- this->workers.remove(worker);
- this->deadWorkers.add(worker);
-
- if(this->workers.isEmpty()) {
- this->parent->terminated();
- this->terminated = true;
- this->termination.countDown();
- }
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::enQueueTask(Runnable* task) {
-
- try{
-
- synchronized( &this->mainLock ) {
-
- // If there's nobody open to do work, then create some more
- // threads to handle the work.
- if( this->freeThreads.get() == 0 ) {
- addWorker();
- }
-
- // queue the new work.
- if(isStoppedOrStopping() || !this->workQueue->offer(task)) {
- this->rejectionHandler->rejectedExecution(task, this->parent);
- }
- }
- }
- DECAF_CATCH_RETHROW( RejectedExecutionException )
- DECAF_CATCH_RETHROW( Exception )
- DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Runnable* ExecutorKernel::deQueueTask() {
-
- try{
-
- Runnable* task = NULL;
-
- while(true) {
-
- // TODO - Threads aren't interruptible yet, so spin wait.
- if(workQueue->poll(task, 10, TimeUnit::MILLISECONDS)) {
- break;
- }
-
-// try {
-// if ((task = workQueue->take()) != NULL) {
-// break;
-// }
-// } catch(InterruptedException& ex) {
-// }
-
- if(isStoppedOrStopping() && workQueue->isEmpty()) {
- break;
- }
- }
-
- if(isStoppedOrStopping() && task == NULL) {
- return NULL;
- }
-
- if( task == NULL ) {
- throw lang::Exception(__FILE__, __LINE__,
- "deQueueTask: Got empty Runnable while not in shutdown.");
- }
-
- return task;
- }
- DECAF_CATCH_RETHROW( Exception )
- DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ExecutorKernel::addWorker() {
-
- try{
-
- if( this->isStoppedOrStopping() ) {
- return false;
- }
-
- if( this->workers.size() >= this->maxPoolSize ) {
- return false;
- }
-
- synchronized( &mainLock ) {
- Worker* newWorker = new Worker(this);
- this->workers.add(newWorker);
- freeThreads.incrementAndGet();
- newWorker->start();
- this->largestPoolSize++;
- }
-
- return true;
- }
- DECAF_CATCH_RETHROW( Exception )
- DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-int ExecutorKernel::addAllWorkers() {
-
- try{
-
- if( this->isStoppedOrStopping() ) {
- return 0;
- }
-
- if( this->workers.size() >= this->maxPoolSize ) {
- return 0;
- }
-
- int delta = 0;
-
- synchronized( &mainLock ) {
-
- delta = this->maxPoolSize - this->workers.size();
-
- for(int i = 0; i < delta; ++i) {
- Worker* newWorker = new Worker(this);
- this->workers.add(newWorker);
- freeThreads.incrementAndGet();
- newWorker->start();
- this->largestPoolSize++;
- }
- }
-
- return delta;
- }
- DECAF_CATCH_RETHROW( Exception )
- DECAF_CATCHALL_THROW( Exception )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ExecutorKernel::isStoppedOrStopping() {
- if(this->stopped.get() || this->stopping.get()) {
- return true;
- }
-
- return false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::shutdown() {
-
- if(isStoppedOrStopping()) {
- return;
- }
-
- if(this->stopping.compareAndSet(false, true)) {
-
- synchronized(&mainLock) {
-
- // TODO - When threads are interruptible, we need to interrupt the Queue.
- //synchronized( workQueue.get() ) {
- // // Signal the Queue so that all waiters are notified
- // workQueue->notifyAll();
- //}
- }
-
- this->tryTerminate();
- this->stopped.set(true);
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::shutdownNow(ArrayList<Runnable*>& unexecutedTasks) {
-
- if(isStoppedOrStopping()) {
- return;
- }
-
- if(this->stopping.compareAndSet(false, true)) {
-
- synchronized(&mainLock) {
-
- Pointer< Iterator<Worker*> > iter(this->workers.iterator());
-
- while(iter->hasNext()) {
- iter->next()->stop();
- }
-
- // TODO - When threads are interruptible, we need to interrupt the Queue.
- //synchronized( workQueue.get() ) {
- // // Signal the Queue so that all waiters are notified
- // workQueue->notifyAll();
- //}
-
- this->drainQueue(unexecutedTasks);
- }
-
- this->tryTerminate();
- this->stopped.set(true);
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::drainQueue(ArrayList<Runnable*>& unexecutedTasks) {
-
- // Some Queue implementations can fail in poll and drainTo so we check
- // after attempting to drain the Queue and if its not empty we remove
- // the tasks one by one.
-
- this->workQueue->drainTo(unexecutedTasks);
- if (!this->workQueue->isEmpty()) {
-
- std::vector<Runnable*> tasks = this->workQueue->toArray();
- std::vector<Runnable*>::iterator iter = tasks.begin();
-
- for (; iter != tasks.end(); ++iter) {
-
- if (this->workQueue->remove(*iter)) {
- unexecutedTasks.add(*iter);
- }
- }
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ExecutorKernel::awaitTermination(long long timeout, const TimeUnit& unit) {
-
- if (this->terminated.get() == true) {
- return true;
- }
-
- return this->termination.await(timeout, unit);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::tryTerminate() {
-
- if (!this->isStoppedOrStopping() || (this->isStoppedOrStopping() && !this->workQueue->isEmpty())) {
- return;
- }
-
- if (this->workers.size() > 0) {
- // TODO - Once they are interruptible wake a worker.
- return;
- }
-
- synchronized(&this->mainLock) {
- try {
- this->parent->terminated();
- } catch(...) {}
-
- this->terminated.set(true);
- this->termination.countDown();
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::interruptIdleWorkers() {
- this->interruptIdleWorkers(false);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::interruptIdleWorkers(bool onlyOne DECAF_UNUSED) {
-
- synchronized(&this->mainLock) {
- }
+void ThreadPoolExecutor::onShutdown() {
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h Mon Jul 18 14:28:03 2011
@@ -212,6 +212,8 @@ namespace concurrent{
virtual void execute(decaf::lang::Runnable* task);
+ virtual void execute(decaf::lang::Runnable* task, bool takeOwnership);
+
virtual void shutdown();
virtual ArrayList<decaf::lang::Runnable*> shutdownNow();
@@ -493,6 +495,13 @@ namespace concurrent{
*/
virtual void terminated();
+ protected:
+
+ /**
+ * Used by some Decaf ThreadPoolExecutor extensions to correctly handle the shutdown case.
+ */
+ virtual void onShutdown();
+
public: // RejectedExecutionHandler implementations.
/**
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h Mon Jul 18 14:28:03 2011
@@ -130,6 +130,47 @@ namespace concurrent {
}
};
+ class SmallInterruptedRunnable : public decaf::lang::Runnable {
+ private:
+
+ ExecutorsTestSupport* parent;
+
+ public:
+
+ SmallInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable() {
+ }
+
+ virtual ~SmallInterruptedRunnable() {}
+
+ virtual void run() {
+ try {
+ Thread::sleep(SMALL_DELAY_MS);
+ parent->threadShouldThrow();
+ } catch(decaf::lang::Exception& e) {
+ }
+ }
+ };
+
+ class SmallPossiblyInterruptedRunnable : public decaf::lang::Runnable {
+ private:
+
+ ExecutorsTestSupport* parent;
+
+ public:
+
+ SmallPossiblyInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable() {
+ }
+
+ virtual ~SmallPossiblyInterruptedRunnable() {}
+
+ virtual void run() {
+ try {
+ Thread::sleep(SMALL_DELAY_MS);
+ } catch(decaf::lang::exceptions::InterruptedException& e) {
+ }
+ }
+ };
+
class MediumRunnable : public decaf::lang::Runnable {
private:
@@ -151,6 +192,47 @@ namespace concurrent {
}
};
+ class MediumInterruptedRunnable : public decaf::lang::Runnable {
+ private:
+
+ ExecutorsTestSupport* parent;
+
+ public:
+
+ MediumInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable() {
+ }
+
+ virtual ~MediumInterruptedRunnable() {}
+
+ virtual void run() {
+ try {
+ Thread::sleep(MEDIUM_DELAY_MS);
+ parent->threadShouldThrow();
+ } catch(decaf::lang::Exception& e) {
+ }
+ }
+ };
+
+ class MediumPossiblyInterruptedRunnable : public decaf::lang::Runnable {
+ private:
+
+ ExecutorsTestSupport* parent;
+
+ public:
+
+ MediumPossiblyInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable() {
+ }
+
+ virtual ~MediumPossiblyInterruptedRunnable() {}
+
+ virtual void run() {
+ try {
+ Thread::sleep(MEDIUM_DELAY_MS);
+ } catch(decaf::lang::exceptions::InterruptedException& e) {
+ }
+ }
+ };
+
class LongRunnable : public decaf::lang::Runnable {
private:
@@ -172,6 +254,48 @@ namespace concurrent {
}
};
+ class LongInterruptedRunnable : public decaf::lang::Runnable {
+ private:
+
+ ExecutorsTestSupport* parent;
+
+ public:
+
+ LongInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable() {
+ }
+
+ virtual ~LongInterruptedRunnable() {}
+
+ virtual void run() {
+ try {
+ Thread::sleep(LONG_DELAY_MS);
+ parent->threadShouldThrow();
+ } catch(decaf::lang::Exception& e) {
+ }
+ }
+ };
+
+ class LongPossiblyInterruptedRunnable : public decaf::lang::Runnable {
+ private:
+
+ ExecutorsTestSupport* parent;
+
+ public:
+
+ LongPossiblyInterruptedRunnable(ExecutorsTestSupport* parent) : decaf::lang::Runnable() {
+ }
+
+ virtual ~LongPossiblyInterruptedRunnable() {}
+
+ virtual void run() {
+ try {
+ Thread::sleep(LONG_DELAY_MS);
+ } catch(decaf::lang::exceptions::InterruptedException& e) {
+ }
+ }
+ };
+
+
class SimpleThreadFactory : public ThreadFactory {
public:
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp Mon Jul 18 14:28:03 2011
@@ -239,7 +239,7 @@ void ThreadPoolExecutorTest::testAwaitTe
///////////////////////////////////////////////////////////////////////////////
void ThreadPoolExecutorTest::testMoreTasksThanMaxPoolSize() {
- ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+ ThreadPoolExecutor pool(3, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3);
@@ -559,7 +559,7 @@ void ThreadPoolExecutorTest::testGetQueu
ThreadPoolExecutor p1(1, 1, LONG_DELAY_MS, TimeUnit::MILLISECONDS, q);
Runnable* tasks[5];
for (int i = 0; i < 5; i++){
- tasks[i] = new MediumRunnable(this);
+ tasks[i] = new MediumPossiblyInterruptedRunnable(this);
p1.execute(tasks[i]);
}
try {
@@ -583,7 +583,7 @@ void ThreadPoolExecutorTest::testRemove(
ThreadPoolExecutor p1(1, 1, LONG_DELAY_MS, TimeUnit::MILLISECONDS, q);
Runnable* tasks[5];
for (int i = 0; i < 5; i++){
- tasks[i] = new MediumRunnable(this);
+ tasks[i] = new MediumPossiblyInterruptedRunnable(this);
p1.execute(tasks[i]);
}
@@ -619,7 +619,7 @@ void ThreadPoolExecutorTest::testShutDow
try {
for (int i = 0; i < 5; i++) {
- p1.execute(new MediumRunnable(this));
+ p1.execute(new MediumPossiblyInterruptedRunnable(this));
}
}
catch(...) {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h?rev=1147898&r1=1147897&r2=1147898&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h Mon Jul 18 14:28:03 2011
@@ -53,7 +53,7 @@ namespace concurrent{
CPPUNIT_TEST( testGetLargestPoolSize );
CPPUNIT_TEST( testGetMaximumPoolSize );
CPPUNIT_TEST( testGetPoolSize );
- //CPPUNIT_TEST( testGetTaskCount );
+ CPPUNIT_TEST( testGetTaskCount );
CPPUNIT_TEST( testIsShutdown );
CPPUNIT_TEST( testIsTerminated );
CPPUNIT_TEST( testIsTerminating );