You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@harmony.apache.org by hi...@apache.org on 2009/07/16 16:01:17 UTC
svn commit: r794678 [11/19] - in
/harmony/enhanced/classlib/trunk/modules/concurrent/src:
main/java/java/util/concurrent/ main/java/java/util/concurrent/atomic/
main/java/java/util/concurrent/locks/ test/java/
Modified: harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java?rev=794678&r1=794677&r2=794678&view=diff
==============================================================================
--- harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java (original)
+++ harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java Thu Jul 16 14:01:15 2009
@@ -6,6 +6,7 @@
package java.util.concurrent;
import java.util.concurrent.locks.*;
+import java.util.concurrent.atomic.*;
import java.util.*;
/**
@@ -18,7 +19,7 @@
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
- * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
+ * Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.
*
* <p>To be useful across a wide range of contexts, this class
@@ -37,61 +38,63 @@
*
* <dt>Core and maximum pool sizes</dt>
*
- * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
- * pool size
- * (see {@link ThreadPoolExecutor#getPoolSize})
- * according to the bounds set by corePoolSize
- * (see {@link ThreadPoolExecutor#getCorePoolSize})
- * and
- * maximumPoolSize
- * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
- * When a new task is submitted in method {@link
- * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
- * are running, a new thread is created to handle the request, even if
- * other worker threads are idle. If there are more than
- * corePoolSize but less than maximumPoolSize threads running, a new
- * thread will be created only if the queue is full. By setting
- * corePoolSize and maximumPoolSize the same, you create a fixed-size
- * thread pool. By setting maximumPoolSize to an essentially unbounded
- * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
- * accommodate an arbitrary number of concurrent tasks. Most typically,
- * core and maximum pool sizes are set only upon construction, but they
- * may also be changed dynamically using {@link
- * ThreadPoolExecutor#setCorePoolSize} and {@link
- * ThreadPoolExecutor#setMaximumPoolSize}. <dd>
+ * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
+ * pool size (see {@link #getPoolSize})
+ * according to the bounds set by
+ * corePoolSize (see {@link #getCorePoolSize}) and
+ * maximumPoolSize (see {@link #getMaximumPoolSize}).
+ *
+ * When a new task is submitted in method {@link #execute}, and fewer
+ * than corePoolSize threads are running, a new thread is created to
+ * handle the request, even if other worker threads are idle. If
+ * there are more than corePoolSize but less than maximumPoolSize
+ * threads running, a new thread will be created only if the queue is
+ * full. By setting corePoolSize and maximumPoolSize the same, you
+ * create a fixed-size thread pool. By setting maximumPoolSize to an
+ * essentially unbounded value such as {@code Integer.MAX_VALUE}, you
+ * allow the pool to accommodate an arbitrary number of concurrent
+ * tasks. Most typically, core and maximum pool sizes are set only
+ * upon construction, but they may also be changed dynamically using
+ * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd>
*
- * <dt> On-demand construction
+ * <dt>On-demand construction</dt>
*
* <dd> By default, even core threads are initially created and
- * started only when needed by new tasks, but this can be overridden
- * dynamically using method {@link
- * ThreadPoolExecutor#prestartCoreThread} or
- * {@link ThreadPoolExecutor#prestartAllCoreThreads}. </dd>
+ * started only when new tasks arrive, but this can be overridden
+ * dynamically using method {@link #prestartCoreThread} or {@link
+ * #prestartAllCoreThreads}. You probably want to prestart threads if
+ * you construct the pool with a non-empty queue. </dd>
*
* <dt>Creating new threads</dt>
*
- * <dd>New threads are created using a {@link
- * java.util.concurrent.ThreadFactory}. If not otherwise specified, a
- * {@link Executors#defaultThreadFactory} is used, that creates threads to all
- * be in the same {@link ThreadGroup} and with the same
- * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
- * a different ThreadFactory, you can alter the thread's name, thread
- * group, priority, daemon status, etc. </dd>
+ * <dd>New threads are created using a {@link ThreadFactory}. If not
+ * otherwise specified, a {@link Executors#defaultThreadFactory} is
+ * used, which creates threads that are all in the same {@link
+ * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
+ * non-daemon status. By supplying a different ThreadFactory, you can
+ * alter the thread's name, thread group, priority, daemon status,
+ * etc. If a {@code ThreadFactory} fails to create a thread when asked
+ * by returning null from {@code newThread}, the executor will
+ * continue, but might not be able to execute any tasks. Threads
+ * should possess the "modifyThread" {@code RuntimePermission}. If
+ * worker threads or other threads using the pool do not possess this
+ * permission, service may be degraded: configuration changes may not
+ * take effect in a timely manner, and a shutdown pool may remain in a
+ * state in which termination is possible but not completed.</dd>
*
* <dt>Keep-alive times</dt>
*
* <dd>If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
- * than the keepAliveTime (see {@link
- * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
- * reducing resource consumption when the pool is not being actively
- * used. If the pool becomes more active later, new threads will be
- * constructed. This parameter can also be changed dynamically
- * using method {@link ThreadPoolExecutor#setKeepAliveTime}. Using
- * a value of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS}
- * effectively disables idle threads from ever terminating prior
- * to shut down.
- * </dd>
+ * than the keepAliveTime (see {@link #getKeepAliveTime}). This
+ * provides a means of reducing resource consumption when the pool is
+ * not being actively used. If the pool becomes more active later, new
+ * threads will be constructed. This parameter can also be changed
+ * dynamically using method {@link #setKeepAliveTime}. Using a value
+ * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively
+ * disables idle threads from ever terminating prior to shut down. The
+ * keep-alive policy applies only when there are more than
+ * corePoolSize threads.</dd>
*
* <dt>Queuing</dt>
*
@@ -107,7 +110,7 @@
* <li> If corePoolSize or more threads are running, the Executor
* always prefers queuing a request rather than adding a new
* thread.</li>
- *
+ *
* <li> If a request cannot be queued, a new thread is created unless
* this would exceed maximumPoolSize, in which case, the task will be
* rejected.</li>
@@ -130,7 +133,7 @@
*
* <li><em> Unbounded queues.</em> Using an unbounded queue (for
* example a {@link LinkedBlockingQueue} without a predefined
- * capacity) will cause new tasks to be queued in cases where all
+ * capacity) will cause new tasks to wait in the queue when all
* corePoolSize threads are busy. Thus, no more than corePoolSize
* threads will ever be created. (And the value of the maximumPoolSize
* therefore doesn't have any effect.) This may be appropriate when
@@ -160,35 +163,33 @@
*
* <dt>Rejected tasks</dt>
*
- * <dd> New tasks submitted in method {@link
- * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
- * Executor has been shut down, and also when the Executor uses finite
- * bounds for both maximum threads and work queue capacity, and is
- * saturated. In either case, the <tt>execute</tt> method invokes the
- * {@link RejectedExecutionHandler#rejectedExecution} method of its
- * {@link RejectedExecutionHandler}. Four predefined handler policies
- * are provided:
+ * <dd> New tasks submitted in method {@link #execute} will be
+ * <em>rejected</em> when the Executor has been shut down, and also
+ * when the Executor uses finite bounds for both maximum threads and
+ * work queue capacity, and is saturated. In either case, the {@code
+ * execute} method invokes the {@link
+ * RejectedExecutionHandler#rejectedExecution} method of its {@link
+ * RejectedExecutionHandler}. Four predefined handler policies are
+ * provided:
*
* <ol>
*
- * <li> In the
- * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
- * runtime {@link RejectedExecutionException} upon rejection. </li>
- *
- * <li> In {@link
- * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
- * <tt>execute</tt> itself runs the task. This provides a simple
- * feedback control mechanism that will slow down the rate that new
- * tasks are submitted. </li>
- *
- * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
- * a task that cannot be executed is simply dropped. </li>
- *
- * <li>In {@link
- * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
- * shut down, the task at the head of the work queue is dropped, and
- * then execution is retried (which can fail again, causing this to be
- * repeated.) </li>
+ * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
+ * handler throws a runtime {@link RejectedExecutionException} upon
+ * rejection. </li>
+ *
+ * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
+ * that invokes {@code execute} itself runs the task. This provides a
+ * simple feedback control mechanism that will slow down the rate at which
+ * new tasks are submitted. </li>
+ *
+ * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
+ * cannot be executed is simply dropped. </li>
+ *
+ * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
+ * executor is not shut down, the task at the head of the work queue
+ * is dropped, and then execution is retried (which can fail again,
+ * causing this to be repeated.) </li>
*
* </ol>
*
@@ -199,50 +200,62 @@
*
* <dt>Hook methods</dt>
*
- * <dd>This class provides <tt>protected</tt> overridable {@link
- * ThreadPoolExecutor#beforeExecute} and {@link
- * ThreadPoolExecutor#afterExecute} methods that are called before and
- * after execution of each task. These can be used to manipulate the
- * execution environment, for example, reinitializing ThreadLocals,
- * gathering statistics, or adding log entries. Additionally, method
- * {@link ThreadPoolExecutor#terminated} can be overridden to perform
- * any special processing that needs to be done once the Executor has
- * fully terminated.</dd>
+ * <dd>This class provides {@code protected} overridable {@link
+ * #beforeExecute} and {@link #afterExecute} methods that are called
+ * before and after execution of each task. These can be used to
+ * manipulate the execution environment; for example, reinitializing
+ * ThreadLocals, gathering statistics, or adding log
+ * entries. Additionally, method {@link #terminated} can be overridden
+ * to perform any special processing that needs to be done once the
+ * Executor has fully terminated.
+ *
+ * <p>If hook or callback methods throw exceptions, internal worker
+ * threads may in turn fail and abruptly terminate.</dd>
*
* <dt>Queue maintenance</dt>
*
- * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
- * the work queue for purposes of monitoring and debugging. Use of
- * this method for any other purpose is strongly discouraged. Two
- * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
- * ThreadPoolExecutor#purge} are available to assist in storage
- * reclamation when large numbers of queued tasks become
- * cancelled.</dd> </dl>
+ * <dd> Method {@link #getQueue} allows access to the work queue for
+ * purposes of monitoring and debugging. Use of this method for any
+ * other purpose is strongly discouraged. Two supplied methods,
+ * {@link #remove} and {@link #purge} are available to assist in
+ * storage reclamation when large numbers of queued tasks become
+ * cancelled.</dd>
+ *
+ * <dt>Finalization</dt>
+ *
+ * <dd> A pool that is no longer referenced in a program <em>AND</em>
+ * has no remaining threads will be {@code shutdown} automatically. If
+ * you would like to ensure that unreferenced pools are reclaimed even
+ * if users forget to call {@link #shutdown}, then you must arrange
+ * that unused threads eventually die, by setting appropriate
+ * keep-alive times using a lower bound of zero core threads. </dd>
+ *
+ * </dl>
*
* <p> <b>Extension example</b>. Most extensions of this class
* override one or more of the protected hook methods. For example,
* here is a subclass that adds a simple pause/resume feature:
*
- * <pre>
+ * <pre> {@code
* class PausableThreadPoolExecutor extends ThreadPoolExecutor {
* private boolean isPaused;
* private ReentrantLock pauseLock = new ReentrantLock();
* private Condition unpaused = pauseLock.newCondition();
*
* public PausableThreadPoolExecutor(...) { super(...); }
- *
+ *
* protected void beforeExecute(Thread t, Runnable r) {
* super.beforeExecute(t, r);
* pauseLock.lock();
* try {
* while (isPaused) unpaused.await();
- * } catch(InterruptedException ie) {
+ * } catch (InterruptedException ie) {
* t.interrupt();
* } finally {
* pauseLock.unlock();
* }
* }
- *
+ *
* public void pause() {
* pauseLock.lock();
* try {
@@ -251,7 +264,7 @@
* pauseLock.unlock();
* }
* }
- *
+ *
* public void resume() {
* pauseLock.lock();
* try {
@@ -261,86 +274,196 @@
* pauseLock.unlock();
* }
* }
- * }
- * </pre>
+ * }}</pre>
+ *
* @since 1.5
* @author Doug Lea
*/
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
- * Only used to force toArray() to produce a Runnable[].
+ * The main pool control state, ctl, is an atomic integer packing
+ * two conceptual fields
+ * workerCount, indicating the effective number of threads
+ * runState, indicating whether running, shutting down etc
+ *
+ * In order to pack them into one int, we limit workerCount to
+ * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
+ * billion) otherwise representable. If this is ever an issue in
+ * the future, the variable can be changed to be an AtomicLong,
+ * and the shift/mask constants below adjusted. But until the need
+ * arises, this code is a bit faster and simpler using an int.
+ *
+ * The workerCount is the number of workers that have been
+ * permitted to start and not permitted to stop. The value may be
+ * transiently different from the actual number of live threads,
+ * for example when a ThreadFactory fails to create a thread when
+ * asked, and when exiting threads are still performing
+ * bookkeeping before terminating. The user-visible pool size is
+ * reported as the current size of the workers set.
+ *
+ * The runState provides the main lifecycle control, taking on values:
+ *
+ * RUNNING: Accept new tasks and process queued tasks
+ * SHUTDOWN: Don't accept new tasks, but process queued tasks
+ * STOP: Don't accept new tasks, don't process queued tasks,
+ * and interrupt in-progress tasks
+ * TIDYING: All tasks have terminated, workerCount is zero,
+ * the thread transitioning to state TIDYING
+ * will run the terminated() hook method
+ * TERMINATED: terminated() has completed
+ *
+ * The numerical order among these values matters, to allow
+ * ordered comparisons. The runState monotonically increases over
+ * time, but need not hit each state. The transitions are:
+ *
+ * RUNNING -> SHUTDOWN
+ * On invocation of shutdown(), perhaps implicitly in finalize()
+ * (RUNNING or SHUTDOWN) -> STOP
+ * On invocation of shutdownNow()
+ * SHUTDOWN -> TIDYING
+ * When both queue and pool are empty
+ * STOP -> TIDYING
+ * When pool is empty
+ * TIDYING -> TERMINATED
+ * When the terminated() hook method has completed
+ *
+ * Threads waiting in awaitTermination() will return when the
+ * state reaches TERMINATED.
+ *
+ * Detecting the transition from SHUTDOWN to TIDYING is less
+ * straightforward than you'd like because the queue may become
+ * empty after non-empty and vice versa during SHUTDOWN state, but
+ * we can only terminate if, after seeing that it is empty, we see
+ * that workerCount is 0 (which sometimes entails a recheck -- see
+ * below).
*/
- private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];
+ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
+ private static final int COUNT_BITS = Integer.SIZE - 3;
+ private static final int CAPACITY = (1 << COUNT_BITS) - 1;
- /**
- * Permission for checking shutdown
+ // runState is stored in the high-order bits
+ private static final int RUNNING = -1 << COUNT_BITS;
+ private static final int SHUTDOWN = 0 << COUNT_BITS;
+ private static final int STOP = 1 << COUNT_BITS;
+ private static final int TIDYING = 2 << COUNT_BITS;
+ private static final int TERMINATED = 3 << COUNT_BITS;
+
+ // Packing and unpacking ctl
+ private static int runStateOf(int c) { return c & ~CAPACITY; }
+ private static int workerCountOf(int c) { return c & CAPACITY; }
+ private static int ctlOf(int rs, int wc) { return rs | wc; }
+
+ /*
+ * Bit field accessors that don't require unpacking ctl.
+ * These depend on the bit layout and on workerCount being never negative.
*/
- private static final RuntimePermission shutdownPerm =
- new RuntimePermission("modifyThread");
+
+ private static boolean runStateLessThan(int c, int s) {
+ return c < s;
+ }
+
+ private static boolean runStateAtLeast(int c, int s) {
+ return c >= s;
+ }
+
+ private static boolean isRunning(int c) {
+ return c < SHUTDOWN;
+ }
/**
- * Queue used for holding tasks and handing off to worker threads.
+ * Attempt to CAS-increment the workerCount field of ctl.
*/
- private final BlockingQueue<Runnable> workQueue;
+ private boolean compareAndIncrementWorkerCount(int expect) {
+ return ctl.compareAndSet(expect, expect + 1);
+ }
/**
- * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
- * workers set.
+ * Attempt to CAS-decrement the workerCount field of ctl.
*/
- private final ReentrantLock mainLock = new ReentrantLock();
+ private boolean compareAndDecrementWorkerCount(int expect) {
+ return ctl.compareAndSet(expect, expect - 1);
+ }
/**
- * Wait condition to support awaitTermination
+ * Decrements the workerCount field of ctl. This is called only on
+ * abrupt termination of a thread (see processWorkerExit). Other
+ * decrements are performed within getTask.
*/
- private final Condition termination = mainLock.newCondition();
+ private void decrementWorkerCount() {
+ do {} while (! compareAndDecrementWorkerCount(ctl.get()));
+ }
/**
- * Set containing all worker threads in pool.
+ * The queue used for holding tasks and handing off to worker
+ * threads. We do not require that workQueue.poll() returning
+ * null necessarily means that workQueue.isEmpty(), so rely
+ * solely on isEmpty to see if the queue is empty (which we must
+ * do for example when deciding whether to transition from
+ * SHUTDOWN to TIDYING). This accommodates special-purpose
+ * queues such as DelayQueues for which poll() is allowed to
+ * return null even if it may later return non-null when delays
+ * expire.
*/
- private final HashSet<Worker> workers = new HashSet<Worker>();
+ private final BlockingQueue<Runnable> workQueue;
/**
- * Timeout in nanoseconds for idle threads waiting for work.
- * Threads use this timeout only when there are more than
- * corePoolSize present. Otherwise they wait forever for new work.
+ * Lock held on access to workers set and related bookkeeping.
+ * While we could use a concurrent set of some sort, it turns out
+ * to be generally preferable to use a lock. Among the reasons is
+ * that this serializes interruptIdleWorkers, which avoids
+ * unnecessary interrupt storms, especially during shutdown.
+ * Otherwise exiting threads would concurrently interrupt those
+ * that have not yet been interrupted. It also simplifies some of the
+ * associated statistics bookkeeping of largestPoolSize etc. We
+ * also hold mainLock on shutdown and shutdownNow, for the sake of
+ * ensuring workers set is stable while separately checking
+ * permission to interrupt and actually interrupting.
*/
- private volatile long keepAliveTime;
+ private final ReentrantLock mainLock = new ReentrantLock();
/**
- * Core pool size, updated only while holding mainLock,
- * but volatile to allow concurrent readability even
- * during updates.
+ * Set containing all worker threads in pool. Accessed only when
+ * holding mainLock.
*/
- private volatile int corePoolSize;
+ private final HashSet<Worker> workers = new HashSet<Worker>();
/**
- * Maximum pool size, updated only while holding mainLock
- * but volatile to allow concurrent readability even
- * during updates.
+ * Wait condition to support awaitTermination
*/
- private volatile int maximumPoolSize;
+ private final Condition termination = mainLock.newCondition();
/**
- * Current pool size, updated only while holding mainLock
- * but volatile to allow concurrent readability even
- * during updates.
+ * Tracks largest attained pool size. Accessed only under
+ * mainLock.
*/
- private volatile int poolSize;
+ private int largestPoolSize;
/**
- * Lifecycle state
+ * Counter for completed tasks. Updated only on termination of
+ * worker threads. Accessed only under mainLock.
*/
- volatile int runState;
+ private long completedTaskCount;
- // Special values for runState
- /** Normal, not-shutdown mode */
- static final int RUNNING = 0;
- /** Controlled shutdown mode */
- static final int SHUTDOWN = 1;
- /** Immediate shutdown mode */
- static final int STOP = 2;
- /** Final state */
- static final int TERMINATED = 3;
+ /*
+ * All user control parameters are declared as volatiles so that
+ * ongoing actions are based on freshest values, but without need
+ * for locking, since no internal invariants depend on them
+ * changing synchronously with respect to other actions.
+ */
+
+ /**
+ * Factory for new threads. All threads are created using this
+ * factory (via method addWorker). All callers must be prepared
+ * for addWorker to fail, which may reflect a system or user's
+ * policy limiting the number of threads. Even though it is not
+ * treated as an error, failure to create threads may result in
+ * new tasks being rejected or existing ones remaining stuck in
+ * the queue. On the other hand, no special precautions exist to
+ * handle OutOfMemoryErrors that might be thrown while trying to
+ * create threads, since there is generally no recourse from
+ * within this class.
+ */
+ private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
@@ -348,21 +471,24 @@
private volatile RejectedExecutionHandler handler;
/**
- * Factory for new threads.
+ * Timeout in nanoseconds for idle threads waiting for work.
+ * Threads use this timeout when there are more than corePoolSize
+ * present. Otherwise they wait forever for new work.
*/
- private volatile ThreadFactory threadFactory;
+ private volatile long keepAliveTime;
/**
- * Tracks largest attained pool size.
+ * Core pool size is the minimum number of workers to keep alive
+ * (and not allow to time out etc).
*/
- private int largestPoolSize;
+ private volatile int corePoolSize;
/**
- * Counter for completed tasks. Updated only on termination of
- * worker threads.
+ * Maximum pool size. Note that the actual maximum is internally
+ * bounded by CAPACITY.
*/
- private long completedTaskCount;
-
+ private volatile int maximumPoolSize;
+
/**
* The default rejected execution handler
*/
@@ -370,336 +496,622 @@
new AbortPolicy();
/**
- * Invoke the rejected execution handler for the given command.
+ * Permission required for callers of shutdown and shutdownNow.
+ * We additionally require (see checkShutdownAccess) that callers
+ * have permission to actually interrupt threads in the worker set
+ * (as governed by Thread.interrupt, which relies on
+ * ThreadGroup.checkAccess, which in turn relies on
+ * SecurityManager.checkAccess). Shutdowns are attempted only if
+ * these checks pass.
+ *
+ * All actual invocations of Thread.interrupt (see
+ * interruptIdleWorkers and interruptWorkers) ignore
+ * SecurityExceptions, meaning that the attempted interrupts
+ * silently fail. In the case of shutdown, they should not fail
+ * unless the SecurityManager has inconsistent policies, sometimes
+ * allowing access to a thread and sometimes not. In such cases,
+ * failure to actually interrupt threads may disable or delay full
+ * termination. Other uses of interruptIdleWorkers are advisory,
+ * and failure to actually interrupt will merely delay response to
+ * configuration changes so is not handled exceptionally.
*/
- void reject(Runnable command) {
- handler.rejectedExecution(command, this);
+ private static final RuntimePermission shutdownPerm =
+ new RuntimePermission("modifyThread");
+
+ /**
+ * Class Worker mainly maintains interrupt control state for
+ * threads running tasks, along with other minor bookkeeping.
+ * This class opportunistically extends AbstractQueuedSynchronizer
+ * to simplify acquiring and releasing a lock surrounding each
+ * task execution. This protects against interrupts that are
+ * intended to wake up a worker thread waiting for a task from
+ * instead interrupting a task being run. We implement a simple
+ * non-reentrant mutual exclusion lock rather than use ReentrantLock
+ * because we do not want worker tasks to be able to reacquire the
+ * lock when they invoke pool control methods like setCorePoolSize.
+ */
+ private final class Worker
+ extends AbstractQueuedSynchronizer
+ implements Runnable
+ {
+ /**
+ * This class will never be serialized, but we provide a
+ * serialVersionUID to suppress a javac warning.
+ */
+ private static final long serialVersionUID = 6138294804551838833L;
+
+ /** Thread this worker is running in. Null if factory fails. */
+ final Thread thread;
+ /** Initial task to run. Possibly null. */
+ Runnable firstTask;
+ /** Per-thread task counter */
+ volatile long completedTasks;
+
+ /**
+ * Creates with given first task and thread from ThreadFactory.
+ * @param firstTask the first task (null if none)
+ */
+ Worker(Runnable firstTask) {
+ this.firstTask = firstTask;
+ this.thread = getThreadFactory().newThread(this);
+ }
+
+ /** Delegates main run loop to outer runWorker */
+ public void run() {
+ runWorker(this);
+ }
+
+ // Lock methods
+ //
+ // The value 0 represents the unlocked state.
+ // The value 1 represents the locked state.
+
+ protected boolean isHeldExclusively() {
+ return getState() == 1;
+ }
+
+ protected boolean tryAcquire(int unused) {
+ if (compareAndSetState(0, 1)) {
+ setExclusiveOwnerThread(Thread.currentThread());
+ return true;
+ }
+ return false;
+ }
+
+ protected boolean tryRelease(int unused) {
+ setExclusiveOwnerThread(null);
+ setState(0);
+ return true;
+ }
+
+ public void lock() { acquire(1); }
+ public boolean tryLock() { return tryAcquire(1); }
+ public void unlock() { release(1); }
+ public boolean isLocked() { return isHeldExclusively(); }
}
+ /*
+ * Methods for setting control state
+ */
+
/**
- * Create and return a new thread running firstTask as its first
- * task. Call only while holding mainLock
- * @param firstTask the task the new thread should run first (or
- * null if none)
- * @return the new thread
+ * Transitions runState to given target, or leaves it alone if
+ * already at least the given target.
+ *
+ * @param targetState the desired state, either SHUTDOWN or STOP
+ * (but not TIDYING or TERMINATED -- use tryTerminate for that)
*/
- private Thread addThread(Runnable firstTask) {
- Worker w = new Worker(firstTask);
- Thread t = threadFactory.newThread(w);
- w.thread = t;
- workers.add(w);
- int nt = ++poolSize;
- if (nt > largestPoolSize)
- largestPoolSize = nt;
- return t;
+ private void advanceRunState(int targetState) {
+ for (;;) {
+ int c = ctl.get();
+ if (runStateAtLeast(c, targetState) ||
+ ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
+ break;
+ }
}
/**
- * Create and start a new thread running firstTask as its first
- * task, only if fewer than corePoolSize threads are running.
- * @param firstTask the task the new thread should run first (or
- * null if none)
- * @return true if successful.
+ * Transitions to TERMINATED state if either (SHUTDOWN and pool
+ * and queue empty) or (STOP and pool empty). If otherwise
+ * eligible to terminate but workerCount is nonzero, interrupts an
+ * idle worker to ensure that shutdown signals propagate. This
+ * method must be called following any action that might make
+ * termination possible -- reducing worker count or removing tasks
+ * from the queue during shutdown. The method is non-private to
+ * allow access from ScheduledThreadPoolExecutor.
+ */
+ final void tryTerminate() {
+ for (;;) {
+ int c = ctl.get();
+ if (isRunning(c) ||
+ runStateAtLeast(c, TIDYING) ||
+ (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
+ return;
+ if (workerCountOf(c) != 0) { // Eligible to terminate
+ interruptIdleWorkers(ONLY_ONE);
+ return;
+ }
+
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
+ try {
+ terminated();
+ } finally {
+ ctl.set(ctlOf(TERMINATED, 0));
+ termination.signalAll();
+ }
+ return;
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ // else retry on failed CAS
+ }
+ }
+
+ /*
+ * Methods for controlling interrupts to worker threads.
+ */
+
+ /**
+ * If there is a security manager, makes sure caller has
+ * permission to shut down threads in general (see shutdownPerm).
+ * If this passes, additionally makes sure the caller is allowed
+ * to interrupt each worker thread. This might not be true even if
+ * first check passed, if the SecurityManager treats some threads
+ * specially.
+ */
+ private void checkShutdownAccess() {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null) {
+ security.checkPermission(shutdownPerm);
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ for (Worker w : workers)
+ security.checkAccess(w.thread);
+ } finally {
+ mainLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Interrupts all threads, even if active. Ignores SecurityExceptions
+ * (in which case some threads may remain uninterrupted).
*/
- private boolean addIfUnderCorePoolSize(Runnable firstTask) {
- Thread t = null;
+ private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
- if (poolSize < corePoolSize)
- t = addThread(firstTask);
+ for (Worker w : workers) {
+ try {
+ w.thread.interrupt();
+ } catch (SecurityException ignore) {
+ }
+ }
} finally {
mainLock.unlock();
}
- if (t == null)
- return false;
- t.start();
- return true;
}
/**
- * Create and start a new thread only if fewer than maximumPoolSize
- * threads are running. The new thread runs as its first task the
- * next task in queue, or if there is none, the given task.
- * @param firstTask the task the new thread should run first (or
- * null if none)
- * @return null on failure, else the first task to be run by new thread.
+ * Interrupts threads that might be waiting for tasks (as
+ * indicated by not being locked) so they can check for
+ * termination or configuration changes. Ignores
+ * SecurityExceptions (in which case some threads may remain
+ * uninterrupted).
+ *
+ * @param onlyOne If true, interrupt at most one worker. This is
+ * called only from tryTerminate when termination is otherwise
+ * enabled but there are still other workers. In this case, at
+ * most one waiting worker is interrupted to propagate shutdown
+ * signals in case all threads are currently waiting.
+ * Interrupting any arbitrary thread ensures that newly arriving
+ * workers since shutdown began will also eventually exit.
+ * To guarantee eventual termination, it suffices to always
+ * interrupt only one idle worker, but shutdown() interrupts all
+ * idle workers so that redundant workers exit promptly, not
+ * waiting for a straggler task to finish.
*/
- private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
- Thread t = null;
- Runnable next = null;
+ private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
- if (poolSize < maximumPoolSize) {
- next = workQueue.poll();
- if (next == null)
- next = firstTask;
- t = addThread(next);
+ for (Worker w : workers) {
+ Thread t = w.thread;
+ if (!t.isInterrupted() && w.tryLock()) {
+ try {
+ t.interrupt();
+ } catch (SecurityException ignore) {
+ } finally {
+ w.unlock();
+ }
+ }
+ if (onlyOne)
+ break;
}
} finally {
mainLock.unlock();
}
- if (t == null)
- return null;
- t.start();
- return next;
}
+ /**
+ * Common form of interruptIdleWorkers, to avoid having to
+ * remember what the boolean argument means.
+ */
+ private void interruptIdleWorkers() {
+ interruptIdleWorkers(false);
+ }
+
+ private static final boolean ONLY_ONE = true;
/**
- * Get the next task for a worker thread to run.
- * @return the task
- * @throws InterruptedException if interrupted while waiting for task
+ * Ensures that unless the pool is stopping, the current thread
+ * does not have its interrupt set. This requires a double-check
+ * of state in case the interrupt was cleared concurrently with a
+ * shutdownNow -- if so, the interrupt is re-enabled.
*/
- Runnable getTask() throws InterruptedException {
- for (;;) {
- switch(runState) {
- case RUNNING: {
- if (poolSize <= corePoolSize) // untimed wait if core
- return workQueue.take();
-
- long timeout = keepAliveTime;
- if (timeout <= 0) // die immediately for 0 timeout
- return null;
- Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
- if (r != null)
- return r;
- if (poolSize > corePoolSize) // timed out
- return null;
- // else, after timeout, pool shrank so shouldn't die, so retry
- break;
- }
+ private void clearInterruptsForTaskRun() {
+ if (runStateLessThan(ctl.get(), STOP) &&
+ Thread.interrupted() &&
+ runStateAtLeast(ctl.get(), STOP))
+ Thread.currentThread().interrupt();
+ }
- case SHUTDOWN: {
- // Help drain queue
- Runnable r = workQueue.poll();
- if (r != null)
- return r;
-
- // Check if can terminate
- if (workQueue.isEmpty()) {
- interruptIdleWorkers();
- return null;
- }
+ /*
+ * Misc utilities, most of which are also exported to
+ * ScheduledThreadPoolExecutor
+ */
- // There could still be delayed tasks in queue.
- // Wait for one, re-checking state upon interruption
- try {
- return workQueue.take();
- } catch(InterruptedException ignore) {}
- break;
- }
+ /**
+ * Invokes the rejected execution handler for the given command.
+ * Package-private for use by ScheduledThreadPoolExecutor.
+ */
+ final void reject(Runnable command) {
+ handler.rejectedExecution(command, this);
+ }
- case STOP:
- return null;
- default:
- assert false;
+ /**
+ * Performs any further cleanup following run state transition on
+ * invocation of shutdown. A no-op here, but used by
+ * ScheduledThreadPoolExecutor to cancel delayed tasks.
+ */
+ void onShutdown() {
+ }
+
+ /**
+ * State check needed by ScheduledThreadPoolExecutor to
+ * enable running tasks during shutdown.
+ *
+ * @param shutdownOK true if this method should return true when SHUTDOWN
+ */
+ final boolean isRunningOrShutdown(boolean shutdownOK) {
+ int rs = runStateOf(ctl.get());
+ return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
+ }
+
+ /**
+ * Drains the task queue into a new list, normally using
+ * drainTo. But if the queue is a DelayQueue or any other kind of
+ * queue for which poll or drainTo may fail to remove some
+ * elements, it deletes them one by one.
+ */
+ private List<Runnable> drainQueue() {
+ BlockingQueue<Runnable> q = workQueue;
+ List<Runnable> taskList = new ArrayList<Runnable>();
+ q.drainTo(taskList);
+ if (!q.isEmpty()) {
+ for (Runnable r : q.toArray(new Runnable[0])) {
+ if (q.remove(r))
+ taskList.add(r);
}
}
+ return taskList;
}
+ /*
+ * Methods for creating, running and cleaning up after workers
+ */
+
/**
- * Wake up all threads that might be waiting for tasks.
+ * Checks if a new worker can be added with respect to current
+ * pool state and the given bound (either core or maximum). If so,
+ * the worker count is adjusted accordingly, and, if possible, a
+ * new worker is created and started running firstTask as its
+ * first task. This method returns false if the pool is stopped or
+ * eligible to shut down. It also returns false if the thread
+ * factory fails to create a thread when asked, which requires a
+ * backout of workerCount, and a recheck for termination, in case
+ * the existence of this worker was holding up termination.
+ *
+ * @param firstTask the task the new thread should run first (or
+ * null if none). Workers are created with an initial first task
+ * (in method execute()) to bypass queuing when there are fewer
+ * than corePoolSize threads (in which case we always start one),
+ * or when the queue is full (in which case we must bypass queue).
+ * Initially idle threads are usually created via
+ * prestartCoreThread or to replace other dying workers.
+ *
+ * @param core if true use corePoolSize as bound, else
+ * maximumPoolSize. (A boolean indicator is used here rather than a
+ * value to ensure reads of fresh values after checking other pool
+ * state).
+ * @return true if successful
*/
- void interruptIdleWorkers() {
+ private boolean addWorker(Runnable firstTask, boolean core) {
+ retry:
+ for (;;) {
+ int c = ctl.get();
+ int rs = runStateOf(c);
+
+ // Check if queue empty only if necessary.
+ if (rs >= SHUTDOWN &&
+ ! (rs == SHUTDOWN &&
+ firstTask == null &&
+ ! workQueue.isEmpty()))
+ return false;
+
+ for (;;) {
+ int wc = workerCountOf(c);
+ if (wc >= CAPACITY ||
+ wc >= (core ? corePoolSize : maximumPoolSize))
+ return false;
+ if (compareAndIncrementWorkerCount(c))
+ break retry;
+ c = ctl.get(); // Re-read ctl
+ if (runStateOf(c) != rs)
+ continue retry;
+ // else CAS failed due to workerCount change; retry inner loop
+ }
+ }
+
+ Worker w = new Worker(firstTask);
+ Thread t = w.thread;
+
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
- for (Worker w : workers)
- w.interruptIfIdle();
+ // Recheck while holding lock.
+ // Back out on ThreadFactory failure or if
+ // shut down before lock acquired.
+ int c = ctl.get();
+ int rs = runStateOf(c);
+
+ if (t == null ||
+ (rs >= SHUTDOWN &&
+ ! (rs == SHUTDOWN &&
+ firstTask == null))) {
+ decrementWorkerCount();
+ tryTerminate();
+ return false;
+ }
+
+ workers.add(w);
+
+ int s = workers.size();
+ if (s > largestPoolSize)
+ largestPoolSize = s;
} finally {
mainLock.unlock();
}
+
+ t.start();
+ // It is possible (but unlikely) for a thread to have been
+ // added to workers, but not yet started, during transition to
+ // STOP, which could result in a rare missed interrupt,
+ // because Thread.interrupt is not guaranteed to have any effect
+ // on a not-yet-started Thread (see Thread#interrupt).
+ if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
+ t.interrupt();
+
+ return true;
}
/**
- * Perform bookkeeping for a terminated worker thread.
+ * Performs cleanup and bookkeeping for a dying worker. Called
+ * only from worker threads. Unless completedAbruptly is set,
+ * assumes that workerCount has already been adjusted to account
+ * for exit. This method removes thread from worker set, and
+ * possibly terminates the pool or replaces the worker if either
+ * it exited due to user task exception or if fewer than
+ * corePoolSize workers are running or queue is non-empty but
+ * there are no workers.
+ *
* @param w the worker
+ * @param completedAbruptly if the worker died due to user exception
*/
- void workerDone(Worker w) {
+ private void processWorkerExit(Worker w, boolean completedAbruptly) {
+ if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
+ decrementWorkerCount();
+
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
- if (--poolSize > 0)
- return;
-
- // Else, this is the last thread. Deal with potential shutdown.
-
- int state = runState;
- assert state != TERMINATED;
-
- if (state != STOP) {
- // If there are queued tasks but no threads, create
- // replacement.
- Runnable r = workQueue.poll();
- if (r != null) {
- addThread(r).start();
- return;
- }
-
- // If there are some (presumably delayed) tasks but
- // none pollable, create an idle replacement to wait.
- if (!workQueue.isEmpty()) {
- addThread(null).start();
- return;
- }
-
- // Otherwise, we can exit without replacement
- if (state == RUNNING)
- return;
- }
-
- // Either state is STOP, or state is SHUTDOWN and there is
- // no work to do. So we can terminate.
- termination.signalAll();
- runState = TERMINATED;
- // fall through to call terminate() outside of lock.
} finally {
mainLock.unlock();
}
- assert runState == TERMINATED;
- terminated();
+ tryTerminate();
+
+ int c = ctl.get();
+ if (runStateLessThan(c, STOP)) {
+ if (!completedAbruptly) {
+ int min = corePoolSize;
+ if (min == 0 && ! workQueue.isEmpty())
+ min = 1;
+ if (workerCountOf(c) >= min)
+ return; // replacement not needed
+ }
+ addWorker(null, false);
+ }
}
/**
- * Worker threads
+ * Performs blocking or timed wait for a task, depending on
+ * current configuration settings, or returns null if this worker
+ * must exit because of any of:
+ * 1. There are more than maximumPoolSize workers (due to
+ * a call to setMaximumPoolSize).
+ * 2. The pool is stopped.
+ * 3. The pool is shut down and the queue is empty.
+ * 4. This worker timed out waiting for a task, and timed-out
+ * workers are subject to termination (that is,
+ * {@code workerCount > corePoolSize})
+ * both before and after the timed wait.
+ *
+ * @return task, or null if the worker must exit, in which case
+ * workerCount is decremented
*/
- private class Worker implements Runnable {
-
- /**
- * The runLock is acquired and released surrounding each task
- * execution. It mainly protects against interrupts that are
- * intended to cancel the worker thread from instead
- * interrupting the task being run.
- */
- private final ReentrantLock runLock = new ReentrantLock();
-
- /**
- * Initial task to run before entering run loop
- */
- private Runnable firstTask;
+ private Runnable getTask() {
+ boolean timedOut = false; // Did the last poll() time out?
- /**
- * Per thread completed task counter; accumulated
- * into completedTaskCount upon termination.
- */
- volatile long completedTasks;
+ retry:
+ for (;;) {
+ int c = ctl.get();
+ int rs = runStateOf(c);
- /**
- * Thread this worker is running in. Acts as a final field,
- * but cannot be set until thread is created.
- */
- Thread thread;
+ // Check if queue empty only if necessary.
+ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
+ decrementWorkerCount();
+ return null;
+ }
- Worker(Runnable firstTask) {
- this.firstTask = firstTask;
- }
+ boolean timed; // Are workers subject to culling?
- boolean isActive() {
- return runLock.isLocked();
- }
+ for (;;) {
+ int wc = workerCountOf(c);
+ timed = wc > corePoolSize;
- /**
- * Interrupt thread if not running a task
- */
- void interruptIfIdle() {
- final ReentrantLock runLock = this.runLock;
- if (runLock.tryLock()) {
- try {
- thread.interrupt();
- } finally {
- runLock.unlock();
- }
+ if (wc <= maximumPoolSize && ! (timedOut && timed))
+ break;
+ if (compareAndDecrementWorkerCount(c))
+ return null;
+ c = ctl.get(); // Re-read ctl
+ if (runStateOf(c) != rs)
+ continue retry;
+ // else CAS failed due to workerCount change; retry inner loop
}
- }
- /**
- * Cause thread to die even if running a task.
- */
- void interruptNow() {
- thread.interrupt();
- }
-
- /**
- * Run a single task between before/after methods.
- */
- private void runTask(Runnable task) {
- final ReentrantLock runLock = this.runLock;
- runLock.lock();
try {
- // Abort now if immediate cancel. Otherwise, we have
- // committed to run this task.
- if (runState == STOP)
- return;
-
- Thread.interrupted(); // clear interrupt status on entry
- boolean ran = false;
- beforeExecute(thread, task);
- try {
- task.run();
- ran = true;
- afterExecute(task, null);
- ++completedTasks;
- } catch(RuntimeException ex) {
- if (!ran)
- afterExecute(task, ex);
- // Else the exception occurred within
- // afterExecute itself in which case we don't
- // want to call it again.
- throw ex;
- }
- } finally {
- runLock.unlock();
+ Runnable r = timed ?
+ workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
+ workQueue.take();
+ if (r != null)
+ return r;
+ timedOut = true;
+ } catch (InterruptedException retry) {
+ timedOut = false;
}
}
+ }
- /**
- * Main run loop
- */
- public void run() {
- try {
- Runnable task = firstTask;
- firstTask = null;
- while (task != null || (task = getTask()) != null) {
- runTask(task);
- task = null; // unnecessary but can help GC
+ /**
+ * Main worker run loop. Repeatedly gets tasks from queue and
+ * executes them, while coping with a number of issues:
+ *
+ * 1. We may start out with an initial task, in which case we
+ * don't need to get the first one. Otherwise, as long as pool is
+ * running, we get tasks from getTask. If it returns null then the
+ * worker exits due to changed pool state or configuration
+ * parameters. Other exits result from exception throws in
+ * external code, in which case completedAbruptly holds, which
+ * usually leads processWorkerExit to replace this thread.
+ *
+ * 2. Before running any task, the lock is acquired to prevent
+ * other pool interrupts while the task is executing, and
+ * clearInterruptsForTaskRun called to ensure that unless pool is
+ * stopping, this thread does not have its interrupt set.
+ *
+ * 3. Each task run is preceded by a call to beforeExecute, which
+ * might throw an exception, in which case we cause thread to die
+ * (breaking loop with completedAbruptly true) without processing
+ * the task.
+ *
+ * 4. Assuming beforeExecute completes normally, we run the task,
+ * gathering any of its thrown exceptions to send to
+ * afterExecute. We separately handle RuntimeException, Error
+ * (both of which the specs guarantee that we trap) and arbitrary
+ * Throwables. Because we cannot rethrow Throwables within
+ * Runnable.run, we wrap them within Errors on the way out (to the
+ * thread's UncaughtExceptionHandler). Any thrown exception also
+ * conservatively causes thread to die.
+ *
+ * 5. After task.run completes, we call afterExecute, which may
+ * also throw an exception, which will also cause thread to
+ * die. According to JLS Sec 14.20, this exception is the one that
+ * will be in effect even if task.run throws.
+ *
+ * The net effect of the exception mechanics is that afterExecute
+ * and the thread's UncaughtExceptionHandler have as accurate
+ * information as we can provide about any problems encountered by
+ * user code.
+ *
+ * @param w the worker
+ */
+ final void runWorker(Worker w) {
+ Runnable task = w.firstTask;
+ w.firstTask = null;
+ boolean completedAbruptly = true;
+ try {
+ while (task != null || (task = getTask()) != null) {
+ w.lock();
+ clearInterruptsForTaskRun();
+ try {
+ beforeExecute(w.thread, task);
+ Throwable thrown = null;
+ try {
+ task.run();
+ } catch (RuntimeException x) {
+ thrown = x; throw x;
+ } catch (Error x) {
+ thrown = x; throw x;
+ } catch (Throwable x) {
+ thrown = x; throw new Error(x);
+ } finally {
+ afterExecute(task, thrown);
+ }
+ } finally {
+ task = null;
+ w.completedTasks++;
+ w.unlock();
}
- } catch(InterruptedException ie) {
- // fall through
- } finally {
- workerDone(this);
}
+ completedAbruptly = false;
+ } finally {
+ processWorkerExit(w, completedAbruptly);
}
}
- // Public methods
+ // Public constructors and methods
/**
- * Creates a new <tt>ThreadPoolExecutor</tt> with the given
- * initial parameters and default thread factory and handler. It
- * may be more convenient to use one of the {@link Executors}
- * factory methods instead of this general purpose constructor.
+ * Creates a new {@code ThreadPoolExecutor} with the given initial
+ * parameters and default thread factory and rejected execution handler.
+ * It may be more convenient to use one of the {@link Executors} factory
+ * methods instead of this general purpose constructor.
*
- * @param corePoolSize the number of threads to keep in the
- * pool, even if they are idle.
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle
* @param maximumPoolSize the maximum number of threads to allow in the
- * pool.
+ * pool
* @param keepAliveTime when the number of threads is greater than
- * the core, this is the maximum time that excess idle threads
- * will wait for new tasks before terminating.
- * @param unit the time unit for the keepAliveTime
- * argument.
- * @param workQueue the queue to use for holding tasks before they
- * are executed. This queue will hold only the <tt>Runnable</tt>
- * tasks submitted by the <tt>execute</tt> method.
- * @throws IllegalArgumentException if corePoolSize, or
- * keepAliveTime less than zero, or if maximumPoolSize less than or
- * equal to zero, or if corePoolSize greater than maximumPoolSize.
- * @throws NullPointerException if <tt>workQueue</tt> is null
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param workQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
+ * @throws IllegalArgumentException if one of the following holds:<br>
+ * {@code corePoolSize < 0}<br>
+ * {@code keepAliveTime < 0}<br>
+ * {@code maximumPoolSize <= 0}<br>
+ * {@code maximumPoolSize < corePoolSize}
+ * @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
@@ -711,28 +1123,29 @@
}
/**
- * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
- * parameters.
+ * Creates a new {@code ThreadPoolExecutor} with the given initial
+ * parameters and default rejected execution handler.
*
- * @param corePoolSize the number of threads to keep in the
- * pool, even if they are idle.
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle
* @param maximumPoolSize the maximum number of threads to allow in the
- * pool.
+ * pool
* @param keepAliveTime when the number of threads is greater than
- * the core, this is the maximum time that excess idle threads
- * will wait for new tasks before terminating.
- * @param unit the time unit for the keepAliveTime
- * argument.
- * @param workQueue the queue to use for holding tasks before they
- * are executed. This queue will hold only the <tt>Runnable</tt>
- * tasks submitted by the <tt>execute</tt> method.
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param workQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
- * creates a new thread.
- * @throws IllegalArgumentException if corePoolSize, or
- * keepAliveTime less than zero, or if maximumPoolSize less than or
- * equal to zero, or if corePoolSize greater than maximumPoolSize.
- * @throws NullPointerException if <tt>workQueue</tt>
- * or <tt>threadFactory</tt> are null.
+ * creates a new thread
+ * @throws IllegalArgumentException if one of the following holds:<br>
+ * {@code corePoolSize < 0}<br>
+ * {@code keepAliveTime < 0}<br>
+ * {@code maximumPoolSize <= 0}<br>
+ * {@code maximumPoolSize < corePoolSize}
+ * @throws NullPointerException if {@code workQueue}
+ * or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
@@ -745,28 +1158,29 @@
}
/**
- * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
- * parameters.
+ * Creates a new {@code ThreadPoolExecutor} with the given initial
+ * parameters and default thread factory.
*
- * @param corePoolSize the number of threads to keep in the
- * pool, even if they are idle.
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle
* @param maximumPoolSize the maximum number of threads to allow in the
- * pool.
+ * pool
* @param keepAliveTime when the number of threads is greater than
- * the core, this is the maximum time that excess idle threads
- * will wait for new tasks before terminating.
- * @param unit the time unit for the keepAliveTime
- * argument.
- * @param workQueue the queue to use for holding tasks before they
- * are executed. This queue will hold only the <tt>Runnable</tt>
- * tasks submitted by the <tt>execute</tt> method.
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param workQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
* @param handler the handler to use when execution is blocked
- * because the thread bounds and queue capacities are reached.
- * @throws IllegalArgumentException if corePoolSize, or
- * keepAliveTime less than zero, or if maximumPoolSize less than or
- * equal to zero, or if corePoolSize greater than maximumPoolSize.
- * @throws NullPointerException if <tt>workQueue</tt>
- * or <tt>handler</tt> are null.
+ * because the thread bounds and queue capacities are reached
+ * @throws IllegalArgumentException if one of the following holds:<br>
+ * {@code corePoolSize < 0}<br>
+ * {@code keepAliveTime < 0}<br>
+ * {@code maximumPoolSize <= 0}<br>
+ * {@code maximumPoolSize < corePoolSize}
+ * @throws NullPointerException if {@code workQueue}
+ * or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
@@ -779,30 +1193,31 @@
}
/**
- * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
+ * Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
- * @param corePoolSize the number of threads to keep in the
- * pool, even if they are idle.
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle
* @param maximumPoolSize the maximum number of threads to allow in the
- * pool.
+ * pool
* @param keepAliveTime when the number of threads is greater than
- * the core, this is the maximum time that excess idle threads
- * will wait for new tasks before terminating.
- * @param unit the time unit for the keepAliveTime
- * argument.
- * @param workQueue the queue to use for holding tasks before they
- * are executed. This queue will hold only the <tt>Runnable</tt>
- * tasks submitted by the <tt>execute</tt> method.
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param workQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
- * creates a new thread.
+ * creates a new thread
* @param handler the handler to use when execution is blocked
- * because the thread bounds and queue capacities are reached.
- * @throws IllegalArgumentException if corePoolSize, or
- * keepAliveTime less than zero, or if maximumPoolSize less than or
- * equal to zero, or if corePoolSize greater than maximumPoolSize.
- * @throws NullPointerException if <tt>workQueue</tt>
- * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
+ * because the thread bounds and queue capacities are reached
+ * @throws IllegalArgumentException if one of the following holds:<br>
+ * {@code corePoolSize < 0}<br>
+ * {@code keepAliveTime < 0}<br>
+ * {@code maximumPoolSize <= 0}<br>
+ * {@code maximumPoolSize < corePoolSize}
+ * @throws NullPointerException if {@code workQueue}
+ * or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
@@ -826,181 +1241,140 @@
this.handler = handler;
}
-
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
- * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
+ * the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
- * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
- * for execution
- * @throws NullPointerException if command is null
+ * {@code RejectedExecutionHandler}, if the task
+ * cannot be accepted for execution
+ * @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
- for (;;) {
- if (runState != RUNNING) {
- reject(command);
- return;
- }
- if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
- return;
- if (workQueue.offer(command))
- return;
- Runnable r = addIfUnderMaximumPoolSize(command);
- if (r == command)
+ /*
+ * Proceed in 3 steps:
+ *
+ * 1. If fewer than corePoolSize threads are running, try to
+ * start a new thread with the given command as its first
+ * task. The call to addWorker atomically checks runState and
+ * workerCount, and so prevents false alarms that would add
+ * threads when it shouldn't, by returning false.
+ *
+ * 2. If a task can be successfully queued, then we still need
+ * to double-check whether we should have added a thread
+ * (because existing ones died since last checking) or that
+ * the pool shut down since entry into this method. So we
+ * recheck state and if necessary roll back the enqueuing if
+ * stopped, or start a new thread if there are none.
+ *
+ * 3. If we cannot queue task, then we try to add a new
+ * thread. If it fails, we know we are shut down or saturated
+ * and so reject the task.
+ */
+ int c = ctl.get();
+ if (workerCountOf(c) < corePoolSize) {
+ if (addWorker(command, true))
return;
- if (r == null) {
+ c = ctl.get();
+ }
+ if (isRunning(c) && workQueue.offer(command)) {
+ int recheck = ctl.get();
+ if (! isRunning(recheck) && remove(command))
reject(command);
- return;
- }
- // else retry
+ else if (workerCountOf(recheck) == 0)
+ addWorker(null, false);
}
+ else if (!addWorker(command, false))
+ reject(command);
}
/**
* Initiates an orderly shutdown in which previously submitted
- * tasks are executed, but no new tasks will be
- * accepted. Invocation has no additional effect if already shut
- * down.
- * @throws SecurityException if a security manager exists and
- * shutting down this ExecutorService may manipulate threads that
- * the caller is not permitted to modify because it does not hold
- * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
- * or the security manager's <tt>checkAccess</tt> method denies access.
+ * tasks are executed, but no new tasks will be accepted.
+ * Invocation has no additional effect if already shut down.
+ *
+ * <p>This method does not wait for previously submitted tasks to
+ * complete execution. Use {@link #awaitTermination awaitTermination}
+ * to do that.
+ *
+ * @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
- // Fail if caller doesn't have modifyThread permission
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- java.security.AccessController.checkPermission(shutdownPerm);
-
- boolean fullyTerminated = false;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
- if (workers.size() > 0) {
- // Check if caller can modify worker threads. This
- // might not be true even if passed above check, if
- // the SecurityManager treats some threads specially.
- if (security != null) {
- for (Worker w: workers)
- security.checkAccess(w.thread);
- }
-
- int state = runState;
- if (state == RUNNING) // don't override shutdownNow
- runState = SHUTDOWN;
-
- try {
- for (Worker w: workers)
- w.interruptIfIdle();
- } catch(SecurityException se) {
- // If SecurityManager allows above checks, but
- // then unexpectedly throws exception when
- // interrupting threads (which it ought not do),
- // back out as cleanly as we can. Some threads may
- // have been killed but we remain in non-shutdown
- // state.
- runState = state;
- throw se;
- }
- }
- else { // If no workers, trigger full termination now
- fullyTerminated = true;
- runState = TERMINATED;
- termination.signalAll();
- }
+ checkShutdownAccess();
+ advanceRunState(SHUTDOWN);
+ interruptIdleWorkers();
+ onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
- if (fullyTerminated)
- terminated();
+ tryTerminate();
}
-
/**
* Attempts to stop all actively executing tasks, halts the
- * processing of waiting tasks, and returns a list of the tasks that were
- * awaiting execution.
- *
- * <p>This implementation cancels tasks via {@link
- * Thread#interrupt}, so if any tasks mask or fail to respond to
- * interrupts, they may never terminate.
- *
- * @return list of tasks that never commenced execution
- * @throws SecurityException if a security manager exists and
- * shutting down this ExecutorService may manipulate threads that
- * the caller is not permitted to modify because it does not hold
- * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
- * or the security manager's <tt>checkAccess</tt> method denies access.
+ * processing of waiting tasks, and returns a list of the tasks
+ * that were awaiting execution. These tasks are drained (removed)
+ * from the task queue upon return from this method.
+ *
+ * <p>This method does not wait for actively executing tasks to
+ * terminate. Use {@link #awaitTermination awaitTermination} to
+ * do that.
+ *
+ * <p>There are no guarantees beyond best-effort attempts to stop
+ * processing actively executing tasks. This implementation
+ * cancels tasks via {@link Thread#interrupt}, so any task that
+ * fails to respond to interrupts may never terminate.
+ *
+ * @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
- // Almost the same code as shutdown()
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- java.security.AccessController.checkPermission(shutdownPerm);
-
- boolean fullyTerminated = false;
+ List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
- if (workers.size() > 0) {
- if (security != null) {
- for (Worker w: workers)
- security.checkAccess(w.thread);
- }
-
- int state = runState;
- if (state != TERMINATED)
- runState = STOP;
- try {
- for (Worker w : workers)
- w.interruptNow();
- } catch(SecurityException se) {
- runState = state; // back out;
- throw se;
- }
- }
- else { // If no workers, trigger full termination now
- fullyTerminated = true;
- runState = TERMINATED;
- termination.signalAll();
- }
+ checkShutdownAccess();
+ advanceRunState(STOP);
+ interruptWorkers();
+ tasks = drainQueue();
} finally {
mainLock.unlock();
}
- if (fullyTerminated)
- terminated();
- return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
+ tryTerminate();
+ return tasks;
}
public boolean isShutdown() {
- return runState != RUNNING;
+ return ! isRunning(ctl.get());
}
- /**
+ /**
* Returns true if this executor is in the process of terminating
- * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
+ * after {@link #shutdown} or {@link #shutdownNow} but has not
* completely terminated. This method may be useful for
- * debugging. A return of <tt>true</tt> reported a sufficient
+ * debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
* ignored or suppressed interruption, causing this executor not
* to properly terminate.
- * @return true if terminating but not yet terminated.
+ *
+ * @return true if terminating but not yet terminated
*/
public boolean isTerminating() {
- return runState == STOP;
+ int c = ctl.get();
+ return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated() {
- return runState == TERMINATED;
+ return runStateAtLeast(ctl.get(), TERMINATED);
}
public boolean awaitTermination(long timeout, TimeUnit unit)
@@ -1010,7 +1384,7 @@
mainLock.lock();
try {
for (;;) {
- if (runState == TERMINATED)
+ if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
@@ -1022,10 +1396,10 @@
}
/**
- * Invokes <tt>shutdown</tt> when this executor is no longer
- * referenced.
- */
- protected void finalize() {
+ * Invokes {@code shutdown} when this executor is no longer
+ * referenced and it has no threads.
+ */
+ protected void finalize() {
shutdown();
}
@@ -1076,103 +1450,33 @@
}
/**
- * Returns the task queue used by this executor. Access to the
- * task queue is intended primarily for debugging and monitoring.
- * This queue may be in active use. Retrieving the task queue
- * does not prevent queued tasks from executing.
- *
- * @return the task queue
- */
- public BlockingQueue<Runnable> getQueue() {
- return workQueue;
- }
-
- /**
- * Removes this task from the executor's internal queue if it is
- * present, thus causing it not to be run if it has not already
- * started.
- *
- * <p> This method may be useful as one part of a cancellation
- * scheme. It may fail to remove tasks that have been converted
- * into other forms before being placed on the internal queue. For
- * example, a task entered using <tt>submit</tt> might be
- * converted into a form that maintains <tt>Future</tt> status.
- * However, in such cases, method {@link ThreadPoolExecutor#purge}
- * may be used to remove those Futures that have been cancelled.
- *
- *
- * @param task the task to remove
- * @return true if the task was removed
- */
- public boolean remove(Runnable task) {
- return getQueue().remove(task);
- }
-
-
- /**
- * Tries to remove from the work queue all {@link Future}
- * tasks that have been cancelled. This method can be useful as a
- * storage reclamation operation, that has no other impact on
- * functionality. Cancelled tasks are never executed, but may
- * accumulate in work queues until worker threads can actively
- * remove them. Invoking this method instead tries to remove them now.
- * However, this method may fail to remove tasks in
- * the presence of interference by other threads.
- */
- public void purge() {
- // Fail if we encounter interference during traversal
- try {
- Iterator<Runnable> it = getQueue().iterator();
- while (it.hasNext()) {
- Runnable r = it.next();
- if (r instanceof Future<?>) {
- Future<?> c = (Future<?>)r;
- if (c.isCancelled())
- it.remove();
- }
- }
- }
- catch(ConcurrentModificationException ex) {
- return;
- }
- }
-
- /**
* Sets the core number of threads. This overrides any value set
* in the constructor. If the new value is smaller than the
* current value, excess existing threads will be terminated when
- * they next become idle. If larger, new threads will, if needed,
+ * they next become idle. If larger, new threads will, if needed,
* be started to execute any queued tasks.
*
* @param corePoolSize the new core size
- * @throws IllegalArgumentException if <tt>corePoolSize</tt>
- * less than zero
+ * @throws IllegalArgumentException if {@code corePoolSize < 0}
* @see #getCorePoolSize
*/
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- int extra = this.corePoolSize - corePoolSize;
- this.corePoolSize = corePoolSize;
- if (extra < 0) {
- Runnable r;
- while (extra++ < 0 && poolSize < corePoolSize &&
- (r = workQueue.poll()) != null)
- addThread(r).start();
- }
- else if (extra > 0 && poolSize > corePoolSize) {
- Iterator<Worker> it = workers.iterator();
- while (it.hasNext() &&
- extra-- > 0 &&
- poolSize > corePoolSize &&
- workQueue.remainingCapacity() == 0)
- it.next().interruptIfIdle();
+ int delta = corePoolSize - this.corePoolSize;
+ this.corePoolSize = corePoolSize;
+ if (workerCountOf(ctl.get()) > corePoolSize)
+ interruptIdleWorkers();
+ else if (delta > 0) {
+ // We don't really know how many new threads are "needed".
+ // As a heuristic, prestart enough new workers (up to new
+ // core size) to handle the current number of tasks in
+ // queue, but stop if queue becomes empty while doing so.
+ int k = Math.min(delta, workQueue.size());
+ while (k-- > 0 && addWorker(null, true)) {
+ if (workQueue.isEmpty())
+ break;
}
- } finally {
- mainLock.unlock();
}
}
@@ -1189,23 +1493,26 @@
/**
* Starts a core thread, causing it to idly wait for work. This
* overrides the default policy of starting core threads only when
- * new tasks are executed. This method will return <tt>false</tt>
+ * new tasks are executed. This method will return {@code false}
* if all core threads have already been started.
- * @return true if a thread was started
- */
+ *
+ * @return {@code true} if a thread was started
+ */
public boolean prestartCoreThread() {
- return addIfUnderCorePoolSize(null);
+ return workerCountOf(ctl.get()) < corePoolSize &&
+ addWorker(null, true);
}
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
- * new tasks are executed.
- * @return the number of threads started.
- */
+ * new tasks are executed.
+ *
+ * @return the number of threads started
+ */
public int prestartAllCoreThreads() {
int n = 0;
- while (addIfUnderCorePoolSize(null))
+ while (addWorker(null, true))
++n;
return n;
}
@@ -1217,30 +1524,17 @@
* terminated when they next become idle.
*
* @param maximumPoolSize the new maximum
- * @throws IllegalArgumentException if maximumPoolSize less than zero or
- * the {@link #getCorePoolSize core pool size}
+ * @throws IllegalArgumentException if the new maximum is
+ * less than or equal to zero, or
+ * less than the {@linkplain #getCorePoolSize core pool size}
* @see #getMaximumPoolSize
*/
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- int extra = this.maximumPoolSize - maximumPoolSize;
- this.maximumPoolSize = maximumPoolSize;
- if (extra > 0 && poolSize > maximumPoolSize) {
- Iterator<Worker> it = workers.iterator();
- while (it.hasNext() &&
- extra > 0 &&
- poolSize > maximumPoolSize) {
- it.next().interruptIfIdle();
- --extra;
- }
- }
- } finally {
- mainLock.unlock();
- }
+ this.maximumPoolSize = maximumPoolSize;
+ if (workerCountOf(ctl.get()) > maximumPoolSize)
+ interruptIdleWorkers();
}
/**
@@ -1259,21 +1553,27 @@
* threads currently in the pool, after waiting this amount of
* time without processing a task, excess threads will be
* terminated. This overrides any value set in the constructor.
+ *
* @param time the time to wait. A time value of zero will cause
- * excess threads to terminate immediately after executing tasks.
- * @param unit the time unit of the time argument
- * @throws IllegalArgumentException if time less than zero
+ * excess threads to terminate immediately after executing tasks.
+ * @param unit the time unit of the {@code time} argument
+ * @throws IllegalArgumentException if {@code time} is less than zero or
+ * if {@code time} is zero and {@code allowsCoreThreadTimeOut}
* @see #getKeepAliveTime
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
- this.keepAliveTime = unit.toNanos(time);
+ long keepAliveTime = unit.toNanos(time);
+ long delta = keepAliveTime - this.keepAliveTime;
+ this.keepAliveTime = keepAliveTime;
+ if (delta < 0)
+ interruptIdleWorkers();
}
/**
* Returns the thread keep-alive time, which is the amount of time
- * which threads in excess of the core pool size may remain
+ * that threads in excess of the core pool size may remain
* idle before being terminated.
*
* @param unit the desired time unit of the result
@@ -1284,6 +1584,73 @@
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
+ /* User-level queue utilities */
+
+ /**
+ * Returns the task queue used by this executor. Access to the
+ * task queue is intended primarily for debugging and monitoring.
+ * This queue may be in active use. Retrieving the task queue
+ * does not prevent queued tasks from executing.
+ *
+ * @return the task queue
+ */
+ public BlockingQueue<Runnable> getQueue() {
+ return workQueue;
+ }
+
+ /**
+ * Removes this task from the executor's internal queue if it is
+ * present, thus causing it not to be run if it has not already
+ * started.
+ *
+ * <p>This method may be useful as one part of a cancellation
+ * scheme. It may fail to remove tasks that have been converted
+ * into other forms before being placed on the internal queue. For
+ * example, a task entered using {@code submit} might be
+ * converted into a form that maintains {@code Future} status.
+ * However, in such cases, method {@link #purge} may be used to
+ * remove those Futures that have been cancelled.
+ *
+ * @param task the task to remove
+ * @return true if the task was removed
+ */
+ public boolean remove(Runnable task) {
+ boolean removed = workQueue.remove(task);
+ tryTerminate(); // In case SHUTDOWN and now empty
+ return removed;
+ }
+
+ /**
+ * Tries to remove from the work queue all {@link Future}
+ * tasks that have been cancelled. This method can be useful as a
+ * storage reclamation operation, that has no other impact on
+ * functionality. Cancelled tasks are never executed, but may
+ * accumulate in work queues until worker threads can actively
+ * remove them. Invoking this method instead tries to remove them now.
+ * However, this method may fail to remove tasks in
+ * the presence of interference by other threads.
+ */
+ public void purge() {
+ final BlockingQueue<Runnable> q = workQueue;
+ try {
+ Iterator<Runnable> it = q.iterator();
+ while (it.hasNext()) {
+ Runnable r = it.next();
+ if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
+ it.remove();
+ }
+ } catch (ConcurrentModificationException fallThrough) {
+ // Take slow path if we encounter interference during traversal.
+ // Make copy for traversal and call remove for cancelled entries.
+ // The slow path is more likely to be O(N*N).
+ for (Object r : q.toArray())
+ if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
+ q.remove(r);
+ }
+
+ tryTerminate(); // In case SHUTDOWN and now empty
+ }
+
/* Statistics */
/**
@@ -1292,7 +1659,16 @@
* @return the number of threads
*/
public int getPoolSize() {
- return poolSize;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ // Remove rare and surprising possibility of
+ // isTerminated() && getPoolSize() > 0
[... 221 lines stripped ...]