Posted to dev@tomcat.apache.org by ma...@apache.org on 2021/07/22 14:48:14 UTC
[tomcat] branch 9.0.x updated: Fix BZ 65454. Correct a timing issue that could delay a request
This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/9.0.x by this push:
new dae37f4 Fix BZ 65454. Correct a timing issue that could delay a request
dae37f4 is described below
commit dae37f4421088a2430c765443d3b6a4fb63ace7b
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Thu Jul 22 15:47:22 2021 +0100
Fix BZ 65454. Correct a timing issue that could delay a request
https://bz.apache.org/bugzilla/show_bug.cgi?id=65454
If the work queue is not empty, it is likely that a task was added to
the work queue between this thread timing out and the worker count being
decremented a few lines above this comment. In this case, create a
replacement worker so that the task isn't held in the queue waiting for
one of the other workers to finish.
Note: Based on the ThreadPoolExecutor from JSR 166
---
.../tomcat/util/threads/ThreadPoolExecutor.java | 2365 +++++++++++++++++++-
webapps/docs/changelog.xml | 6 +
2 files changed, 2264 insertions(+), 107 deletions(-)
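Editorial note: the race described in the commit message can be illustrated with a small standalone sketch. This is not the code from the commit. The class and method names below (ReplacementWorkerSketch, minWorkers, replaceBeforeFix, replaceAfterFix) are hypothetical, and the sketch assumes the pool is still running and that the worker exited because it timed out, not because a task threw. It compares a check that only looks at the remaining worker count with one that also looks at the work queue, which is the behaviour the commit message describes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of the decision an exiting (timed-out)
// worker makes about whether a replacement worker should be started.
public class ReplacementWorkerSketch {

    // Minimum number of workers to keep alive. If core threads may time
    // out, the minimum is zero unless tasks are still waiting in the queue.
    static int minWorkers(int corePoolSize, boolean allowCoreThreadTimeOut,
            BlockingQueue<Runnable> workQueue) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && !workQueue.isEmpty()) {
            min = 1;
        }
        return min;
    }

    // Assumed pre-fix check: replace only when too few workers remain.
    // workerCount is the count after the exiting worker was subtracted.
    static boolean replaceBeforeFix(int workerCount, int corePoolSize,
            boolean allowCoreThreadTimeOut, BlockingQueue<Runnable> workQueue) {
        return workerCount < minWorkers(corePoolSize, allowCoreThreadTimeOut, workQueue);
    }

    // Check as described in the commit message: also replace the worker
    // when the queue is not empty, because a task may have been queued
    // between the timeout and the worker count being decremented.
    static boolean replaceAfterFix(int workerCount, int corePoolSize,
            boolean allowCoreThreadTimeOut, BlockingQueue<Runnable> workQueue) {
        return workerCount < minWorkers(corePoolSize, allowCoreThreadTimeOut, workQueue)
                || !workQueue.isEmpty();
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        // A task slips into the queue just as a non-core worker times out.
        queue.add(() -> { });

        // 5 workers remain, corePoolSize is 5, core threads do not time out.
        System.out.println("before fix: " + replaceBeforeFix(5, 5, false, queue));
        System.out.println("after fix:  " + replaceAfterFix(5, 5, false, queue));
    }
}
```

In this scenario the worker-count-only check reports that no replacement is needed, so the queued task would wait until one of the busy workers finishes. The queue-aware check starts a replacement worker instead.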
diff --git a/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java b/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
index 3038baa..6f73fda 100644
--- a/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
+++ b/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
@@ -14,83 +14,2174 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+/*
+ * The original version of this file carried the following notice:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/publicdomain/zero/1.0/
+ */
package org.apache.tomcat.util.threads;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.tomcat.util.res.StringManager;
/**
- * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
- * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
- * If a RejectedExecutionHandler is not specified a default one will be configured
- * and that one will always throw a RejectedExecutionException
+ * An {@link java.util.concurrent.ExecutorService}
+ * that executes each submitted task using
+ * one of possibly several pooled threads, normally configured
+ * using {@link Executors} factory methods.
+ *
+ * <p>Thread pools address two different problems: they usually
+ * provide improved performance when executing large numbers of
+ * asynchronous tasks, due to reduced per-task invocation overhead,
+ * and they provide a means of bounding and managing the resources,
+ * including threads, consumed when executing a collection of tasks.
+ * Each {@code ThreadPoolExecutor} also maintains some basic
+ * statistics, such as the number of completed tasks.
+ *
+ * <p>To be useful across a wide range of contexts, this class
+ * provides many adjustable parameters and extensibility
+ * hooks. However, programmers are urged to use the more convenient
+ * {@link Executors} factory methods {@link
+ * Executors#newCachedThreadPool} (unbounded thread pool, with
+ * automatic thread reclamation), {@link Executors#newFixedThreadPool}
+ * (fixed size thread pool) and {@link
+ * Executors#newSingleThreadExecutor} (single background thread), that
+ * preconfigure settings for the most common usage
+ * scenarios. Otherwise, use the following guide when manually
+ * configuring and tuning this class:
+ *
+ * <dl>
+ *
+ * <dt>Core and maximum pool sizes</dt>
+ *
+ * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
+ * pool size (see {@link #getPoolSize})
+ * according to the bounds set by
+ * corePoolSize (see {@link #getCorePoolSize}) and
+ * maximumPoolSize (see {@link #getMaximumPoolSize}).
+ *
+ * When a new task is submitted in method {@link #execute(Runnable)},
+ * if fewer than corePoolSize threads are running, a new thread is
+ * created to handle the request, even if other worker threads are
+ * idle. Else if fewer than maximumPoolSize threads are running, a
+ * new thread will be created to handle the request only if the queue
+ * is full. By setting corePoolSize and maximumPoolSize the same, you
+ * create a fixed-size thread pool. By setting maximumPoolSize to an
+ * essentially unbounded value such as {@code Integer.MAX_VALUE}, you
+ * allow the pool to accommodate an arbitrary number of concurrent
+ * tasks. Most typically, core and maximum pool sizes are set only
+ * upon construction, but they may also be changed dynamically using
+ * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd>
+ *
+ * <dt>On-demand construction</dt>
+ *
+ * <dd>By default, even core threads are initially created and
+ * started only when new tasks arrive, but this can be overridden
+ * dynamically using method {@link #prestartCoreThread} or {@link
+ * #prestartAllCoreThreads}. You probably want to prestart threads if
+ * you construct the pool with a non-empty queue. </dd>
+ *
+ * <dt>Creating new threads</dt>
+ *
+ * <dd>New threads are created using a {@link ThreadFactory}. If not
+ * otherwise specified, a {@link Executors#defaultThreadFactory} is
+ * used, that creates threads to all be in the same {@link
+ * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
+ * non-daemon status. By supplying a different ThreadFactory, you can
+ * alter the thread's name, thread group, priority, daemon status,
+ * etc. If a {@code ThreadFactory} fails to create a thread when asked
+ * by returning null from {@code newThread}, the executor will
+ * continue, but might not be able to execute any tasks. Threads
+ * should possess the "modifyThread" {@code RuntimePermission}. If
+ * worker threads or other threads using the pool do not possess this
+ * permission, service may be degraded: configuration changes may not
+ * take effect in a timely manner, and a shutdown pool may remain in a
+ * state in which termination is possible but not completed.</dd>
+ *
+ * <dt>Keep-alive times</dt>
+ *
+ * <dd>If the pool currently has more than corePoolSize threads,
+ * excess threads will be terminated if they have been idle for more
+ * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
+ * This provides a means of reducing resource consumption when the
+ * pool is not being actively used. If the pool becomes more active
+ * later, new threads will be constructed. This parameter can also be
+ * changed dynamically using method {@link #setKeepAliveTime(long,
+ * TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link
+ * TimeUnit#NANOSECONDS} effectively disables idle threads from ever
+ * terminating prior to shut down. By default, the keep-alive policy
+ * applies only when there are more than corePoolSize threads, but
+ * method {@link #allowCoreThreadTimeOut(boolean)} can be used to
+ * apply this time-out policy to core threads as well, so long as the
+ * keepAliveTime value is non-zero. </dd>
+ *
+ * <dt>Queuing</dt>
+ *
+ * <dd>Any {@link BlockingQueue} may be used to transfer and hold
+ * submitted tasks. The use of this queue interacts with pool sizing:
+ *
+ * <ul>
+ *
+ * <li>If fewer than corePoolSize threads are running, the Executor
+ * always prefers adding a new thread
+ * rather than queuing.
+ *
+ * <li>If corePoolSize or more threads are running, the Executor
+ * always prefers queuing a request rather than adding a new
+ * thread.
+ *
+ * <li>If a request cannot be queued, a new thread is created unless
+ * this would exceed maximumPoolSize, in which case, the task will be
+ * rejected.
+ *
+ * </ul>
+ *
+ * There are three general strategies for queuing:
+ * <ol>
+ *
+ * <li><em> Direct handoffs.</em> A good default choice for a work
+ * queue is a {@link java.util.concurrent.SynchronousQueue}
+ * that hands off tasks to threads
+ * without otherwise holding them. Here, an attempt to queue a task
+ * will fail if no threads are immediately available to run it, so a
+ * new thread will be constructed. This policy avoids lockups when
+ * handling sets of requests that might have internal dependencies.
+ * Direct handoffs generally require unbounded maximumPoolSizes to
+ * avoid rejection of new submitted tasks. This in turn admits the
+ * possibility of unbounded thread growth when commands continue to
+ * arrive on average faster than they can be processed.
+ *
+ * <li><em> Unbounded queues.</em> Using an unbounded queue (for
+ * example a {@link java.util.concurrent.LinkedBlockingQueue}
+ * without a predefined
+ * capacity) will cause new tasks to wait in the queue when all
+ * corePoolSize threads are busy. Thus, no more than corePoolSize
+ * threads will ever be created. (And the value of the maximumPoolSize
+ * therefore doesn't have any effect.) This may be appropriate when
+ * each task is completely independent of others, so tasks cannot
+ * affect each others execution; for example, in a web page server.
+ * While this style of queuing can be useful in smoothing out
+ * transient bursts of requests, it admits the possibility of
+ * unbounded work queue growth when commands continue to arrive on
+ * average faster than they can be processed.
+ *
+ * <li><em>Bounded queues.</em> A bounded queue (for example, an
+ * {@link java.util.concurrent.ArrayBlockingQueue})
+ * helps prevent resource exhaustion when
+ * used with finite maximumPoolSizes, but can be more difficult to
+ * tune and control. Queue sizes and maximum pool sizes may be traded
+ * off for each other: Using large queues and small pools minimizes
+ * CPU usage, OS resources, and context-switching overhead, but can
+ * lead to artificially low throughput. If tasks frequently block (for
+ * example if they are I/O bound), a system may be able to schedule
+ * time for more threads than you otherwise allow. Use of small queues
+ * generally requires larger pool sizes, which keeps CPUs busier but
+ * may encounter unacceptable scheduling overhead, which also
+ * decreases throughput.
+ *
+ * </ol>
+ *
+ * </dd>
+ *
+ * <dt>Rejected tasks</dt>
+ *
+ * <dd>New tasks submitted in method {@link #execute(Runnable)} will be
+ * <em>rejected</em> when the Executor has been shut down, and also when
+ * the Executor uses finite bounds for both maximum threads and work queue
+ * capacity, and is saturated. In either case, the {@code execute} method
+ * invokes the {@link
+ * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
+ * method of its {@link RejectedExecutionHandler}. Four predefined handler
+ * policies are provided:
+ *
+ * <ol>
*
+ * <li>In the default {@link ThreadPoolExecutor.AbortPolicy}, the handler
+ * throws a runtime {@link RejectedExecutionException} upon rejection.
+ *
+ * <li>In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
+ * that invokes {@code execute} itself runs the task. This provides a
+ * simple feedback control mechanism that will slow down the rate that
+ * new tasks are submitted.
+ *
+ * <li>In {@link ThreadPoolExecutor.DiscardPolicy}, a task that cannot
+ * be executed is simply dropped. This policy is designed only for
+ * those rare cases in which task completion is never relied upon.
+ *
+ * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
+ * executor is not shut down, the task at the head of the work queue
+ * is dropped, and then execution is retried (which can fail again,
+ * causing this to be repeated.) This policy is rarely acceptable. In
+ * nearly all cases, you should also cancel the task to cause an
+ * exception in any component waiting for its completion, and/or log
+ * the failure, as illustrated in {@link
+ * ThreadPoolExecutor.DiscardOldestPolicy} documentation.
+ *
+ * </ol>
+ *
+ * It is possible to define and use other kinds of {@link
+ * RejectedExecutionHandler} classes. Doing so requires some care
+ * especially when policies are designed to work only under particular
+ * capacity or queuing policies. </dd>
+ *
+ * <dt>Hook methods</dt>
+ *
+ * <dd>This class provides {@code protected} overridable
+ * {@link #beforeExecute(Thread, Runnable)} and
+ * {@link #afterExecute(Runnable, Throwable)} methods that are called
+ * before and after execution of each task. These can be used to
+ * manipulate the execution environment; for example, reinitializing
+ * ThreadLocals, gathering statistics, or adding log entries.
+ * Additionally, method {@link #terminated} can be overridden to perform
+ * any special processing that needs to be done once the Executor has
+ * fully terminated.
+ *
+ * <p>If hook, callback, or BlockingQueue methods throw exceptions,
+ * internal worker threads may in turn fail, abruptly terminate, and
+ * possibly be replaced.</dd>
+ *
+ * <dt>Queue maintenance</dt>
+ *
+ * <dd>Method {@link #getQueue()} allows access to the work queue
+ * for purposes of monitoring and debugging. Use of this method for
+ * any other purpose is strongly discouraged. Two supplied methods,
+ * {@link #remove(Runnable)} and {@link #purge} are available to
+ * assist in storage reclamation when large numbers of queued tasks
+ * become cancelled.</dd>
+ *
+ * <dt>Reclamation</dt>
+ *
+ * <dd>A pool that is no longer referenced in a program <em>AND</em>
+ * has no remaining threads may be reclaimed (garbage collected)
+ * without being explicitly shutdown. You can configure a pool to
+ * allow all unused threads to eventually die by setting appropriate
+ * keep-alive times, using a lower bound of zero core threads and/or
+ * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
+ *
+ * </dl>
+ *
+ * <p><b>Extension example.</b> Most extensions of this class
+ * override one or more of the protected hook methods. For example,
+ * here is a subclass that adds a simple pause/resume feature:
+ *
+ * <pre> {@code
+ * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
+ * private boolean isPaused;
+ * private ReentrantLock pauseLock = new ReentrantLock();
+ * private Condition unpaused = pauseLock.newCondition();
+ *
+ * public PausableThreadPoolExecutor(...) { super(...); }
+ *
+ * protected void beforeExecute(Thread t, Runnable r) {
+ * super.beforeExecute(t, r);
+ * pauseLock.lock();
+ * try {
+ * while (isPaused) unpaused.await();
+ * } catch (InterruptedException ie) {
+ * t.interrupt();
+ * } finally {
+ * pauseLock.unlock();
+ * }
+ * }
+ *
+ * public void pause() {
+ * pauseLock.lock();
+ * try {
+ * isPaused = true;
+ * } finally {
+ * pauseLock.unlock();
+ * }
+ * }
+ *
+ * public void resume() {
+ * pauseLock.lock();
+ * try {
+ * isPaused = false;
+ * unpaused.signalAll();
+ * } finally {
+ * pauseLock.unlock();
+ * }
+ * }
+ * }}</pre>
+ *
+ * @since 1.5
+ * @author Doug Lea
*/
-public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
+public class ThreadPoolExecutor extends AbstractExecutorService {
+
+ protected static final StringManager sm = StringManager.getManager("org.apache.tomcat.util.threads.res");
+
+ /**
+ * The main pool control state, ctl, is an atomic integer packing
+ * two conceptual fields
+ * workerCount, indicating the effective number of threads
+ * runState, indicating whether running, shutting down etc
+ *
+ * In order to pack them into one int, we limit workerCount to
+ * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
+ * billion) otherwise representable. If this is ever an issue in
+ * the future, the variable can be changed to be an AtomicLong,
+ * and the shift/mask constants below adjusted. But until the need
+ * arises, this code is a bit faster and simpler using an int.
+ *
+ * The workerCount is the number of workers that have been
+ * permitted to start and not permitted to stop. The value may be
+ * transiently different from the actual number of live threads,
+ * for example when a ThreadFactory fails to create a thread when
+ * asked, and when exiting threads are still performing
+ * bookkeeping before terminating. The user-visible pool size is
+ * reported as the current size of the workers set.
+ *
+ * The runState provides the main lifecycle control, taking on values:
+ *
+ * RUNNING: Accept new tasks and process queued tasks
+ * SHUTDOWN: Don't accept new tasks, but process queued tasks
+ * STOP: Don't accept new tasks, don't process queued tasks,
+ * and interrupt in-progress tasks
+ * TIDYING: All tasks have terminated, workerCount is zero,
+ * the thread transitioning to state TIDYING
+ * will run the terminated() hook method
+ * TERMINATED: terminated() has completed
+ *
+ * The numerical order among these values matters, to allow
+ * ordered comparisons. The runState monotonically increases over
+ * time, but need not hit each state. The transitions are:
+ *
+ * RUNNING -> SHUTDOWN
+ * On invocation of shutdown()
+ * (RUNNING or SHUTDOWN) -> STOP
+ * On invocation of shutdownNow()
+ * SHUTDOWN -> TIDYING
+ * When both queue and pool are empty
+ * STOP -> TIDYING
+ * When pool is empty
+ * TIDYING -> TERMINATED
+ * When the terminated() hook method has completed
+ *
+ * Threads waiting in awaitTermination() will return when the
+ * state reaches TERMINATED.
+ *
+ * Detecting the transition from SHUTDOWN to TIDYING is less
+ * straightforward than you'd like because the queue may become
+ * empty after non-empty and vice versa during SHUTDOWN state, but
+ * we can only terminate if, after seeing that it is empty, we see
+ * that workerCount is 0 (which sometimes entails a recheck -- see
+ * below).
+ */
+ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
+ private static final int COUNT_BITS = Integer.SIZE - 3;
+ private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
+
+ // runState is stored in the high-order bits
+ private static final int RUNNING = -1 << COUNT_BITS;
+ private static final int SHUTDOWN = 0 << COUNT_BITS;
+ private static final int STOP = 1 << COUNT_BITS;
+ private static final int TIDYING = 2 << COUNT_BITS;
+ private static final int TERMINATED = 3 << COUNT_BITS;
+
+ // Packing and unpacking ctl
+ private static int workerCountOf(int c) { return c & COUNT_MASK; }
+ private static int ctlOf(int rs, int wc) { return rs | wc; }
+
+ /*
+ * Bit field accessors that don't require unpacking ctl.
+ * These depend on the bit layout and on workerCount being never negative.
+ */
+
+ private static boolean runStateLessThan(int c, int s) {
+ return c < s;
+ }
+
+ private static boolean runStateAtLeast(int c, int s) {
+ return c >= s;
+ }
+
+ private static boolean isRunning(int c) {
+ return c < SHUTDOWN;
+ }
+
+ /**
+ * Attempts to CAS-increment the workerCount field of ctl.
+ */
+ private boolean compareAndIncrementWorkerCount(int expect) {
+ return ctl.compareAndSet(expect, expect + 1);
+ }
+
+ /**
+ * Attempts to CAS-decrement the workerCount field of ctl.
+ */
+ private boolean compareAndDecrementWorkerCount(int expect) {
+ return ctl.compareAndSet(expect, expect - 1);
+ }
+
+ /**
+ * Decrements the workerCount field of ctl. This is called only on
+ * abrupt termination of a thread (see processWorkerExit). Other
+ * decrements are performed within getTask.
+ */
+ private void decrementWorkerCount() {
+ ctl.addAndGet(-1);
+ }
+
+ /**
+ * The queue used for holding tasks and handing off to worker
+ * threads. We do not require that workQueue.poll() returning
+ * null necessarily means that workQueue.isEmpty(), so rely
+ * solely on isEmpty to see if the queue is empty (which we must
+ * do for example when deciding whether to transition from
+ * SHUTDOWN to TIDYING). This accommodates special-purpose
+ * queues such as DelayQueues for which poll() is allowed to
+ * return null even if it may later return non-null when delays
+ * expire.
+ */
+ private final BlockingQueue<Runnable> workQueue;
+
+ /**
+ * Lock held on access to workers set and related bookkeeping.
+ * While we could use a concurrent set of some sort, it turns out
+ * to be generally preferable to use a lock. Among the reasons is
+ * that this serializes interruptIdleWorkers, which avoids
+ * unnecessary interrupt storms, especially during shutdown.
+ * Otherwise exiting threads would concurrently interrupt those
+ * that have not yet interrupted. It also simplifies some of the
+ * associated statistics bookkeeping of largestPoolSize etc. We
+ * also hold mainLock on shutdown and shutdownNow, for the sake of
+ * ensuring workers set is stable while separately checking
+ * permission to interrupt and actually interrupting.
+ */
+ private final ReentrantLock mainLock = new ReentrantLock();
+
+ /**
+ * Set containing all worker threads in pool. Accessed only when
+ * holding mainLock.
+ */
+ private final HashSet<Worker> workers = new HashSet<>();
+
+ /**
+ * Wait condition to support awaitTermination.
+ */
+ private final Condition termination = mainLock.newCondition();
+
+ /**
+ * Tracks largest attained pool size. Accessed only under
+ * mainLock.
+ */
+ private int largestPoolSize;
+
+ /**
+ * Counter for completed tasks. Updated only on termination of
+ * worker threads. Accessed only under mainLock.
+ */
+ private long completedTaskCount;
+
+ /**
+ * The number of tasks submitted but not yet finished. This includes tasks
+ * in the queue and tasks that have been handed to a worker thread but the
+ * latter did not start executing the task yet.
+ * This number is always greater or equal to {@link #getActiveCount()}.
+ */
+ private final AtomicInteger submittedCount = new AtomicInteger(0);
+ private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
+
+ /**
+ * Most recent time in ms when a thread decided to kill itself to avoid
+ * potential memory leaks. Useful to throttle the rate of renewals of
+ * threads.
+ */
+ private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
+
+ /*
+ * All user control parameters are declared as volatiles so that
+ * ongoing actions are based on freshest values, but without need
+ * for locking, since no internal invariants depend on them
+ * changing synchronously with respect to other actions.
+ */
+
+ /**
+ * Delay in ms between 2 threads being renewed. If negative, do not renew threads.
+ */
+ private volatile long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;
+
+ /**
+ * Factory for new threads. All threads are created using this
+ * factory (via method addWorker). All callers must be prepared
+ * for addWorker to fail, which may reflect a system or user's
+ * policy limiting the number of threads. Even though it is not
+ * treated as an error, failure to create threads may result in
+ * new tasks being rejected or existing ones remaining stuck in
+ * the queue.
+ *
+ * We go further and preserve pool invariants even in the face of
+ * errors such as OutOfMemoryError, that might be thrown while
+ * trying to create threads. Such errors are rather common due to
+ * the need to allocate a native stack in Thread.start, and users
+ * will want to perform clean pool shutdown to clean up. There
+ * will likely be enough memory available for the cleanup code to
+ * complete without encountering yet another OutOfMemoryError.
+ */
+ private volatile ThreadFactory threadFactory;
+
+ /**
+ * Handler called when saturated or shutdown in execute.
+ */
+ private volatile RejectedExecutionHandler handler;
+
+ /**
+ * Timeout in nanoseconds for idle threads waiting for work.
+ * Threads use this timeout when there are more than corePoolSize
+ * present or if allowCoreThreadTimeOut. Otherwise they wait
+ * forever for new work.
+ */
+ private volatile long keepAliveTime;
+
+ /**
+ * If false (default), core threads stay alive even when idle.
+ * If true, core threads use keepAliveTime to time out waiting
+ * for work.
+ */
+ private volatile boolean allowCoreThreadTimeOut;
+
+ /**
+ * Core pool size is the minimum number of workers to keep alive
+ * (and not allow to time out etc) unless allowCoreThreadTimeOut
+ * is set, in which case the minimum is zero.
+ *
+ * Since the worker count is actually stored in COUNT_BITS bits,
+ * the effective limit is {@code corePoolSize & COUNT_MASK}.
+ */
+ private volatile int corePoolSize;
+
+ /**
+ * Maximum pool size.
+ *
+ * Since the worker count is actually stored in COUNT_BITS bits,
+ * the effective limit is {@code maximumPoolSize & COUNT_MASK}.
+ */
+ private volatile int maximumPoolSize;
+
+ /**
+ * The default rejected execution handler.
+ */
+ private static final RejectedExecutionHandler defaultHandler = new RejectHandler();
+
+ /**
+ * Permission required for callers of shutdown and shutdownNow.
+ * We additionally require (see checkShutdownAccess) that callers
+ * have permission to actually interrupt threads in the worker set
+ * (as governed by Thread.interrupt, which relies on
+ * ThreadGroup.checkAccess, which in turn relies on
+ * SecurityManager.checkAccess). Shutdowns are attempted only if
+ * these checks pass.
+ *
+ * All actual invocations of Thread.interrupt (see
+ * interruptIdleWorkers and interruptWorkers) ignore
+ * SecurityExceptions, meaning that the attempted interrupts
+ * silently fail. In the case of shutdown, they should not fail
+ * unless the SecurityManager has inconsistent policies, sometimes
+ * allowing access to a thread and sometimes not. In such cases,
+ * failure to actually interrupt threads may disable or delay full
+ * termination. Other uses of interruptIdleWorkers are advisory,
+ * and failure to actually interrupt will merely delay response to
+ * configuration changes so is not handled exceptionally.
+ */
+ private static final RuntimePermission shutdownPerm =
+ new RuntimePermission("modifyThread");
+
+ /**
+ * Class Worker mainly maintains interrupt control state for
+ * threads running tasks, along with other minor bookkeeping.
+ * This class opportunistically extends AbstractQueuedSynchronizer
+ * to simplify acquiring and releasing a lock surrounding each
+ * task execution. This protects against interrupts that are
+ * intended to wake up a worker thread waiting for a task from
+ * instead interrupting a task being run. We implement a simple
+ * non-reentrant mutual exclusion lock rather than use
+ * ReentrantLock because we do not want worker tasks to be able to
+ * reacquire the lock when they invoke pool control methods like
+ * setCorePoolSize. Additionally, to suppress interrupts until
+ * the thread actually starts running tasks, we initialize lock
+ * state to a negative value, and clear it upon start (in
+ * runWorker).
+ */
+ private final class Worker
+ extends AbstractQueuedSynchronizer
+ implements Runnable
+ {
+ /**
+ * This class will never be serialized, but we provide a
+ * serialVersionUID to suppress a javac warning.
+ */
+ private static final long serialVersionUID = 6138294804551838833L;
+
+ /** Thread this worker is running in. Null if factory fails. */
+ final Thread thread;
+ /** Initial task to run. Possibly null. */
+ Runnable firstTask;
+ /** Per-thread task counter */
+ volatile long completedTasks;
+
+ // TODO: switch to AbstractQueuedLongSynchronizer and move
+ // completedTasks into the lock word.
+
+ /**
+ * Creates with given first task and thread from ThreadFactory.
+ * @param firstTask the first task (null if none)
+ */
+ Worker(Runnable firstTask) {
+ setState(-1); // inhibit interrupts until runWorker
+ this.firstTask = firstTask;
+ this.thread = getThreadFactory().newThread(this);
+ }
+
+ /** Delegates main run loop to outer runWorker. */
+ @Override
+ public void run() {
+ runWorker(this);
+ }
+
+ // Lock methods
+ //
+ // The value 0 represents the unlocked state.
+ // The value 1 represents the locked state.
+
+ @Override
+ protected boolean isHeldExclusively() {
+ return getState() != 0;
+ }
+
+ @Override
+ protected boolean tryAcquire(int unused) {
+ if (compareAndSetState(0, 1)) {
+ setExclusiveOwnerThread(Thread.currentThread());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected boolean tryRelease(int unused) {
+ setExclusiveOwnerThread(null);
+ setState(0);
+ return true;
+ }
+
+ public void lock() { acquire(1); }
+ public boolean tryLock() { return tryAcquire(1); }
+ public void unlock() { release(1); }
+ public boolean isLocked() { return isHeldExclusively(); }
+
+ void interruptIfStarted() {
+ Thread t;
+ if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
+ try {
+ t.interrupt();
+ } catch (SecurityException ignore) {
+ }
+ }
+ }
+ }
+
+ /*
+ * Methods for setting control state
+ */
+
+ /**
+ * Transitions runState to given target, or leaves it alone if
+ * already at least the given target.
+ *
+ * @param targetState the desired state, either SHUTDOWN or STOP
+ * (but not TIDYING or TERMINATED -- use tryTerminate for that)
+ */
+ private void advanceRunState(int targetState) {
+ // assert targetState == SHUTDOWN || targetState == STOP;
+ for (;;) {
+ int c = ctl.get();
+ if (runStateAtLeast(c, targetState) ||
+ ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Transitions to TERMINATED state if either (SHUTDOWN and pool
+ * and queue empty) or (STOP and pool empty). If otherwise
+ * eligible to terminate but workerCount is nonzero, interrupts an
+ * idle worker to ensure that shutdown signals propagate. This
+ * method must be called following any action that might make
+ * termination possible -- reducing worker count or removing tasks
+ * from the queue during shutdown. The method is non-private to
+ * allow access from ScheduledThreadPoolExecutor.
+ */
+ final void tryTerminate() {
+ for (;;) {
+ int c = ctl.get();
+ if (isRunning(c) ||
+ runStateAtLeast(c, TIDYING) ||
+ (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) {
+ return;
+ }
+ if (workerCountOf(c) != 0) { // Eligible to terminate
+ interruptIdleWorkers(ONLY_ONE);
+ return;
+ }
+
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
+ try {
+ terminated();
+ } finally {
+ ctl.set(ctlOf(TERMINATED, 0));
+ termination.signalAll();
+ }
+ return;
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ // else retry on failed CAS
+ }
+ }
+
+ /*
+ * Methods for controlling interrupts to worker threads.
+ */
+
+ /**
+ * If there is a security manager, makes sure caller has
+ * permission to shut down threads in general (see shutdownPerm).
+ * If this passes, additionally makes sure the caller is allowed
+ * to interrupt each worker thread. This might not be true even if
+ * first check passed, if the SecurityManager treats some threads
+ * specially.
+ */
+ private void checkShutdownAccess() {
+ // assert mainLock.isHeldByCurrentThread();
+ SecurityManager security = System.getSecurityManager();
+ if (security != null) {
+ security.checkPermission(shutdownPerm);
+ for (Worker w : workers) {
+ security.checkAccess(w.thread);
+ }
+ }
+ }
+
+ /**
+ * Interrupts all threads, even if active. Ignores SecurityExceptions
+ * (in which case some threads may remain uninterrupted).
+ */
+ private void interruptWorkers() {
+ // assert mainLock.isHeldByCurrentThread();
+ for (Worker w : workers) {
+ w.interruptIfStarted();
+ }
+ }
+
+ /**
+ * Interrupts threads that might be waiting for tasks (as
+ * indicated by not being locked) so they can check for
+ * termination or configuration changes. Ignores
+ * SecurityExceptions (in which case some threads may remain
+ * uninterrupted).
+ *
+ * @param onlyOne If true, interrupt at most one worker. This is
+ * called only from tryTerminate when termination is otherwise
+ * enabled but there are still other workers. In this case, at
+ * most one waiting worker is interrupted to propagate shutdown
+ * signals in case all threads are currently waiting.
+ * Interrupting any arbitrary thread ensures that newly arriving
+ * workers since shutdown began will also eventually exit.
+ * To guarantee eventual termination, it suffices to always
+ * interrupt only one idle worker, but shutdown() interrupts all
+ * idle workers so that redundant workers exit promptly, not
+ * waiting for a straggler task to finish.
+ */
+ private void interruptIdleWorkers(boolean onlyOne) {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ for (Worker w : workers) {
+ Thread t = w.thread;
+ if (!t.isInterrupted() && w.tryLock()) {
+ try {
+ t.interrupt();
+ } catch (SecurityException ignore) {
+ } finally {
+ w.unlock();
+ }
+ }
+ if (onlyOne) {
+ break;
+ }
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Common form of interruptIdleWorkers, to avoid having to
+ * remember what the boolean argument means.
+ */
+ private void interruptIdleWorkers() {
+ interruptIdleWorkers(false);
+ }
+
+ private static final boolean ONLY_ONE = true;
+
+ /*
+ * Misc utilities, most of which are also exported to
+ * ScheduledThreadPoolExecutor
+ */
+
+ /**
+ * Invokes the rejected execution handler for the given command.
+ * Package-protected for use by ScheduledThreadPoolExecutor.
+ */
+ final void reject(Runnable command) {
+ handler.rejectedExecution(command, this);
+ }
+
+ /**
+ * Performs any further cleanup following run state transition on
+ * invocation of shutdown. A no-op here, but used by
+ * ScheduledThreadPoolExecutor to cancel delayed tasks.
+ */
+ void onShutdown() {
+ }
+
+ /**
+ * Drains the task queue into a new list, normally using
+ * drainTo. But if the queue is a DelayQueue or any other kind of
+ * queue for which poll or drainTo may fail to remove some
+ * elements, it deletes them one by one.
+ */
+ private List<Runnable> drainQueue() {
+ BlockingQueue<Runnable> q = workQueue;
+ ArrayList<Runnable> taskList = new ArrayList<>();
+ q.drainTo(taskList);
+ if (!q.isEmpty()) {
+ for (Runnable r : q.toArray(new Runnable[0])) {
+ if (q.remove(r)) {
+ taskList.add(r);
+ }
+ }
+ }
+ return taskList;
+ }
+
+ /*
+ * Methods for creating, running and cleaning up after workers
+ */
+
+ /**
+ * Checks if a new worker can be added with respect to current
+ * pool state and the given bound (either core or maximum). If so,
+ * the worker count is adjusted accordingly, and, if possible, a
+ * new worker is created and started, running firstTask as its
+ * first task. This method returns false if the pool is stopped or
+ * eligible to shut down. It also returns false if the thread
+ * factory fails to create a thread when asked. If the thread
+ * creation fails, either due to the thread factory returning
+ * null, or due to an exception (typically OutOfMemoryError in
+ * Thread.start()), we roll back cleanly.
+ *
+ * @param firstTask the task the new thread should run first (or
+ * null if none). Workers are created with an initial first task
+ * (in method execute()) to bypass queuing when there are fewer
+ * than corePoolSize threads (in which case we always start one),
+ * or when the queue is full (in which case we must bypass queue).
+ * Initially idle threads are usually created via
+ * prestartCoreThread or to replace other dying workers.
+ *
+ * @param core if true use corePoolSize as bound, else
+ * maximumPoolSize. (A boolean indicator is used here rather than a
+ * value to ensure reads of fresh values after checking other pool
+ * state).
+ * @return true if successful
+ */
+ private boolean addWorker(Runnable firstTask, boolean core) {
+ retry:
+ for (int c = ctl.get();;) {
+ // Check if queue empty only if necessary.
+ if (runStateAtLeast(c, SHUTDOWN)
+ && (runStateAtLeast(c, STOP)
+ || firstTask != null
+ || workQueue.isEmpty())) {
+ return false;
+ }
+
+ for (;;) {
+ if (workerCountOf(c)
+ >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) {
+ return false;
+ }
+ if (compareAndIncrementWorkerCount(c)) {
+ break retry;
+ }
+ c = ctl.get(); // Re-read ctl
+ if (runStateAtLeast(c, SHUTDOWN))
+ {
+ continue retry;
+ // else CAS failed due to workerCount change; retry inner loop
+ }
+ }
+ }
+
+ boolean workerStarted = false;
+ boolean workerAdded = false;
+ Worker w = null;
+ try {
+ w = new Worker(firstTask);
+ final Thread t = w.thread;
+ if (t != null) {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ // Recheck while holding lock.
+ // Back out on ThreadFactory failure or if
+ // shut down before lock acquired.
+ int c = ctl.get();
+
+ if (isRunning(c) ||
+ (runStateLessThan(c, STOP) && firstTask == null)) {
+ if (t.getState() != Thread.State.NEW) {
+ throw new IllegalThreadStateException();
+ }
+ workers.add(w);
+ workerAdded = true;
+ int s = workers.size();
+ if (s > largestPoolSize) {
+ largestPoolSize = s;
+ }
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ if (workerAdded) {
+ t.start();
+ workerStarted = true;
+ }
+ }
+ } finally {
+ if (! workerStarted) {
+ addWorkerFailed(w);
+ }
+ }
+ return workerStarted;
+ }
+
+ /**
+ * Rolls back the worker thread creation.
+ * - removes worker from workers, if present
+ * - decrements worker count
+ * - rechecks for termination, in case the existence of this
+ * worker was holding up termination
+ */
+ private void addWorkerFailed(Worker w) {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (w != null) {
+ workers.remove(w);
+ }
+ decrementWorkerCount();
+ tryTerminate();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Performs cleanup and bookkeeping for a dying worker. Called
+ * only from worker threads. Unless completedAbruptly is set,
+ * assumes that workerCount has already been adjusted to account
+ * for exit. This method removes thread from worker set, and
+ * possibly terminates the pool or replaces the worker if either
+ * it exited due to user task exception or if fewer than
+ * corePoolSize workers are running or queue is non-empty but
+ * there are no workers.
+ *
+ * @param w the worker
+ * @param completedAbruptly if the worker died due to user exception
+ */
+ private void processWorkerExit(Worker w, boolean completedAbruptly) {
+ if (completedAbruptly) {
+ decrementWorkerCount();
+ }
+
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ completedTaskCount += w.completedTasks;
+ workers.remove(w);
+ } finally {
+ mainLock.unlock();
+ }
+
+ tryTerminate();
+
+ int c = ctl.get();
+ if (runStateLessThan(c, STOP)) {
+ if (!completedAbruptly) {
+ int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
+ if (min == 0 && ! workQueue.isEmpty()) {
+ min = 1;
+ }
+ // https://bz.apache.org/bugzilla/show_bug.cgi?id=65454
+ // If the work queue is not empty, it is likely that a task was
+ // added to the work queue between this thread timing out and
+ // the worker count being decremented a few lines above this
+ // comment. In this case, create a replacement worker so that
+ // the task isn't held in the queue waiting for one of the other
+ // workers to finish.
+ if (workerCountOf(c) >= min && workQueue.isEmpty()) {
+ return; // replacement not needed
+ }
+ }
+ addWorker(null, false);
+ }
+ }
+
+ /**
+ * Performs blocking or timed wait for a task, depending on
+ * current configuration settings, or returns null if this worker
+ * must exit because of any of:
+ * 1. There are more than maximumPoolSize workers (due to
+ * a call to setMaximumPoolSize).
+ * 2. The pool is stopped.
+ * 3. The pool is shut down and the queue is empty.
+ * 4. This worker timed out waiting for a task, and timed-out
+ * workers are subject to termination (that is,
+ * {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
+ * both before and after the timed wait, and if the queue is
+ * non-empty, this worker is not the last thread in the pool.
+ *
+ * @return task, or null if the worker must exit, in which case
+ * workerCount is decremented
+ */
+ private Runnable getTask() {
+ boolean timedOut = false; // Did the last poll() time out?
+
+ for (;;) {
+ int c = ctl.get();
+
+ // Check if queue empty only if necessary.
+ if (runStateAtLeast(c, SHUTDOWN)
+ && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
+ decrementWorkerCount();
+ return null;
+ }
+
+ int wc = workerCountOf(c);
+
+ // Are workers subject to culling?
+ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
+
+ if ((wc > maximumPoolSize || (timed && timedOut))
+ && (wc > 1 || workQueue.isEmpty())) {
+ if (compareAndDecrementWorkerCount(c)) {
+ return null;
+ }
+ continue;
+ }
+
+ try {
+ Runnable r = timed ?
+ workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
+ workQueue.take();
+ if (r != null) {
+ return r;
+ }
+ timedOut = true;
+ } catch (InterruptedException retry) {
+ timedOut = false;
+ }
+ }
+ }
+
+ /**
+ * Main worker run loop. Repeatedly gets tasks from queue and
+ * executes them, while coping with a number of issues:
+ *
+ * 1. We may start out with an initial task, in which case we
+ * don't need to get the first one. Otherwise, as long as pool is
+ * running, we get tasks from getTask. If it returns null then the
+ * worker exits due to changed pool state or configuration
+ * parameters. Other exits result from exception throws in
+ * external code, in which case completedAbruptly holds, which
+ * usually leads processWorkerExit to replace this thread.
+ *
+ * 2. Before running any task, the lock is acquired to prevent
+ * other pool interrupts while the task is executing, and then we
+ * ensure that unless pool is stopping, this thread does not have
+ * its interrupt set.
+ *
+ * 3. Each task run is preceded by a call to beforeExecute, which
+ * might throw an exception, in which case we cause thread to die
+ * (breaking loop with completedAbruptly true) without processing
+ * the task.
+ *
+ * 4. Assuming beforeExecute completes normally, we run the task,
+ * gathering any of its thrown exceptions to send to afterExecute.
+ * We separately handle RuntimeException, Error (both of which the
+ * specs guarantee that we trap) and arbitrary Throwables.
+ * Because we cannot rethrow Throwables within Runnable.run, we
+ * wrap them within Errors on the way out (to the thread's
+ * UncaughtExceptionHandler). Any thrown exception also
+ * conservatively causes thread to die.
+ *
+ * 5. After task.run completes, we call afterExecute, which may
+ * also throw an exception, which will also cause thread to
+ * die. According to JLS Sec 14.20, this exception is the one that
+ * will be in effect even if task.run throws.
+ *
+ * The net effect of the exception mechanics is that afterExecute
+ * and the thread's UncaughtExceptionHandler have as accurate
+ * information as we can provide about any problems encountered by
+ * user code.
+ *
+ * @param w the worker
+ */
+ @SuppressWarnings("null") // task cannot be null
+ final void runWorker(Worker w) {
+ Thread wt = Thread.currentThread();
+ Runnable task = w.firstTask;
+ w.firstTask = null;
+ w.unlock(); // allow interrupts
+ boolean completedAbruptly = true;
+ try {
+ while (task != null || (task = getTask()) != null) {
+ w.lock();
+ // If pool is stopping, ensure thread is interrupted;
+ // if not, ensure thread is not interrupted. This
+ // requires a recheck in second case to deal with
+ // shutdownNow race while clearing interrupt
+ if ((runStateAtLeast(ctl.get(), STOP) ||
+ (Thread.interrupted() &&
+ runStateAtLeast(ctl.get(), STOP))) &&
+ !wt.isInterrupted()) {
+ wt.interrupt();
+ }
+ try {
+ beforeExecute(wt, task);
+ try {
+ task.run();
+ afterExecute(task, null);
+ } catch (Throwable ex) {
+ afterExecute(task, ex);
+ throw ex;
+ }
+ } finally {
+ task = null;
+ w.completedTasks++;
+ w.unlock();
+ }
+ }
+ completedAbruptly = false;
+ } finally {
+ processWorkerExit(w, completedAbruptly);
+ }
+ }
+
+ // Public constructors and methods
+
+ /**
+ * Creates a new {@code ThreadPoolExecutor} with the given initial
+ * parameters, the
+ * {@linkplain Executors#defaultThreadFactory default thread factory}
+ * and the {@linkplain ThreadPoolExecutor.AbortPolicy
+ * default rejected execution handler}.
+ *
+ * <p>It may be more convenient to use one of the {@link Executors}
+ * factory methods instead of this general purpose constructor.
+ *
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle, unless {@code allowCoreThreadTimeOut} is set
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param workQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
+ * @throws IllegalArgumentException if one of the following holds:<br>
+ * {@code corePoolSize < 0}<br>
+ * {@code keepAliveTime < 0}<br>
+ * {@code maximumPoolSize <= 0}<br>
+ * {@code maximumPoolSize < corePoolSize}
+ * @throws NullPointerException if {@code workQueue} is null
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ Executors.defaultThreadFactory(), defaultHandler);
+ }
+
+ /**
+ * Creates a new {@code ThreadPoolExecutor} with the given initial
+ * parameters and the {@linkplain ThreadPoolExecutor.AbortPolicy
+ * default rejected execution handler}.
+ *
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle, unless {@code allowCoreThreadTimeOut} is set
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param workQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
+ * @param threadFactory the factory to use when the executor
+ * creates a new thread
+ * @throws IllegalArgumentException if one of the following holds:<br>
+ * {@code corePoolSize < 0}<br>
+ * {@code keepAliveTime < 0}<br>
+ * {@code maximumPoolSize <= 0}<br>
+ * {@code maximumPoolSize < corePoolSize}
+ * @throws NullPointerException if {@code workQueue}
+ * or {@code threadFactory} is null
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ threadFactory, defaultHandler);
+ }
+
+ /**
+ * Creates a new {@code ThreadPoolExecutor} with the given initial
+ * parameters and the
+ * {@linkplain Executors#defaultThreadFactory default thread factory}.
+ *
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle, unless {@code allowCoreThreadTimeOut} is set
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param workQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
+ * @param handler the handler to use when execution is blocked
+ * because the thread bounds and queue capacities are reached
+ * @throws IllegalArgumentException if one of the following holds:<br>
+ * {@code corePoolSize < 0}<br>
+ * {@code keepAliveTime < 0}<br>
+ * {@code maximumPoolSize <= 0}<br>
+ * {@code maximumPoolSize < corePoolSize}
+ * @throws NullPointerException if {@code workQueue}
+ * or {@code handler} is null
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ RejectedExecutionHandler handler) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ Executors.defaultThreadFactory(), handler);
+ }
+
+ /**
+ * Creates a new {@code ThreadPoolExecutor} with the given initial
+ * parameters.
+ *
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle, unless {@code allowCoreThreadTimeOut} is set
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param workQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
+ * @param threadFactory the factory to use when the executor
+ * creates a new thread
+ * @param handler the handler to use when execution is blocked
+ * because the thread bounds and queue capacities are reached
+ * @throws IllegalArgumentException if one of the following holds:<br>
+ * {@code corePoolSize < 0}<br>
+ * {@code keepAliveTime < 0}<br>
+ * {@code maximumPoolSize <= 0}<br>
+ * {@code maximumPoolSize < corePoolSize}
+ * @throws NullPointerException if {@code workQueue}
+ * or {@code threadFactory} or {@code handler} is null
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ if (corePoolSize < 0 ||
+ maximumPoolSize <= 0 ||
+ maximumPoolSize < corePoolSize ||
+ keepAliveTime < 0) {
+ throw new IllegalArgumentException();
+ }
+ if (workQueue == null || threadFactory == null || handler == null) {
+ throw new NullPointerException();
+ }
+ this.corePoolSize = corePoolSize;
+ this.maximumPoolSize = maximumPoolSize;
+ this.workQueue = workQueue;
+ this.keepAliveTime = unit.toNanos(keepAliveTime);
+ this.threadFactory = threadFactory;
+ this.handler = handler;
+ }
+
+
+ @Override
+ public void execute(Runnable command) {
+ execute(command,0,TimeUnit.MILLISECONDS);
+ }
+
+
+ /**
+ * Executes the given command at some time in the future. The command
+ * may execute in a new thread, in a pooled thread, or in the calling
+ * thread, at the discretion of the <code>Executor</code> implementation.
+ * If no threads are available, it will be added to the work queue.
+ * If the work queue is full, the system will wait for the specified
+ * time and then throw a RejectedExecutionException if the queue is still
+ * full after that.
+ *
+ * @param command the runnable task
+ * @param timeout The maximum time to wait for space in the work queue
+ * @param unit The timeout time unit
+ * @throws RejectedExecutionException if this task cannot be
+ * accepted for execution - the queue is full
+ * @throws NullPointerException if command or unit is null
+ *
+ * @deprecated This will be removed in Tomcat 10.1.x onwards
+ */
+ @Deprecated
+ public void execute(Runnable command, long timeout, TimeUnit unit) {
+ submittedCount.incrementAndGet();
+ try {
+ executeInternal(command);
+ } catch (RejectedExecutionException rx) {
+ if (getQueue() instanceof TaskQueue) {
+ // If the Executor is close to maximum pool size, concurrent
+ // calls to execute() may result (due to Tomcat's use of
+ // TaskQueue) in some tasks being rejected rather than queued.
+ // If this happens, add them to the queue.
+ final TaskQueue queue = (TaskQueue) getQueue();
+ try {
+ if (!queue.force(command, timeout, unit)) {
+ submittedCount.decrementAndGet();
+ throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
+ }
+ } catch (InterruptedException x) {
+ submittedCount.decrementAndGet();
+ throw new RejectedExecutionException(x);
+ }
+ } else {
+ submittedCount.decrementAndGet();
+ throw rx;
+ }
+ }
+ }
+
+
+ /**
+ * Executes the given task sometime in the future. The task
+ * may execute in a new thread or in an existing pooled thread.
+ *
+ * If the task cannot be submitted for execution, either because this
+ * executor has been shut down or because its capacity has been reached,
+ * the task is handled by the current {@link RejectedExecutionHandler}.
+ *
+ * @param command the task to execute
+ * @throws RejectedExecutionException at discretion of
+ * {@code RejectedExecutionHandler}, if the task
+ * cannot be accepted for execution
+ * @throws NullPointerException if {@code command} is null
+ */
+ private void executeInternal(Runnable command) {
+ if (command == null) {
+ throw new NullPointerException();
+ }
+ /*
+ * Proceed in 3 steps:
+ *
+ * 1. If fewer than corePoolSize threads are running, try to
+ * start a new thread with the given command as its first
+ * task. The call to addWorker atomically checks runState and
+ * workerCount, and so prevents false alarms that would add
+ * threads when it shouldn't, by returning false.
+ *
+ * 2. If a task can be successfully queued, then we still need
+ * to double-check whether we should have added a thread
+ * (because existing ones died since last checking) or that
+ * the pool shut down since entry into this method. So we
+ * recheck state and if necessary roll back the enqueuing if
+ * stopped, or start a new thread if there are none.
+ *
+ * 3. If we cannot queue task, then we try to add a new
+ * thread. If it fails, we know we are shut down or saturated
+ * and so reject the task.
+ */
+ int c = ctl.get();
+ if (workerCountOf(c) < corePoolSize) {
+ if (addWorker(command, true)) {
+ return;
+ }
+ c = ctl.get();
+ }
+ if (isRunning(c) && workQueue.offer(command)) {
+ int recheck = ctl.get();
+ if (! isRunning(recheck) && remove(command)) {
+ reject(command);
+ } else if (workerCountOf(recheck) == 0) {
+ addWorker(null, false);
+ }
+ }
+ else if (!addWorker(command, false)) {
+ reject(command);
+ }
+ }
+
+ /**
+ * Initiates an orderly shutdown in which previously submitted
+ * tasks are executed, but no new tasks will be accepted.
+ * Invocation has no additional effect if already shut down.
+ *
+ * <p>This method does not wait for previously submitted tasks to
+ * complete execution. Use {@link #awaitTermination awaitTermination}
+ * to do that.
+ *
+ * @throws SecurityException {@inheritDoc}
+ */
+ @Override
+ public void shutdown() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ checkShutdownAccess();
+ advanceRunState(SHUTDOWN);
+ interruptIdleWorkers();
+ onShutdown(); // hook for ScheduledThreadPoolExecutor
+ } finally {
+ mainLock.unlock();
+ }
+ tryTerminate();
+ }
+
+ /**
+ * Attempts to stop all actively executing tasks, halts the
+ * processing of waiting tasks, and returns a list of the tasks
+ * that were awaiting execution. These tasks are drained (removed)
+ * from the task queue upon return from this method.
+ *
+ * <p>This method does not wait for actively executing tasks to
+ * terminate. Use {@link #awaitTermination awaitTermination} to
+ * do that.
+ *
+ * <p>There are no guarantees beyond best-effort attempts to stop
+ * processing actively executing tasks. This implementation
+ * interrupts tasks via {@link Thread#interrupt}; any task that
+ * fails to respond to interrupts may never terminate.
+ *
+ * @throws SecurityException {@inheritDoc}
+ */
+ @Override
+ public List<Runnable> shutdownNow() {
+ List<Runnable> tasks;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ checkShutdownAccess();
+ advanceRunState(STOP);
+ interruptWorkers();
+ tasks = drainQueue();
+ } finally {
+ mainLock.unlock();
+ }
+ tryTerminate();
+ return tasks;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return runStateAtLeast(ctl.get(), SHUTDOWN);
+ }
+
+ /** Used by ScheduledThreadPoolExecutor. */
+ boolean isStopped() {
+ return runStateAtLeast(ctl.get(), STOP);
+ }
+
+ /**
+ * Returns true if this executor is in the process of terminating
+ * after {@link #shutdown} or {@link #shutdownNow} but has not
+ * completely terminated. This method may be useful for
+ * debugging. A return of {@code true} reported a sufficient
+ * period after shutdown may indicate that submitted tasks have
+ * ignored or suppressed interruption, causing this executor not
+ * to properly terminate.
+ *
+ * @return {@code true} if terminating but not yet terminated
+ */
+ public boolean isTerminating() {
+ int c = ctl.get();
+ return runStateAtLeast(c, SHUTDOWN) && runStateLessThan(c, TERMINATED);
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return runStateAtLeast(ctl.get(), TERMINATED);
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ while (runStateLessThan(ctl.get(), TERMINATED)) {
+ if (nanos <= 0L) {
+ return false;
+ }
+ nanos = termination.awaitNanos(nanos);
+ }
+ return true;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Sets the thread factory used to create new threads.
+ *
+ * @param threadFactory the new thread factory
+ * @throws NullPointerException if threadFactory is null
+ * @see #getThreadFactory
+ */
+ public void setThreadFactory(ThreadFactory threadFactory) {
+ if (threadFactory == null) {
+ throw new NullPointerException();
+ }
+ this.threadFactory = threadFactory;
+ }
+
+ /**
+ * Returns the thread factory used to create new threads.
+ *
+ * @return the current thread factory
+ * @see #setThreadFactory(ThreadFactory)
+ */
+ public ThreadFactory getThreadFactory() {
+ return threadFactory;
+ }
+
+ /**
+ * Sets a new handler for unexecutable tasks.
+ *
+ * @param handler the new handler
+ * @throws NullPointerException if handler is null
+ * @see #getRejectedExecutionHandler
+ */
+ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+ if (handler == null) {
+ throw new NullPointerException();
+ }
+ this.handler = handler;
+ }
+
+ /**
+ * Returns the current handler for unexecutable tasks.
+ *
+ * @return the current handler
+ * @see #setRejectedExecutionHandler(RejectedExecutionHandler)
+ */
+ public RejectedExecutionHandler getRejectedExecutionHandler() {
+ return handler;
+ }
+
+ /**
+ * Sets the core number of threads. This overrides any value set
+ * in the constructor. If the new value is smaller than the
+ * current value, excess existing threads will be terminated when
+ * they next become idle. If larger, new threads will, if needed,
+ * be started to execute any queued tasks.
+ *
+ * @param corePoolSize the new core size
+ * @throws IllegalArgumentException if {@code corePoolSize < 0}
+ * or {@code corePoolSize} is greater than the {@linkplain
+ * #getMaximumPoolSize() maximum pool size}
+ * @see #getCorePoolSize
+ */
+ public void setCorePoolSize(int corePoolSize) {
+ if (corePoolSize < 0 || maximumPoolSize < corePoolSize) {
+ throw new IllegalArgumentException();
+ }
+ int delta = corePoolSize - this.corePoolSize;
+ this.corePoolSize = corePoolSize;
+ if (workerCountOf(ctl.get()) > corePoolSize) {
+ interruptIdleWorkers();
+ } else if (delta > 0) {
+ // We don't really know how many new threads are "needed".
+ // As a heuristic, prestart enough new workers (up to new
+ // core size) to handle the current number of tasks in
+ // queue, but stop if queue becomes empty while doing so.
+ int k = Math.min(delta, workQueue.size());
+ while (k-- > 0 && addWorker(null, true)) {
+ if (workQueue.isEmpty()) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the core number of threads.
+ *
+ * @return the core number of threads
+ * @see #setCorePoolSize
+ */
+ public int getCorePoolSize() {
+ return corePoolSize;
+ }
+
+ /**
+ * Starts a core thread, causing it to idly wait for work. This
+ * overrides the default policy of starting core threads only when
+ * new tasks are executed. This method will return {@code false}
+ * if all core threads have already been started.
+ *
+ * @return {@code true} if a thread was started
+ */
+ public boolean prestartCoreThread() {
+ return workerCountOf(ctl.get()) < corePoolSize &&
+ addWorker(null, true);
+ }
+
/**
- * The string manager for this package.
+ * Same as prestartCoreThread except arranges that at least one
+ * thread is started even if corePoolSize is 0.
*/
- protected static final StringManager sm = StringManager
- .getManager("org.apache.tomcat.util.threads.res");
+ void ensurePrestart() {
+ int wc = workerCountOf(ctl.get());
+ if (wc < corePoolSize) {
+ addWorker(null, true);
+ } else if (wc == 0) {
+ addWorker(null, false);
+ }
+ }
/**
- * The number of tasks submitted but not yet finished. This includes tasks
- * in the queue and tasks that have been handed to a worker thread but the
- * latter did not start executing the task yet.
- * This number is always greater or equal to {@link #getActiveCount()}.
+ * Starts all core threads, causing them to idly wait for work. This
+ * overrides the default policy of starting core threads only when
+ * new tasks are executed.
+ *
+ * @return the number of threads started
*/
- private final AtomicInteger submittedCount = new AtomicInteger(0);
- private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
+ public int prestartAllCoreThreads() {
+ int n = 0;
+ while (addWorker(null, true)) {
+ ++n;
+ }
+ return n;
+ }
/**
- * Most recent time in ms when a thread decided to kill itself to avoid
- * potential memory leaks. Useful to throttle the rate of renewals of
- * threads.
+ * Returns true if this pool allows core threads to time out and
+ * terminate if no tasks arrive within the keepAlive time, being
+ * replaced if needed when new tasks arrive. When true, the same
+ * keep-alive policy applying to non-core threads applies also to
+ * core threads. When false (the default), core threads are never
+ * terminated due to lack of incoming tasks.
+ *
+ * @return {@code true} if core threads are allowed to time out,
+ * else {@code false}
+ *
+ * @since 1.6
*/
- private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
+ public boolean allowsCoreThreadTimeOut() {
+ return allowCoreThreadTimeOut;
+ }
/**
- * Delay in ms between 2 threads being renewed. If negative, do not renew threads.
+ * Sets the policy governing whether core threads may time out and
+ * terminate if no tasks arrive within the keep-alive time, being
+ * replaced if needed when new tasks arrive. When false, core
+ * threads are never terminated due to lack of incoming
+ * tasks. When true, the same keep-alive policy applying to
+ * non-core threads applies also to core threads. To avoid
+ * continual thread replacement, the keep-alive time must be
+ * greater than zero when setting {@code true}. This method
+ * should in general be called before the pool is actively used.
+ *
+ * @param value {@code true} if should time out, else {@code false}
+ * @throws IllegalArgumentException if value is {@code true}
+ * and the current keep-alive time is not greater than zero
+ *
+ * @since 1.6
*/
- private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;
+ public void allowCoreThreadTimeOut(boolean value) {
+ if (value && keepAliveTime <= 0) {
+ throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
+ }
+ if (value != allowCoreThreadTimeOut) {
+ allowCoreThreadTimeOut = value;
+ if (value) {
+ interruptIdleWorkers();
+ }
+ }
+ }
- public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
- prestartAllCoreThreads();
+ /**
+ * Sets the maximum allowed number of threads. This overrides any
+ * value set in the constructor. If the new value is smaller than
+ * the current value, excess existing threads will be
+ * terminated when they next become idle.
+ *
+ * @param maximumPoolSize the new maximum
+ * @throws IllegalArgumentException if the new maximum is
+ * less than or equal to zero, or
+ * less than the {@linkplain #getCorePoolSize core pool size}
+ * @see #getMaximumPoolSize
+ */
+ public void setMaximumPoolSize(int maximumPoolSize) {
+ if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
+ throw new IllegalArgumentException();
+ }
+ this.maximumPoolSize = maximumPoolSize;
+ if (workerCountOf(ctl.get()) > maximumPoolSize) {
+ interruptIdleWorkers();
+ }
}
- public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- prestartAllCoreThreads();
+ /**
+ * Returns the maximum allowed number of threads.
+ *
+ * @return the maximum allowed number of threads
+ * @see #setMaximumPoolSize
+ */
+ public int getMaximumPoolSize() {
+ return maximumPoolSize;
}
- public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
- prestartAllCoreThreads();
+ /**
+ * Sets the thread keep-alive time, which is the amount of time
+ * that threads may remain idle before being terminated.
+ * Threads that wait this amount of time without processing a
+ * task will be terminated if there are more than the core
+ * number of threads currently in the pool, or if this pool
+ * {@linkplain #allowsCoreThreadTimeOut() allows core thread timeout}.
+ * This overrides any value set in the constructor.
+ *
+ * @param time the time to wait. A time value of zero will cause
+ * excess threads to terminate immediately after executing tasks.
+ * @param unit the time unit of the {@code time} argument
+ * @throws IllegalArgumentException if {@code time} is less than zero or
+ * if {@code time} is zero and {@code allowsCoreThreadTimeOut}
+ * @see #getKeepAliveTime(TimeUnit)
+ */
+ public void setKeepAliveTime(long time, TimeUnit unit) {
+ if (time < 0) {
+ throw new IllegalArgumentException();
+ }
+ if (time == 0 && allowsCoreThreadTimeOut()) {
+ throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
+ }
+ long keepAliveTime = unit.toNanos(time);
+ long delta = keepAliveTime - this.keepAliveTime;
+ this.keepAliveTime = keepAliveTime;
+ if (delta < 0) {
+ interruptIdleWorkers();
+ }
}
- public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
- prestartAllCoreThreads();
+ /**
+ * Returns the thread keep-alive time, which is the amount of time
+ * that threads may remain idle before being terminated.
+ * Threads that wait this amount of time without processing a
+ * task will be terminated if there are more than the core
+ * number of threads currently in the pool, or if this pool
+ * {@linkplain #allowsCoreThreadTimeOut() allows core thread timeout}.
+ *
+ * @param unit the desired time unit of the result
+ * @return the time limit
+ * @see #setKeepAliveTime(long, TimeUnit)
+ */
+ public long getKeepAliveTime(TimeUnit unit) {
+ return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
+
public long getThreadRenewalDelay() {
return threadRenewalDelay;
}
+
public void setThreadRenewalDelay(long threadRenewalDelay) {
this.threadRenewalDelay = threadRenewalDelay;
}
+
+ /* User-level queue utilities */
+
+ /**
+ * Returns the task queue used by this executor. Access to the
+ * task queue is intended primarily for debugging and monitoring.
+ * This queue may be in active use. Retrieving the task queue
+ * does not prevent queued tasks from executing.
+ *
+ * @return the task queue
+ */
+ public BlockingQueue<Runnable> getQueue() {
+ return workQueue;
+ }
+
+ /**
+ * Removes this task from the executor's internal queue if it is
+ * present, thus causing it not to be run if it has not already
+ * started.
+ *
+ * <p>This method may be useful as one part of a cancellation
+ * scheme. It may fail to remove tasks that have been converted
+ * into other forms before being placed on the internal queue.
+ * For example, a task entered using {@code submit} might be
+ * converted into a form that maintains {@code Future} status.
+ * However, in such cases, method {@link #purge} may be used to
+ * remove those Futures that have been cancelled.
+ *
+ * @param task the task to remove
+ * @return {@code true} if the task was removed
+ */
+ public boolean remove(Runnable task) {
+ boolean removed = workQueue.remove(task);
+ tryTerminate(); // In case SHUTDOWN and now empty
+ return removed;
+ }
+
+ /**
+ * Tries to remove from the work queue all {@link Future}
+ * tasks that have been cancelled. This method can be useful as a
+ * storage reclamation operation, that has no other impact on
+ * functionality. Cancelled tasks are never executed, but may
+ * accumulate in work queues until worker threads can actively
+ * remove them. Invoking this method instead tries to remove them now.
+ * However, this method may fail to remove tasks in
+ * the presence of interference by other threads.
+ */
+ public void purge() {
+ final BlockingQueue<Runnable> q = workQueue;
+ try {
+ Iterator<Runnable> it = q.iterator();
+ while (it.hasNext()) {
+ Runnable r = it.next();
+ if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) {
+ it.remove();
+ }
+ }
+ } catch (ConcurrentModificationException fallThrough) {
+ // Take slow path if we encounter interference during traversal.
+ // Make copy for traversal and call remove for cancelled entries.
+ // The slow path is more likely to be O(N*N).
+ for (Object r : q.toArray()) {
+ if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) {
+ q.remove(r);
+ }
+ }
+ }
+
+ tryTerminate(); // In case SHUTDOWN and now empty
+ }
+
+
+ public void contextStopping() {
+ this.lastContextStoppedTime.set(System.currentTimeMillis());
+
+ // save the current pool parameters to restore them later
+ int savedCorePoolSize = this.getCorePoolSize();
+ TaskQueue taskQueue =
+ getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
+ if (taskQueue != null) {
+ // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize
+ // checks that queue.remainingCapacity()==0. I did not understand
+ // why, but to get the intended effect of waking up idle threads, I
+ // temporarily fake this condition.
+ taskQueue.setForcedRemainingCapacity(0);
+ }
+
+ // setCorePoolSize(0) wakes idle threads
+ this.setCorePoolSize(0);
+
+ // TaskQueue.take() takes care of timing out, so that we are sure that
+ // all threads of the pool are renewed in a limited time, something like
+ // (threadKeepAlive + longest request time)
+
+ if (taskQueue != null) {
+ // ok, restore the state of the queue and pool
+ taskQueue.resetForcedRemainingCapacity();
+ }
+ this.setCorePoolSize(savedCorePoolSize);
+ }
+
+
+ /* Statistics */
+
+ /**
+ * Returns the current number of threads in the pool.
+ *
+ * @return the number of threads
+ */
+ public int getPoolSize() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ // Remove rare and surprising possibility of
+ // isTerminated() && getPoolSize() > 0
+ return runStateAtLeast(ctl.get(), TIDYING) ? 0
+ : workers.size();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the approximate number of threads that are actively
+ * executing tasks.
+ *
+ * @return the number of threads
+ */
+ public int getActiveCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ int n = 0;
+ for (Worker w : workers) {
+ if (w.isLocked()) {
+ ++n;
+ }
+ }
+ return n;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the largest number of threads that have ever
+ * simultaneously been in the pool.
+ *
+ * @return the number of threads
+ */
+ public int getLargestPoolSize() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ return largestPoolSize;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the approximate total number of tasks that have ever been
+ * scheduled for execution. Because the states of tasks and
+ * threads may change dynamically during computation, the returned
+ * value is only an approximation.
+ *
+ * @return the number of tasks
+ */
+ public long getTaskCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ long n = completedTaskCount;
+ for (Worker w : workers) {
+ n += w.completedTasks;
+ if (w.isLocked()) {
+ ++n;
+ }
+ }
+ return n + workQueue.size();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the approximate total number of tasks that have
+ * completed execution. Because the states of tasks and threads
+ * may change dynamically during computation, the returned value
+ * is only an approximation, but one that does not ever decrease
+ * across successive calls.
+ *
+ * @return the number of tasks
+ */
+ public long getCompletedTaskCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ long n = completedTaskCount;
+ for (Worker w : workers) {
+ n += w.completedTasks;
+ }
+ return n;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+
+ public int getSubmittedCount() {
+ return submittedCount.get();
+ }
+
+
+ /**
+ * Returns a string identifying this pool, as well as its state,
+ * including indications of run state and estimated worker and
+ * task counts.
+ *
+ * @return a string identifying this pool, as well as its state
+ */
@Override
+ public String toString() {
+ long ncompleted;
+ int nworkers, nactive;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ ncompleted = completedTaskCount;
+ nactive = 0;
+ nworkers = workers.size();
+ for (Worker w : workers) {
+ ncompleted += w.completedTasks;
+ if (w.isLocked()) {
+ ++nactive;
+ }
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ int c = ctl.get();
+ String runState =
+ isRunning(c) ? "Running" :
+ runStateAtLeast(c, TERMINATED) ? "Terminated" :
+ "Shutting down";
+ return super.toString() +
+ "[" + runState +
+ ", pool size = " + nworkers +
+ ", active threads = " + nactive +
+ ", queued tasks = " + workQueue.size() +
+ ", completed tasks = " + ncompleted +
+ "]";
+ }
+
+ /* Extension hooks */
+
+ /**
+ * Method invoked prior to executing the given Runnable in the
+ * given thread. This method is invoked by thread {@code t} that
+ * will execute task {@code r}, and may be used to re-initialize
+ * ThreadLocals, or to perform logging.
+ *
+ * <p>This implementation does nothing, but may be customized in
+ * subclasses. Note: To properly nest multiple overridings, subclasses
+ * should generally invoke {@code super.beforeExecute} at the end of
+ * this method.
+ *
+ * @param t the thread that will run task {@code r}
+ * @param r the task that will be executed
+ */
+ protected void beforeExecute(Thread t, Runnable r) { }
+
+
+ /**
+ * Method invoked upon completion of execution of the given Runnable.
+ * This method is invoked by the thread that executed the task. If
+ * non-null, the Throwable is the uncaught {@code RuntimeException}
+ * or {@code Error} that caused execution to terminate abruptly.
+ *
+ * <p>This implementation does nothing, but may be customized in
+ * subclasses. Note: To properly nest multiple overridings, subclasses
+ * should generally invoke {@code super.afterExecute} at the
+ * beginning of this method.
+ *
+ * <p><b>Note:</b> When actions are enclosed in tasks (such as
+ * {@link java.util.concurrent.FutureTask})
+ * either explicitly or via methods such as
+ * {@code submit}, these task objects catch and maintain
+ * computational exceptions, and so they do not cause abrupt
+ * termination, and the internal exceptions are <em>not</em>
+ * passed to this method. If you would like to trap both kinds of
+ * failures in this method, you can further probe for such cases,
+ * as in this sample subclass that prints either the direct cause
+ * or the underlying exception if a task has been aborted:
+ *
+ * <pre> {@code
+ * class ExtendedExecutor extends ThreadPoolExecutor {
+ * // ...
+ * protected void afterExecute(Runnable r, Throwable t) {
+ * super.afterExecute(r, t);
+ * if (t == null
+ * && r instanceof Future<?>
+ * && ((Future<?>)r).isDone()) {
+ * try {
+ * Object result = ((Future<?>) r).get();
+ * } catch (CancellationException ce) {
+ * t = ce;
+ * } catch (ExecutionException ee) {
+ * t = ee.getCause();
+ * } catch (InterruptedException ie) {
+ * // ignore/reset
+ * Thread.currentThread().interrupt();
+ * }
+ * }
+ * if (t != null)
+ * System.out.println(t);
+ * }
+ * }}</pre>
+ *
+ * @param r the runnable that has completed
+ * @param t the exception that caused termination, or null if
+ * execution completed normally
+ */
protected void afterExecute(Runnable r, Throwable t) {
// Throwing StopPooledThreadException is likely to cause this method to
// be called more than once for a given task based on the typical
@@ -105,6 +2196,7 @@ public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor
}
}
+
/**
* If the current thread was started before the last time when a context was
* stopped, an exception is thrown so that the current thread is stopped.
@@ -127,6 +2219,7 @@ public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor
}
}
+
protected boolean currentThreadShouldBeStopped() {
if (threadRenewalDelay >= 0
&& Thread.currentThread() instanceof TaskThread) {
@@ -139,102 +2232,160 @@ public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor
return false;
}
- public int getSubmittedCount() {
- return submittedCount.get();
- }
/**
- * {@inheritDoc}
+ * Method invoked when the Executor has terminated. Default
+ * implementation does nothing. Note: To properly nest multiple
+ * overridings, subclasses should generally invoke
+ * {@code super.terminated} within this method.
*/
- @Override
- public void execute(Runnable command) {
- execute(command,0,TimeUnit.MILLISECONDS);
+ protected void terminated() { }
+
+ /* Predefined RejectedExecutionHandlers */
+
+ /**
+ * A handler for rejected tasks that runs the rejected task
+ * directly in the calling thread of the {@code execute} method,
+ * unless the executor has been shut down, in which case the task
+ * is discarded.
+ */
+ public static class CallerRunsPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a {@code CallerRunsPolicy}.
+ */
+ public CallerRunsPolicy() { }
+
+ /**
+ * Executes task r in the caller's thread, unless the executor
+ * has been shut down, in which case the task is discarded.
+ *
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ if (!e.isShutdown()) {
+ r.run();
+ }
+ }
}
/**
- * Executes the given command at some time in the future. The command
- * may execute in a new thread, in a pooled thread, or in the calling
- * thread, at the discretion of the <code>Executor</code> implementation.
- * If no threads are available, it will be added to the work queue.
- * If the work queue is full, the system will wait for the specified
- * time and then throw a RejectedExecutionException if the queue is still
- * full after that.
+ * A handler for rejected tasks that throws a
+ * {@link RejectedExecutionException}.
*
- * @param command the runnable task
- * @param timeout A timeout for the completion of the task
- * @param unit The timeout time unit
- * @throws RejectedExecutionException if this task cannot be
- * accepted for execution - the queue is full
- * @throws NullPointerException if command or unit is null
- *
- * @deprecated This will be removed in Tomcat 10.1.x onwards
+ * This is the default handler for {@link ThreadPoolExecutor} and
+ * {@link ScheduledThreadPoolExecutor}.
*/
- @Deprecated
- public void execute(Runnable command, long timeout, TimeUnit unit) {
- submittedCount.incrementAndGet();
- try {
- super.execute(command);
- } catch (RejectedExecutionException rx) {
- if (super.getQueue() instanceof TaskQueue) {
- // If the Executor is close to maximum pool size, concurrent
- // calls to execute() may result (due to Tomcat's use of
- // TaskQueue) in some tasks being rejected rather than queued.
- // If this happens, add them to the queue.
- final TaskQueue queue = (TaskQueue)super.getQueue();
- try {
- if (!queue.force(command, timeout, unit)) {
- submittedCount.decrementAndGet();
- throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
- }
- } catch (InterruptedException x) {
- submittedCount.decrementAndGet();
- throw new RejectedExecutionException(x);
- }
- } else {
- submittedCount.decrementAndGet();
- throw rx;
- }
+ public static class AbortPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates an {@code AbortPolicy}.
+ */
+ public AbortPolicy() { }
+ /**
+ * Always throws RejectedExecutionException.
+ *
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ * @throws RejectedExecutionException always
+ */
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ throw new RejectedExecutionException("Task " + r.toString() +
+ " rejected from " +
+ e.toString());
}
}
- public void contextStopping() {
- this.lastContextStoppedTime.set(System.currentTimeMillis());
+ /**
+ * A handler for rejected tasks that silently discards the
+ * rejected task.
+ */
+ public static class DiscardPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a {@code DiscardPolicy}.
+ */
+ public DiscardPolicy() { }
- // save the current pool parameters to restore them later
- int savedCorePoolSize = this.getCorePoolSize();
- TaskQueue taskQueue =
- getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
- if (taskQueue != null) {
- // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize
- // checks that queue.remainingCapacity()==0. I did not understand
- // why, but to get the intended effect of waking up idle threads, I
- // temporarily fake this condition.
- taskQueue.setForcedRemainingCapacity(0);
+ /**
+ * Does nothing, which has the effect of discarding task r.
+ *
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
+ }
- // setCorePoolSize(0) wakes idle threads
- this.setCorePoolSize(0);
-
- // TaskQueue.take() takes care of timing out, so that we are sure that
- // all threads of the pool are renewed in a limited time, something like
- // (threadKeepAlive + longest request time)
+ /**
+ * A handler for rejected tasks that discards the oldest unhandled
+ * request and then retries {@code execute}, unless the executor
+ * is shut down, in which case the task is discarded. This policy is
+ * rarely useful in cases where other threads may be waiting for
+ * tasks to terminate, or failures must be recorded. Instead consider
+ * using a handler of the form:
+ * <pre> {@code
+ * new RejectedExecutionHandler() {
+ * public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ * Runnable dropped = e.getQueue().poll();
+ * if (dropped instanceof Future<?>) {
+ * ((Future<?>)dropped).cancel(false);
+ * // also consider logging the failure
+ * }
+ * e.execute(r); // retry
+ * }}}</pre>
+ */
+ public static class DiscardOldestPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a {@code DiscardOldestPolicy}.
+ */
+ public DiscardOldestPolicy() { }
- if (taskQueue != null) {
- // ok, restore the state of the queue and pool
- taskQueue.resetForcedRemainingCapacity();
+ /**
+ * Obtains and ignores the next task that the executor
+ * would otherwise execute, if one is immediately available,
+ * and then retries execution of task r, unless the executor
+ * is shut down, in which case task r is instead discarded.
+ *
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ if (!e.isShutdown()) {
+ e.getQueue().poll();
+ e.execute(r);
+ }
}
- this.setCorePoolSize(savedCorePoolSize);
}
private static class RejectHandler implements RejectedExecutionHandler {
@Override
- public void rejectedExecution(Runnable r,
- java.util.concurrent.ThreadPoolExecutor executor) {
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RejectedExecutionException();
}
-
}
-}
+ public interface RejectedExecutionHandler {
+
+ /**
+ * Method that may be invoked by a {@link ThreadPoolExecutor} when
+ * {@link ThreadPoolExecutor#execute execute} cannot accept a
+ * task. This may occur when no more threads or queue slots are
+ * available because their bounds would be exceeded, or upon
+ * shutdown of the Executor.
+ *
+ * <p>In the absence of other alternatives, the method may throw
+ * an unchecked {@link RejectedExecutionException}, which will be
+ * propagated to the caller of {@code execute}.
+ *
+ * @param r the runnable task requested to be executed
+ * @param executor the executor attempting to execute this task
+ * @throws RejectedExecutionException if there is no remedy
+ */
+ void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
+ }
+}
\ No newline at end of file
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 3a234be..0bbed3b 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -146,6 +146,12 @@
associated with the response until the connection timed out at which
point the final packet would be sent and the connection closed. (markt)
</fix>
+ <fix>
+ <bug>65454</bug>: Fix a race condition that could result in a delay to
+ a new request. The new request could be queued to wait for an existing
+ request to finish processing rather than the thread pool creating a new
+ thread to process the new request. (markt)
+ </fix>
</changelog>
</subsection>
<subsection name="Web applications">
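
For readers following the changelog entry for BZ 65454, the idea behind the fix can be sketched with a toy model. This is not the Tomcat implementation: the class ReplacementWorkerSketch and all of its methods are hypothetical names invented for illustration, and the real pool tracks workers through its ctl field rather than a plain counter. The sketch only shows the check described in the commit message: after a timed-out worker has decremented the worker count, a non-empty queue suggests a task slipped in during that window, so a replacement worker is started.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Toy model (hypothetical, not Tomcat code) of the exit path taken by a
 * worker whose wait for a task has timed out.
 */
class ReplacementWorkerSketch {

    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger workerCount = new AtomicInteger();
    private final AtomicInteger replacementsStarted = new AtomicInteger();

    /** Registers a worker. A real pool would also start a thread here. */
    void addWorker() {
        workerCount.incrementAndGet();
    }

    /** Queues a task, as execute() would when it sees enough workers. */
    boolean offer(Runnable task) {
        return workQueue.offer(task);
    }

    /**
     * Called by a worker that timed out waiting for a task.
     *
     * @return true if a replacement worker was started
     */
    boolean workerTimedOut() {
        workerCount.decrementAndGet();
        // A task may have been queued between the timeout and the
        // decrement above. Without this check the task would wait for
        // some other worker to become free.
        if (!workQueue.isEmpty()) {
            addWorker();
            replacementsStarted.incrementAndGet();
            return true;
        }
        return false;
    }

    int workerCount() {
        return workerCount.get();
    }

    int replacementsStarted() {
        return replacementsStarted.get();
    }

    public static void main(String[] args) {
        ReplacementWorkerSketch pool = new ReplacementWorkerSketch();
        pool.addWorker();
        pool.offer(() -> { });   // task arrives just as the worker times out
        boolean replaced = pool.workerTimedOut();
        System.out.println("replacement started: " + replaced
                + ", workers: " + pool.workerCount());
    }
}
```

In the model, a worker that times out with an empty queue simply leaves and the count drops; with a queued task the count is restored by the replacement, so the task is not left waiting on the remaining workers.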