You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by ga...@apache.org on 2008/03/07 20:56:28 UTC
svn commit: r634792 [4/9] - in /geronimo/sandbox/concurrent: ./
concurrent-deployer/ concurrent-deployer/src/ concurrent-deployer/src/main/
concurrent-deployer/src/main/plan/ concurrent/ concurrent/src/
concurrent/src/main/ concurrent/src/main/plan/ ge...
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/FutureTask.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/FutureTask.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/FutureTask.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/FutureTask.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,334 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
+ * Contains the following modifications:
+ * 1) support for skipped state (for periodic tasks)
+ * 2) support for taskStart() and taskDone() callbacks
+ */
+
+package org.apache.geronimo.concurrent.harmony;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+import javax.util.concurrent.SkippedException;
+
+/**
+ * A cancellable asynchronous computation. This class provides a base
+ * implementation of {@link Future}, with methods to start and cancel
+ * a computation, query to see if the computation is complete, and
+ * retrieve the result of the computation. The result can only be
+ * retrieved when the computation has completed; the <tt>get</tt>
+ * method will block if the computation has not yet completed. Once
+ * the computation has completed, the computation cannot be restarted
+ * or cancelled.
+ *
+ * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
+ * {@link java.lang.Runnable} object. Because <tt>FutureTask</tt>
+ * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
+ * submitted to an {@link Executor} for execution.
+ *
+ * <p>In addition to serving as a standalone class, this class provides
+ * <tt>protected</tt> functionality that may be useful when creating
+ * customized task classes.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
+ */
+public class FutureTask<V> implements Future<V>, Runnable {
+ /** Synchronization control for FutureTask */
+ private final Sync sync;
+
+ /**
+ * Creates a <tt>FutureTask</tt> that will upon running, execute the
+ * given <tt>Callable</tt>.
+ *
+ * @param callable the callable task
+ * @throws NullPointerException if callable is null
+ */
+ public FutureTask(Callable<V> callable) {
+ if (callable == null)
+ throw new NullPointerException();
+ sync = new Sync(callable);
+ }
+
+ /**
+ * Creates a <tt>FutureTask</tt> that will upon running, execute the
+ * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
+ * given result on successful completion.
+ *
+ * @param runnable the runnable task
+ * @param result the result to return on successful completion. If
+ * you don't need a particular result, consider using
+ * constructions of the form:
+ * <tt>Future<?> f = new FutureTask<Object>(runnable, null)</tt>
+ * @throws NullPointerException if runnable is null
+ */
+ public FutureTask(Runnable runnable, V result) {
+ sync = new Sync(Executors.callable(runnable, result));
+ }
+
+ public boolean isCancelled() {
+ return sync.innerIsCancelled();
+ }
+
+ public boolean isDone() {
+ return sync.innerIsDone();
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return sync.innerCancel(mayInterruptIfRunning);
+ }
+
+ public V get() throws InterruptedException, ExecutionException {
+ return sync.innerGet();
+ }
+
+ public V get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return sync.innerGet(unit.toNanos(timeout));
+ }
+
+ /**
+ * Protected method invoked when this task transitions to state
+ * <tt>isDone</tt> (whether normally or via cancellation). The
+ * default implementation does nothing. Subclasses may override
+ * this method to invoke completion callbacks or perform
+ * bookkeeping. Note that you can query status inside the
+ * implementation of this method to determine whether this task
+ * has been cancelled.
+ */
+ protected void done() { }
+
+ /**
+ * Sets the result of this Future to the given value unless
+ * this future has already been set or has been cancelled.
+ * @param v the value
+ */
+ protected void set(V v) {
+ sync.innerSet(v);
+ }
+
+ /**
+ * Causes this future to report an <tt>ExecutionException</tt>
+ * with the given throwable as its cause, unless this Future has
+ * already been set or has been cancelled.
+ * @param t the cause of failure.
+ */
+ protected void setException(Throwable t) {
+ sync.innerSetException(t);
+ }
+
+ /**
+ * Sets this Future to the result of computation unless
+ * it has been cancelled.
+ */
+ public void run() {
+ sync.innerRun();
+ }
+
+ /**
+ * Executes the computation without setting its result, and then
+ * resets this Future to initial state, failing to do so if the
+ * computation encounters an exception or is cancelled. This is
+ * designed for use with tasks that intrinsically execute more
+ * than once.
+ * @return true if successfully run and reset
+ */
+ protected boolean runAndReset() {
+ return sync.innerRunAndReset();
+ }
+
+ public boolean isSkipped() {
+ return sync.innerIsSkipped();
+ }
+
+ protected boolean setSkipped() {
+ return sync.innerSetSkipped();
+ }
+
+ protected void taskStart() {
+ }
+
+ protected void taskDone(Throwable exception) {
+ }
+
+ /**
+ * Synchronization control for FutureTask. Note that this must be
+ * a non-static inner class in order to invoke the protected
+ * <tt>done</tt> method. For clarity, all inner class support
+ * methods are same as outer, prefixed with "inner".
+ *
+ * Uses AQS sync state to represent run status
+ */
+ private final class Sync extends AbstractQueuedSynchronizer {
+ /** State value representing that task is running */
+ private static final int RUNNING = 1;
+ /** State value representing that task ran */
+ private static final int RAN = 2;
+ /** State value representing that task was cancelled */
+ private static final int CANCELLED = 4;
+
+ private static final int SKIPPED = 8;
+
+ /** The underlying callable */
+ private final Callable<V> callable;
+ /** The result to return from get() */
+ private V result;
+ /** The exception to throw from get() */
+ private Throwable exception;
+
+ /**
+ * The thread running task. When nulled after set/cancel, this
+ * indicates that the results are accessible. Must be
+ * volatile, to serve as write barrier on completion.
+ */
+ private volatile Thread runner;
+
+ Sync(Callable<V> callable) {
+ this.callable = callable;
+ }
+
+ private boolean ranOrCancelled(int state) {
+ return (state & (RAN | CANCELLED)) != 0;
+ }
+
+ /**
+ * Implements AQS base acquire to succeed if ran or cancelled
+ */
+ protected int tryAcquireShared(int ignore) {
+ return (innerIsDone() || innerIsSkipped()) ? 1 : -1;
+ }
+
+ /**
+ * Implements AQS base release to always signal after setting
+ * final done status by nulling runner thread.
+ */
+ protected boolean tryReleaseShared(int ignore) {
+ runner = null;
+ return true;
+ }
+
+ boolean innerIsCancelled() {
+ return getState() == CANCELLED;
+ }
+
+ boolean innerIsDone() {
+ return ranOrCancelled(getState()) && runner == null;
+ }
+
+ V innerGet() throws InterruptedException, ExecutionException {
+ acquireSharedInterruptibly(0);
+ if (getState() == CANCELLED)
+ throw new CancellationException();
+ if (getState() == SKIPPED) {
+ throw new SkippedException();
+ }
+ if (exception != null)
+ throw new ExecutionException(exception);
+ return result;
+ }
+
+ V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
+ if (!tryAcquireSharedNanos(0, nanosTimeout))
+ throw new TimeoutException();
+ if (getState() == CANCELLED)
+ throw new CancellationException();
+ if (getState() == SKIPPED) {
+ throw new SkippedException();
+ }
+ if (exception != null)
+ throw new ExecutionException(exception);
+ return result;
+ }
+
+ void innerSet(V v) {
+ int s = getState();
+ if (ranOrCancelled(s) || !compareAndSetState(s, RAN))
+ return;
+ result = v;
+ releaseShared(0);
+ done();
+ }
+
+ void innerSetException(Throwable t) {
+ int s = getState();
+ if (ranOrCancelled(s) || !compareAndSetState(s, RAN))
+ return;
+ exception = t;
+ result = null;
+ releaseShared(0);
+ done();
+ }
+
+ boolean innerCancel(boolean mayInterruptIfRunning) {
+ int s = getState();
+ if (ranOrCancelled(s) || !compareAndSetState(s, CANCELLED))
+ return false;
+ if (mayInterruptIfRunning) {
+ Thread r = runner;
+ if (r != null)
+ r.interrupt();
+ }
+ releaseShared(0);
+ done();
+ return true;
+ }
+
+ boolean innerIsSkipped() {
+ return getState() == SKIPPED;
+ }
+
+ boolean innerSetSkipped() {
+ int s = getState();
+ if (ranOrCancelled(s) || !compareAndSetState(s, SKIPPED)) {
+ return false;
+ }
+ releaseShared(0);
+ return true;
+ }
+
+ void innerRun() {
+ if (!compareAndSetState(0, RUNNING))
+ return;
+ try {
+ taskStart();
+ runner = Thread.currentThread();
+ V v = callable.call();
+ taskDone(null);
+ innerSet(v);
+ } catch(Throwable ex) {
+ taskDone(ex);
+ innerSetException(ex);
+ }
+ }
+
+ boolean innerRunAndReset() {
+ if (!(compareAndSetState(0, RUNNING) || compareAndSetState(SKIPPED, RUNNING)))
+ return false;
+ try {
+ taskStart();
+ runner = Thread.currentThread();
+ callable.call(); // don't set result
+ runner = null;
+ taskDone(null);
+ return compareAndSetState(RUNNING, 0);
+ } catch(Throwable ex) {
+ taskDone(ex);
+ innerSetException(ex);
+ return false;
+ }
+ }
+ }
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/FutureTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/RejectedExecutionHandler.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/RejectedExecutionHandler.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/RejectedExecutionHandler.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/RejectedExecutionHandler.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,33 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.apache.geronimo.concurrent.harmony;
+
+/**
+ * A handler for tasks that cannot be executed by a {@link
+ * ThreadPoolExecutor}.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public interface RejectedExecutionHandler {
+
+ /**
+ * Method that may be invoked by a {@link ThreadPoolExecutor} when
+ * <tt>execute</tt> cannot accept a task. This may occur when no
+ * more threads or queue slots are available because their bounds
+ * would be exceeded, or upon shutdown of the Executor.
+ *
+ * In the absence other alternatives, the method may throw an
+ * unchecked {@link RejectedExecutionException}, which will be
+ * propagated to the caller of <tt>execute</tt>.
+ *
+ * @param r the runnable task requested to be executed
+ * @param executor the executor attempting to execute this task
+ * @throws RejectedExecutionException if there is no remedy
+ */
+ void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/RejectedExecutionHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/ThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/ThreadPoolExecutor.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/ThreadPoolExecutor.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/ThreadPoolExecutor.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,1522 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.apache.geronimo.concurrent.harmony;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * An {@link ExecutorService} that executes each submitted task using
+ * one of possibly several pooled threads, normally configured
+ * using {@link Executors} factory methods.
+ *
+ * <p>Thread pools address two different problems: they usually
+ * provide improved performance when executing large numbers of
+ * asynchronous tasks, due to reduced per-task invocation overhead,
+ * and they provide a means of bounding and managing the resources,
+ * including threads, consumed when executing a collection of tasks.
+ * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
+ * statistics, such as the number of completed tasks.
+ *
+ * <p>To be useful across a wide range of contexts, this class
+ * provides many adjustable parameters and extensibility
+ * hooks. However, programmers are urged to use the more convenient
+ * {@link Executors} factory methods {@link
+ * Executors#newCachedThreadPool} (unbounded thread pool, with
+ * automatic thread reclamation), {@link Executors#newFixedThreadPool}
+ * (fixed size thread pool) and {@link
+ * Executors#newSingleThreadExecutor} (single background thread), that
+ * preconfigure settings for the most common usage
+ * scenarios. Otherwise, use the following guide when manually
+ * configuring and tuning this class:
+ *
+ * <dl>
+ *
+ * <dt>Core and maximum pool sizes</dt>
+ *
+ * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
+ * pool size
+ * (see {@link ThreadPoolExecutor#getPoolSize})
+ * according to the bounds set by corePoolSize
+ * (see {@link ThreadPoolExecutor#getCorePoolSize})
+ * and
+ * maximumPoolSize
+ * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
+ * When a new task is submitted in method {@link
+ * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
+ * are running, a new thread is created to handle the request, even if
+ * other worker threads are idle. If there are more than
+ * corePoolSize but less than maximumPoolSize threads running, a new
+ * thread will be created only if the queue is full. By setting
+ * corePoolSize and maximumPoolSize the same, you create a fixed-size
+ * thread pool. By setting maximumPoolSize to an essentially unbounded
+ * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
+ * accommodate an arbitrary number of concurrent tasks. Most typically,
+ * core and maximum pool sizes are set only upon construction, but they
+ * may also be changed dynamically using {@link
+ * ThreadPoolExecutor#setCorePoolSize} and {@link
+ * ThreadPoolExecutor#setMaximumPoolSize}. <dd>
+ *
+ * <dt> On-demand construction
+ *
+ * <dd> By default, even core threads are initially created and
+ * started only when needed by new tasks, but this can be overridden
+ * dynamically using method {@link
+ * ThreadPoolExecutor#prestartCoreThread} or
+ * {@link ThreadPoolExecutor#prestartAllCoreThreads}. </dd>
+ *
+ * <dt>Creating new threads</dt>
+ *
+ * <dd>New threads are created using a {@link
+ * java.util.concurrent.ThreadFactory}. If not otherwise specified, a
+ * {@link Executors#defaultThreadFactory} is used, that creates threads to all
+ * be in the same {@link ThreadGroup} and with the same
+ * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
+ * a different ThreadFactory, you can alter the thread's name, thread
+ * group, priority, daemon status, etc. </dd>
+ *
+ * <dt>Keep-alive times</dt>
+ *
+ * <dd>If the pool currently has more than corePoolSize threads,
+ * excess threads will be terminated if they have been idle for more
+ * than the keepAliveTime (see {@link
+ * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
+ * reducing resource consumption when the pool is not being actively
+ * used. If the pool becomes more active later, new threads will be
+ * constructed. This parameter can also be changed dynamically
+ * using method {@link ThreadPoolExecutor#setKeepAliveTime}. Using
+ * a value of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS}
+ * effectively disables idle threads from ever terminating prior
+ * to shut down.
+ * </dd>
+ *
+ * <dt>Queuing</dt>
+ *
+ * <dd>Any {@link BlockingQueue} may be used to transfer and hold
+ * submitted tasks. The use of this queue interacts with pool sizing:
+ *
+ * <ul>
+ *
+ * <li> If fewer than corePoolSize threads are running, the Executor
+ * always prefers adding a new thread
+ * rather than queuing.</li>
+ *
+ * <li> If corePoolSize or more threads are running, the Executor
+ * always prefers queuing a request rather than adding a new
+ * thread.</li>
+ *
+ * <li> If a request cannot be queued, a new thread is created unless
+ * this would exceed maximumPoolSize, in which case, the task will be
+ * rejected.</li>
+ *
+ * </ul>
+ *
+ * There are three general strategies for queuing:
+ * <ol>
+ *
+ * <li> <em> Direct handoffs.</em> A good default choice for a work
+ * queue is a {@link SynchronousQueue} that hands off tasks to threads
+ * without otherwise holding them. Here, an attempt to queue a task
+ * will fail if no threads are immediately available to run it, so a
+ * new thread will be constructed. This policy avoids lockups when
+ * handling sets of requests that might have internal dependencies.
+ * Direct handoffs generally require unbounded maximumPoolSizes to
+ * avoid rejection of new submitted tasks. This in turn admits the
+ * possibility of unbounded thread growth when commands continue to
+ * arrive on average faster than they can be processed. </li>
+ *
+ * <li><em> Unbounded queues.</em> Using an unbounded queue (for
+ * example a {@link LinkedBlockingQueue} without a predefined
+ * capacity) will cause new tasks to be queued in cases where all
+ * corePoolSize threads are busy. Thus, no more than corePoolSize
+ * threads will ever be created. (And the value of the maximumPoolSize
+ * therefore doesn't have any effect.) This may be appropriate when
+ * each task is completely independent of others, so tasks cannot
+ * affect each others execution; for example, in a web page server.
+ * While this style of queuing can be useful in smoothing out
+ * transient bursts of requests, it admits the possibility of
+ * unbounded work queue growth when commands continue to arrive on
+ * average faster than they can be processed. </li>
+ *
+ * <li><em>Bounded queues.</em> A bounded queue (for example, an
+ * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
+ * used with finite maximumPoolSizes, but can be more difficult to
+ * tune and control. Queue sizes and maximum pool sizes may be traded
+ * off for each other: Using large queues and small pools minimizes
+ * CPU usage, OS resources, and context-switching overhead, but can
+ * lead to artificially low throughput. If tasks frequently block (for
+ * example if they are I/O bound), a system may be able to schedule
+ * time for more threads than you otherwise allow. Use of small queues
+ * generally requires larger pool sizes, which keeps CPUs busier but
+ * may encounter unacceptable scheduling overhead, which also
+ * decreases throughput. </li>
+ *
+ * </ol>
+ *
+ * </dd>
+ *
+ * <dt>Rejected tasks</dt>
+ *
+ * <dd> New tasks submitted in method {@link
+ * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
+ * Executor has been shut down, and also when the Executor uses finite
+ * bounds for both maximum threads and work queue capacity, and is
+ * saturated. In either case, the <tt>execute</tt> method invokes the
+ * {@link RejectedExecutionHandler#rejectedExecution} method of its
+ * {@link RejectedExecutionHandler}. Four predefined handler policies
+ * are provided:
+ *
+ * <ol>
+ *
+ * <li> In the
+ * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
+ * runtime {@link RejectedExecutionException} upon rejection. </li>
+ *
+ * <li> In {@link
+ * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
+ * <tt>execute</tt> itself runs the task. This provides a simple
+ * feedback control mechanism that will slow down the rate that new
+ * tasks are submitted. </li>
+ *
+ * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
+ * a task that cannot be executed is simply dropped. </li>
+ *
+ * <li>In {@link
+ * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
+ * shut down, the task at the head of the work queue is dropped, and
+ * then execution is retried (which can fail again, causing this to be
+ * repeated.) </li>
+ *
+ * </ol>
+ *
+ * It is possible to define and use other kinds of {@link
+ * RejectedExecutionHandler} classes. Doing so requires some care
+ * especially when policies are designed to work only under particular
+ * capacity or queuing policies. </dd>
+ *
+ * <dt>Hook methods</dt>
+ *
+ * <dd>This class provides <tt>protected</tt> overridable {@link
+ * ThreadPoolExecutor#beforeExecute} and {@link
+ * ThreadPoolExecutor#afterExecute} methods that are called before and
+ * after execution of each task. These can be used to manipulate the
+ * execution environment, for example, reinitializing ThreadLocals,
+ * gathering statistics, or adding log entries. Additionally, method
+ * {@link ThreadPoolExecutor#terminated} can be overridden to perform
+ * any special processing that needs to be done once the Executor has
+ * fully terminated.</dd>
+ *
+ * <dt>Queue maintenance</dt>
+ *
+ * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
+ * the work queue for purposes of monitoring and debugging. Use of
+ * this method for any other purpose is strongly discouraged. Two
+ * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
+ * ThreadPoolExecutor#purge} are available to assist in storage
+ * reclamation when large numbers of queued tasks become
+ * cancelled.</dd> </dl>
+ *
+ * <p> <b>Extension example</b>. Most extensions of this class
+ * override one or more of the protected hook methods. For example,
+ * here is a subclass that adds a simple pause/resume feature:
+ *
+ * <pre>
+ * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
+ * private boolean isPaused;
+ * private ReentrantLock pauseLock = new ReentrantLock();
+ * private Condition unpaused = pauseLock.newCondition();
+ *
+ * public PausableThreadPoolExecutor(...) { super(...); }
+ *
+ * protected void beforeExecute(Thread t, Runnable r) {
+ * super.beforeExecute(t, r);
+ * pauseLock.lock();
+ * try {
+ * while (isPaused) unpaused.await();
+ * } catch(InterruptedException ie) {
+ * t.interrupt();
+ * } finally {
+ * pauseLock.unlock();
+ * }
+ * }
+ *
+ * public void pause() {
+ * pauseLock.lock();
+ * try {
+ * isPaused = true;
+ * } finally {
+ * pauseLock.unlock();
+ * }
+ * }
+ *
+ * public void resume() {
+ * pauseLock.lock();
+ * try {
+ * isPaused = false;
+ * unpaused.signalAll();
+ * } finally {
+ * pauseLock.unlock();
+ * }
+ * }
+ * }
+ * </pre>
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class ThreadPoolExecutor extends AbstractExecutorService {
+ /**
+ * Only used to force toArray() to produce a Runnable[].
+ */
+ private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];
+
+ /**
+ * Permission for checking shutdown
+ */
+ private static final RuntimePermission shutdownPerm =
+ new RuntimePermission("modifyThread");
+
+ /**
+ * Queue used for holding tasks and handing off to worker threads.
+ */
+ private final BlockingQueue<Runnable> workQueue;
+
+ /**
+ * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
+ * workers set.
+ */
+ private final ReentrantLock mainLock = new ReentrantLock();
+
+ /**
+ * Wait condition to support awaitTermination
+ */
+ private final Condition termination = mainLock.newCondition();
+
+ /**
+ * Set containing all worker threads in pool.
+ */
+ private final HashSet<Worker> workers = new HashSet<Worker>();
+
+ /**
+ * Timeout in nanoseconds for idle threads waiting for work.
+ * Threads use this timeout only when there are more than
+ * corePoolSize present. Otherwise they wait forever for new work.
+ */
+ private volatile long keepAliveTime;
+
+ /**
+ * Core pool size, updated only while holding mainLock,
+ * but volatile to allow concurrent readability even
+ * during updates.
+ */
+ private volatile int corePoolSize;
+
+ /**
+ * Maximum pool size, updated only while holding mainLock
+ * but volatile to allow concurrent readability even
+ * during updates.
+ */
+ private volatile int maximumPoolSize;
+
+ /**
+ * Current pool size, updated only while holding mainLock
+ * but volatile to allow concurrent readability even
+ * during updates.
+ */
+ private volatile int poolSize;
+
+ /**
+ * Lifecycle state
+ */
+ volatile int runState;
+
+ // Special values for runState
+ /** Normal, not-shutdown mode */
+ static final int RUNNING = 0;
+ /** Controlled shutdown mode */
+ static final int SHUTDOWN = 1;
+ /** Immediate shutdown mode */
+ static final int STOP = 2;
+ /** Final state */
+ static final int TERMINATED = 3;
+
+ /**
+ * Handler called when saturated or shutdown in execute.
+ */
+ private volatile RejectedExecutionHandler handler;
+
+ /**
+ * Factory for new threads.
+ */
+ private volatile ThreadFactory threadFactory;
+
+ /**
+ * Tracks largest attained pool size.
+ */
+ private int largestPoolSize;
+
+ /**
+ * Counter for completed tasks. Updated only on termination of
+ * worker threads.
+ */
+ private long completedTaskCount;
+
+ /**
+ * The default rejected execution handler
+ */
+ private static final RejectedExecutionHandler defaultHandler =
+ new AbortPolicy();
+
+ /**
+ * Invoke the rejected execution handler for the given command.
+ */
+ protected void reject(Runnable command) {
+ handler.rejectedExecution(command, this);
+ }
+
+ /**
+ * Create and return a new thread running firstTask as its first
+ * task. Call only while holding mainLock
+ * @param firstTask the task the new thread should run first (or
+ * null if none)
+ * @return the new thread
+ */
+ private Thread addThread(Runnable firstTask) {
+ Worker w = new Worker(firstTask);
+ Thread t = threadFactory.newThread(w);
+ w.thread = t;
+ workers.add(w);
+ int nt = ++poolSize;
+ if (nt > largestPoolSize)
+ largestPoolSize = nt;
+ return t;
+ }
+
+ /**
+ * Create and start a new thread running firstTask as its first
+ * task, only if fewer than corePoolSize threads are running.
+ * @param firstTask the task the new thread should run first (or
+ * null if none)
+ * @return true if successful.
+ */
+ private boolean addIfUnderCorePoolSize(Runnable firstTask) {
+ Thread t = null;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (poolSize < corePoolSize)
+ t = addThread(firstTask);
+ } finally {
+ mainLock.unlock();
+ }
+ if (t == null)
+ return false;
+ t.start();
+ return true;
+ }
+
+ /**
+ * Create and start a new thread only if fewer than maximumPoolSize
+ * threads are running. The new thread runs as its first task the
+ * next task in queue, or if there is none, the given task.
+ * @param firstTask the task the new thread should run first (or
+ * null if none)
+ * @return null on failure, else the first task to be run by new thread.
+ */
+ private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
+ Thread t = null;
+ Runnable next = null;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (poolSize < maximumPoolSize) {
+ next = workQueue.poll();
+ if (next == null)
+ next = firstTask;
+ t = addThread(next);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ if (t == null)
+ return null;
+ t.start();
+ return next;
+ }
+
+
+ /**
+ * Get the next task for a worker thread to run.
+ * @return the task
+ * @throws InterruptedException if interrupted while waiting for task
+ */
+ Runnable getTask() throws InterruptedException {
+ for (;;) {
+ switch(runState) {
+ case RUNNING: {
+ if (poolSize <= corePoolSize) // untimed wait if core
+ return workQueue.take();
+
+ long timeout = keepAliveTime;
+ if (timeout <= 0) // die immediately for 0 timeout
+ return null;
+ Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
+ if (r != null)
+ return r;
+ if (poolSize > corePoolSize) // timed out
+ return null;
+ // else, after timeout, pool shrank so shouldn't die, so retry
+ break;
+ }
+
+ case SHUTDOWN: {
+ // Help drain queue
+ Runnable r = workQueue.poll();
+ if (r != null)
+ return r;
+
+ // Check if can terminate
+ if (workQueue.isEmpty()) {
+ interruptIdleWorkers();
+ return null;
+ }
+
+ // There could still be delayed tasks in queue.
+ // Wait for one, re-checking state upon interruption
+ try {
+ return workQueue.take();
+ } catch(InterruptedException ignore) {}
+ break;
+ }
+
+ case STOP:
+ return null;
+ default:
+ assert false;
+ }
+ }
+ }
+
+ /**
+ * Wake up all threads that might be waiting for tasks.
+ */
+ protected void interruptIdleWorkers() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ for (Worker w : workers)
+ w.interruptIfIdle();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Perform bookkeeping for a terminated worker thread.
+ * @param w the worker
+ */
+ void workerDone(Worker w) {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ completedTaskCount += w.completedTasks;
+ workers.remove(w);
+ if (--poolSize > 0)
+ return;
+
+ // Else, this is the last thread. Deal with potential shutdown.
+
+ int state = runState;
+ assert state != TERMINATED;
+
+ if (state != STOP) {
+ // If there are queued tasks but no threads, create
+ // replacement.
+ Runnable r = workQueue.poll();
+ if (r != null) {
+ addThread(r).start();
+ return;
+ }
+
+ // If there are some (presumably delayed) tasks but
+ // none pollable, create an idle replacement to wait.
+ if (!workQueue.isEmpty()) {
+ addThread(null).start();
+ return;
+ }
+
+ // Otherwise, we can exit without replacement
+ if (state == RUNNING)
+ return;
+ }
+
+ // Either state is STOP, or state is SHUTDOWN and there is
+ // no work to do. So we can terminate.
+ termination.signalAll();
+ runState = TERMINATED;
+ // fall through to call terminate() outside of lock.
+ } finally {
+ mainLock.unlock();
+ }
+
+ assert runState == TERMINATED;
+ terminated();
+ }
+
+ /**
+ * Worker threads
+ */
+ private class Worker implements Runnable {
+
+ /**
+ * The runLock is acquired and released surrounding each task
+ * execution. It mainly protects against interrupts that are
+ * intended to cancel the worker thread from instead
+ * interrupting the task being run.
+ */
+ private final ReentrantLock runLock = new ReentrantLock();
+
+ /**
+ * Initial task to run before entering run loop
+ */
+ private Runnable firstTask;
+
+ /**
+ * Per thread completed task counter; accumulated
+ * into completedTaskCount upon termination.
+ */
+ volatile long completedTasks;
+
+ /**
+ * Thread this worker is running in. Acts as a final field,
+ * but cannot be set until thread is created.
+ */
+ Thread thread;
+
+ Worker(Runnable firstTask) {
+ this.firstTask = firstTask;
+ }
+
+ boolean isActive() {
+ return runLock.isLocked();
+ }
+
+ /**
+ * Interrupt thread if not running a task
+ */
+ void interruptIfIdle() {
+ final ReentrantLock runLock = this.runLock;
+ if (runLock.tryLock()) {
+ try {
+ thread.interrupt();
+ } finally {
+ runLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Cause thread to die even if running a task.
+ */
+ void interruptNow() {
+ thread.interrupt();
+ }
+
+ /**
+ * Run a single task between before/after methods.
+ */
+ private void runTask(Runnable task) {
+ final ReentrantLock runLock = this.runLock;
+ runLock.lock();
+ try {
+ // Abort now if immediate cancel. Otherwise, we have
+ // committed to run this task.
+ if (runState == STOP)
+ return;
+
+ Thread.interrupted(); // clear interrupt status on entry
+ boolean ran = false;
+ beforeExecute(thread, task);
+ try {
+ task.run();
+ ran = true;
+ afterExecute(task, null);
+ ++completedTasks;
+ } catch(RuntimeException ex) {
+ if (!ran)
+ afterExecute(task, ex);
+ // Else the exception occurred within
+ // afterExecute itself in which case we don't
+ // want to call it again.
+ throw ex;
+ }
+ } finally {
+ runLock.unlock();
+ }
+ }
+
+ /**
+ * Main run loop
+ */
+ public void run() {
+ try {
+ Runnable task = firstTask;
+ firstTask = null;
+ while (task != null || (task = getTask()) != null) {
+ runTask(task);
+ task = null; // unnecessary but can help GC
+ }
+ } catch(InterruptedException ie) {
+ // fall through
+ } finally {
+ workerDone(this);
+ }
+ }
+ }
+
+ // Public methods
+
+ /**
+ * Creates a new <tt>ThreadPoolExecutor</tt> with the given
+ * initial parameters and default thread factory and handler. It
+ * may be more convenient to use one of the {@link Executors}
+ * factory methods instead of this general purpose constructor.
+ *
+ * @param corePoolSize the number of threads to keep in the
+ * pool, even if they are idle.
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool.
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the keepAliveTime
+ * argument.
+ * @param workQueue the queue to use for holding tasks before they
+ * are executed. This queue will hold only the <tt>Runnable</tt>
+ * tasks submitted by the <tt>execute</tt> method.
+ * @throws IllegalArgumentException if corePoolSize, or
+ * keepAliveTime less than zero, or if maximumPoolSize less than or
+ * equal to zero, or if corePoolSize greater than maximumPoolSize.
+ * @throws NullPointerException if <tt>workQueue</tt> is null
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ Executors.defaultThreadFactory(), defaultHandler);
+ }
+
+ /**
+ * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
+ * parameters.
+ *
+ * @param corePoolSize the number of threads to keep in the
+ * pool, even if they are idle.
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool.
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the keepAliveTime
+ * argument.
+ * @param workQueue the queue to use for holding tasks before they
+ * are executed. This queue will hold only the <tt>Runnable</tt>
+ * tasks submitted by the <tt>execute</tt> method.
+ * @param threadFactory the factory to use when the executor
+ * creates a new thread.
+ * @throws IllegalArgumentException if corePoolSize, or
+ * keepAliveTime less than zero, or if maximumPoolSize less than or
+ * equal to zero, or if corePoolSize greater than maximumPoolSize.
+ * @throws NullPointerException if <tt>workQueue</tt>
+ * or <tt>threadFactory</tt> are null.
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ threadFactory, defaultHandler);
+ }
+
+ /**
+ * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
+ * parameters.
+ *
+ * @param corePoolSize the number of threads to keep in the
+ * pool, even if they are idle.
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool.
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the keepAliveTime
+ * argument.
+ * @param workQueue the queue to use for holding tasks before they
+ * are executed. This queue will hold only the <tt>Runnable</tt>
+ * tasks submitted by the <tt>execute</tt> method.
+ * @param handler the handler to use when execution is blocked
+ * because the thread bounds and queue capacities are reached.
+ * @throws IllegalArgumentException if corePoolSize, or
+ * keepAliveTime less than zero, or if maximumPoolSize less than or
+ * equal to zero, or if corePoolSize greater than maximumPoolSize.
+ * @throws NullPointerException if <tt>workQueue</tt>
+ * or <tt>handler</tt> are null.
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ RejectedExecutionHandler handler) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ Executors.defaultThreadFactory(), handler);
+ }
+
+ /**
+ * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
+ * parameters.
+ *
+ * @param corePoolSize the number of threads to keep in the
+ * pool, even if they are idle.
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool.
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the keepAliveTime
+ * argument.
+ * @param workQueue the queue to use for holding tasks before they
+ * are executed. This queue will hold only the <tt>Runnable</tt>
+ * tasks submitted by the <tt>execute</tt> method.
+ * @param threadFactory the factory to use when the executor
+ * creates a new thread.
+ * @param handler the handler to use when execution is blocked
+ * because the thread bounds and queue capacities are reached.
+ * @throws IllegalArgumentException if corePoolSize, or
+ * keepAliveTime less than zero, or if maximumPoolSize less than or
+ * equal to zero, or if corePoolSize greater than maximumPoolSize.
+ * @throws NullPointerException if <tt>workQueue</tt>
+ * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ if (corePoolSize < 0 ||
+ maximumPoolSize <= 0 ||
+ maximumPoolSize < corePoolSize ||
+ keepAliveTime < 0)
+ throw new IllegalArgumentException();
+ if (workQueue == null || threadFactory == null || handler == null)
+ throw new NullPointerException();
+ this.corePoolSize = corePoolSize;
+ this.maximumPoolSize = maximumPoolSize;
+ this.workQueue = workQueue;
+ this.keepAliveTime = unit.toNanos(keepAliveTime);
+ this.threadFactory = threadFactory;
+ this.handler = handler;
+ }
+
+
+ /**
+ * Executes the given task sometime in the future. The task
+ * may execute in a new thread or in an existing pooled thread.
+ *
+ * If the task cannot be submitted for execution, either because this
+ * executor has been shutdown or because its capacity has been reached,
+ * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
+ *
+ * @param command the task to execute
+ * @throws RejectedExecutionException at discretion of
+ * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
+ * for execution
+ * @throws NullPointerException if command is null
+ */
+ public void execute(Runnable command) {
+ if (command == null)
+ throw new NullPointerException();
+ for (;;) {
+ if (runState != RUNNING) {
+ reject(command);
+ return;
+ }
+ if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
+ return;
+ if (workQueue.offer(command))
+ return;
+ Runnable r = addIfUnderMaximumPoolSize(command);
+ if (r == command)
+ return;
+ if (r == null) {
+ reject(command);
+ return;
+ }
+ // else retry
+ }
+ }
+
+ /**
+ * Initiates an orderly shutdown in which previously submitted
+ * tasks are executed, but no new tasks will be
+ * accepted. Invocation has no additional effect if already shut
+ * down.
+ * @throws SecurityException if a security manager exists and
+ * shutting down this ExecutorService may manipulate threads that
+ * the caller is not permitted to modify because it does not hold
+ * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+ * or the security manager's <tt>checkAccess</tt> method denies access.
+ */
+ public void shutdown() {
+ // Fail if caller doesn't have modifyThread permission
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ java.security.AccessController.checkPermission(shutdownPerm);
+
+ boolean fullyTerminated = false;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (workers.size() > 0) {
+ // Check if caller can modify worker threads. This
+ // might not be true even if passed above check, if
+ // the SecurityManager treats some threads specially.
+ if (security != null) {
+ for (Worker w: workers)
+ security.checkAccess(w.thread);
+ }
+
+ int state = runState;
+ if (state == RUNNING) // don't override shutdownNow
+ runState = SHUTDOWN;
+
+ try {
+ for (Worker w: workers)
+ w.interruptIfIdle();
+ } catch(SecurityException se) {
+ // If SecurityManager allows above checks, but
+ // then unexpectedly throws exception when
+ // interrupting threads (which it ought not do),
+ // back out as cleanly as we can. Some threads may
+ // have been killed but we remain in non-shutdown
+ // state.
+ runState = state;
+ throw se;
+ }
+ }
+ else { // If no workers, trigger full termination now
+ fullyTerminated = true;
+ runState = TERMINATED;
+ termination.signalAll();
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ if (fullyTerminated)
+ terminated();
+ }
+
+
+ /**
+ * Attempts to stop all actively executing tasks, halts the
+ * processing of waiting tasks, and returns a list of the tasks that were
+ * awaiting execution.
+ *
+ * <p>This implementation cancels tasks via {@link
+ * Thread#interrupt}, so if any tasks mask or fail to respond to
+ * interrupts, they may never terminate.
+ *
+ * @return list of tasks that never commenced execution
+ * @throws SecurityException if a security manager exists and
+ * shutting down this ExecutorService may manipulate threads that
+ * the caller is not permitted to modify because it does not hold
+ * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+ * or the security manager's <tt>checkAccess</tt> method denies access.
+ */
+ public List<Runnable> shutdownNow() {
+ // Almost the same code as shutdown()
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ java.security.AccessController.checkPermission(shutdownPerm);
+
+ boolean fullyTerminated = false;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (workers.size() > 0) {
+ if (security != null) {
+ for (Worker w: workers)
+ security.checkAccess(w.thread);
+ }
+
+ int state = runState;
+ if (state != TERMINATED)
+ runState = STOP;
+ try {
+ for (Worker w : workers)
+ w.interruptNow();
+ } catch(SecurityException se) {
+ runState = state; // back out;
+ throw se;
+ }
+ }
+ else { // If no workers, trigger full termination now
+ fullyTerminated = true;
+ runState = TERMINATED;
+ termination.signalAll();
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ if (fullyTerminated)
+ terminated();
+ return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
+ }
+
+ public boolean isShutdown() {
+ return runState != RUNNING;
+ }
+
+ /**
+ * Returns true if this executor is in the process of terminating
+ * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
+ * completely terminated. This method may be useful for
+ * debugging. A return of <tt>true</tt> reported a sufficient
+ * period after shutdown may indicate that submitted tasks have
+ * ignored or suppressed interruption, causing this executor not
+ * to properly terminate.
+ * @return true if terminating but not yet terminated.
+ */
+ public boolean isTerminating() {
+ return runState == STOP;
+ }
+
+ public boolean isTerminated() {
+ return runState == TERMINATED;
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ for (;;) {
+ if (runState == TERMINATED)
+ return true;
+ if (nanos <= 0)
+ return false;
+ nanos = termination.awaitNanos(nanos);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Invokes <tt>shutdown</tt> when this executor is no longer
+ * referenced.
+ */
+ protected void finalize() {
+ shutdown();
+ }
+
+ /**
+ * Sets the thread factory used to create new threads.
+ *
+ * @param threadFactory the new thread factory
+ * @throws NullPointerException if threadFactory is null
+ * @see #getThreadFactory
+ */
+ public void setThreadFactory(ThreadFactory threadFactory) {
+ if (threadFactory == null)
+ throw new NullPointerException();
+ this.threadFactory = threadFactory;
+ }
+
+ /**
+ * Returns the thread factory used to create new threads.
+ *
+ * @return the current thread factory
+ * @see #setThreadFactory
+ */
+ public ThreadFactory getThreadFactory() {
+ return threadFactory;
+ }
+
+ /**
+ * Sets a new handler for unexecutable tasks.
+ *
+ * @param handler the new handler
+ * @throws NullPointerException if handler is null
+ * @see #getRejectedExecutionHandler
+ */
+ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+ if (handler == null)
+ throw new NullPointerException();
+ this.handler = handler;
+ }
+
+ /**
+ * Returns the current handler for unexecutable tasks.
+ *
+ * @return the current handler
+ * @see #setRejectedExecutionHandler
+ */
+ public RejectedExecutionHandler getRejectedExecutionHandler() {
+ return handler;
+ }
+
+ /**
+ * Returns the task queue used by this executor. Access to the
+ * task queue is intended primarily for debugging and monitoring.
+ * This queue may be in active use. Retrieving the task queue
+ * does not prevent queued tasks from executing.
+ *
+ * @return the task queue
+ */
+ public BlockingQueue<Runnable> getQueue() {
+ return workQueue;
+ }
+
+ /**
+ * Removes this task from the executor's internal queue if it is
+ * present, thus causing it not to be run if it has not already
+ * started.
+ *
+ * <p> This method may be useful as one part of a cancellation
+ * scheme. It may fail to remove tasks that have been converted
+ * into other forms before being placed on the internal queue. For
+ * example, a task entered using <tt>submit</tt> might be
+ * converted into a form that maintains <tt>Future</tt> status.
+ * However, in such cases, method {@link ThreadPoolExecutor#purge}
+ * may be used to remove those Futures that have been cancelled.
+ *
+ *
+ * @param task the task to remove
+ * @return true if the task was removed
+ */
+ public boolean remove(Runnable task) {
+ return getQueue().remove(task);
+ }
+
+
+ /**
+ * Tries to remove from the work queue all {@link Future}
+ * tasks that have been cancelled. This method can be useful as a
+ * storage reclamation operation, that has no other impact on
+ * functionality. Cancelled tasks are never executed, but may
+ * accumulate in work queues until worker threads can actively
+ * remove them. Invoking this method instead tries to remove them now.
+ * However, this method may fail to remove tasks in
+ * the presence of interference by other threads.
+ */
+ public void purge() {
+ // Fail if we encounter interference during traversal
+ try {
+ Iterator<Runnable> it = getQueue().iterator();
+ while (it.hasNext()) {
+ Runnable r = it.next();
+ if (r instanceof Future<?>) {
+ Future<?> c = (Future<?>)r;
+ if (c.isCancelled())
+ it.remove();
+ }
+ }
+ }
+ catch(ConcurrentModificationException ex) {
+ return;
+ }
+ }
+
+ /**
+ * Sets the core number of threads. This overrides any value set
+ * in the constructor. If the new value is smaller than the
+ * current value, excess existing threads will be terminated when
+ * they next become idle. If larger, new threads will, if needed,
+ * be started to execute any queued tasks.
+ *
+ * @param corePoolSize the new core size
+ * @throws IllegalArgumentException if <tt>corePoolSize</tt>
+ * less than zero
+ * @see #getCorePoolSize
+ */
+ public void setCorePoolSize(int corePoolSize) {
+ if (corePoolSize < 0)
+ throw new IllegalArgumentException();
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ int extra = this.corePoolSize - corePoolSize;
+ this.corePoolSize = corePoolSize;
+ if (extra < 0) {
+ Runnable r;
+ while (extra++ < 0 && poolSize < corePoolSize &&
+ (r = workQueue.poll()) != null)
+ addThread(r).start();
+ }
+ else if (extra > 0 && poolSize > corePoolSize) {
+ Iterator<Worker> it = workers.iterator();
+ while (it.hasNext() &&
+ extra-- > 0 &&
+ poolSize > corePoolSize &&
+ workQueue.remainingCapacity() == 0)
+ it.next().interruptIfIdle();
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the core number of threads.
+ *
+ * @return the core number of threads
+ * @see #setCorePoolSize
+ */
+ public int getCorePoolSize() {
+ return corePoolSize;
+ }
+
+ /**
+ * Starts a core thread, causing it to idly wait for work. This
+ * overrides the default policy of starting core threads only when
+ * new tasks are executed. This method will return <tt>false</tt>
+ * if all core threads have already been started.
+ * @return true if a thread was started
+ */
+ public boolean prestartCoreThread() {
+ return addIfUnderCorePoolSize(null);
+ }
+
+ /**
+ * Starts all core threads, causing them to idly wait for work. This
+ * overrides the default policy of starting core threads only when
+ * new tasks are executed.
+ * @return the number of threads started.
+ */
+ public int prestartAllCoreThreads() {
+ int n = 0;
+ while (addIfUnderCorePoolSize(null))
+ ++n;
+ return n;
+ }
+
+ /**
+ * Sets the maximum allowed number of threads. This overrides any
+ * value set in the constructor. If the new value is smaller than
+ * the current value, excess existing threads will be
+ * terminated when they next become idle.
+ *
+ * @param maximumPoolSize the new maximum
+ * @throws IllegalArgumentException if maximumPoolSize less than zero or
+ * the {@link #getCorePoolSize core pool size}
+ * @see #getMaximumPoolSize
+ */
+ public void setMaximumPoolSize(int maximumPoolSize) {
+ if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
+ throw new IllegalArgumentException();
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ int extra = this.maximumPoolSize - maximumPoolSize;
+ this.maximumPoolSize = maximumPoolSize;
+ if (extra > 0 && poolSize > maximumPoolSize) {
+ Iterator<Worker> it = workers.iterator();
+ while (it.hasNext() &&
+ extra > 0 &&
+ poolSize > maximumPoolSize) {
+ it.next().interruptIfIdle();
+ --extra;
+ }
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the maximum allowed number of threads.
+ *
+ * @return the maximum allowed number of threads
+ * @see #setMaximumPoolSize
+ */
+ public int getMaximumPoolSize() {
+ return maximumPoolSize;
+ }
+
+ /**
+ * Sets the time limit for which threads may remain idle before
+ * being terminated. If there are more than the core number of
+ * threads currently in the pool, after waiting this amount of
+ * time without processing a task, excess threads will be
+ * terminated. This overrides any value set in the constructor.
+ * @param time the time to wait. A time value of zero will cause
+ * excess threads to terminate immediately after executing tasks.
+ * @param unit the time unit of the time argument
+ * @throws IllegalArgumentException if time less than zero
+ * @see #getKeepAliveTime
+ */
+ public void setKeepAliveTime(long time, TimeUnit unit) {
+ if (time < 0)
+ throw new IllegalArgumentException();
+ this.keepAliveTime = unit.toNanos(time);
+ }
+
+ /**
+ * Returns the thread keep-alive time, which is the amount of time
+ * which threads in excess of the core pool size may remain
+ * idle before being terminated.
+ *
+ * @param unit the desired time unit of the result
+ * @return the time limit
+ * @see #setKeepAliveTime
+ */
+ public long getKeepAliveTime(TimeUnit unit) {
+ return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
+ }
+
+ /* Statistics */
+
+ /**
+ * Returns the current number of threads in the pool.
+ *
+ * @return the number of threads
+ */
+ public int getPoolSize() {
+ return poolSize;
+ }
+
+ /**
+ * Returns the approximate number of threads that are actively
+ * executing tasks.
+ *
+ * @return the number of threads
+ */
+ public int getActiveCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ int n = 0;
+ for (Worker w : workers) {
+ if (w.isActive())
+ ++n;
+ }
+ return n;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the largest number of threads that have ever
+ * simultaneously been in the pool.
+ *
+ * @return the number of threads
+ */
+ public int getLargestPoolSize() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ return largestPoolSize;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the approximate total number of tasks that have been
+ * scheduled for execution. Because the states of tasks and
+ * threads may change dynamically during computation, the returned
+ * value is only an approximation, but one that does not ever
+ * decrease across successive calls.
+ *
+ * @return the number of tasks
+ */
+ public long getTaskCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ long n = completedTaskCount;
+ for (Worker w : workers) {
+ n += w.completedTasks;
+ if (w.isActive())
+ ++n;
+ }
+ return n + workQueue.size();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the approximate total number of tasks that have
+ * completed execution. Because the states of tasks and threads
+ * may change dynamically during computation, the returned value
+ * is only an approximation, but one that does not ever decrease
+ * across successive calls.
+ *
+ * @return the number of tasks
+ */
+ public long getCompletedTaskCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ long n = completedTaskCount;
+ for (Worker w : workers)
+ n += w.completedTasks;
+ return n;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Method invoked prior to executing the given Runnable in the
+ * given thread. This method is invoked by thread <tt>t</tt> that
+ * will execute task <tt>r</tt>, and may be used to re-initialize
+ * ThreadLocals, or to perform logging. Note: To properly nest
+ * multiple overridings, subclasses should generally invoke
+ * <tt>super.beforeExecute</tt> at the end of this method.
+ *
+ * @param t the thread that will run task r.
+ * @param r the task that will be executed.
+ */
+ protected void beforeExecute(Thread t, Runnable r) { }
+
+ /**
+ * Method invoked upon completion of execution of the given
+ * Runnable. This method is invoked by the thread that executed
+ * the task. If non-null, the Throwable is the uncaught exception
+ * that caused execution to terminate abruptly. Note: To properly
+ * nest multiple overridings, subclasses should generally invoke
+ * <tt>super.afterExecute</tt> at the beginning of this method.
+ *
+ * @param r the runnable that has completed.
+ * @param t the exception that caused termination, or null if
+ * execution completed normally.
+ */
+ protected void afterExecute(Runnable r, Throwable t) { }
+
+ /**
+ * Method invoked when the Executor has terminated. Default
+ * implementation does nothing. Note: To properly nest multiple
+ * overridings, subclasses should generally invoke
+ * <tt>super.terminated</tt> within this method.
+ */
+ protected void terminated() { }
+
+ /**
+ * A handler for rejected tasks that runs the rejected task
+ * directly in the calling thread of the <tt>execute</tt> method,
+ * unless the executor has been shut down, in which case the task
+ * is discarded.
+ */
+ public static class CallerRunsPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a <tt>CallerRunsPolicy</tt>.
+ */
+ public CallerRunsPolicy() { }
+
+ /**
+ * Executes task r in the caller's thread, unless the executor
+ * has been shut down, in which case the task is discarded.
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ if (!e.isShutdown()) {
+ r.run();
+ }
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that throws a
+ * <tt>RejectedExecutionException</tt>.
+ */
+ public static class AbortPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates an <tt>AbortPolicy</tt>.
+ */
+ public AbortPolicy() { }
+
+ /**
+ * Always throws RejectedExecutionException.
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ * @throws RejectedExecutionException always.
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that silently discards the
+ * rejected task.
+ */
+ public static class DiscardPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a <tt>DiscardPolicy</tt>.
+ */
+ public DiscardPolicy() { }
+
+ /**
+ * Does nothing, which has the effect of discarding task r.
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that discards the oldest unhandled
+ * request and then retries <tt>execute</tt>, unless the executor
+ * is shut down, in which case the task is discarded.
+ */
+ public static class DiscardOldestPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
+ */
+ public DiscardOldestPolicy() { }
+
+ /**
+ * Obtains and ignores the next task that the executor
+ * would otherwise execute, if one is immediately available,
+ * and then retries execution of task r, unless the executor
+ * is shut down, in which case task r is instead discarded.
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ if (!e.isShutdown()) {
+ e.getQueue().poll();
+ e.execute(r);
+ }
+ }
+ }
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/harmony/ThreadPoolExecutor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/spi/FactoryFinder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/spi/FactoryFinder.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/spi/FactoryFinder.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/spi/FactoryFinder.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.concurrent.spi;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Properties;
+
+class FactoryFinder {
+
+ private static Object newInstance(String factoryClassName) throws Exception {
+ ClassLoader classloader = null;
+ try {
+ classloader = Thread.currentThread().getContextClassLoader();
+ } catch (Exception exception) {
+ throw new Exception(exception.toString(), exception);
+ }
+
+ try {
+ Class factory = null;
+ if (classloader == null) {
+ factory = Class.forName(factoryClassName);
+ } else {
+ try {
+ factory = classloader.loadClass(factoryClassName);
+ } catch (ClassNotFoundException cnfe) {
+ }
+ }
+ if (factory == null) {
+ classloader = FactoryFinder.class.getClassLoader();
+ factory = classloader.loadClass(factoryClassName);
+ }
+ return factory.newInstance();
+ } catch (ClassNotFoundException classnotfoundexception) {
+ throw new Exception(
+ "Provider " + factoryClassName + " not found",
+ classnotfoundexception);
+ } catch (Exception exception) {
+ throw new Exception(
+ "Provider " + factoryClassName + " could not be instantiated: " + exception,
+ exception);
+ }
+ }
+
+ static Object find(String factoryPropertyName,
+ String defaultFactoryClassName) throws Exception {
+ try {
+ String factoryClassName = System.getProperty(factoryPropertyName);
+ if (factoryClassName != null) {
+ return newInstance(factoryClassName);
+ }
+ } catch (SecurityException securityexception) {
+ }
+
+ try {
+ String propertiesFileName = System.getProperty("java.home")
+ + File.separator + "lib"
+ + File.separator + "concurrent.properties";
+ File file = new File(propertiesFileName);
+ if (file.exists()) {
+ FileInputStream fileInput = new FileInputStream(file);
+ Properties properties = new Properties();
+ properties.load(fileInput);
+ fileInput.close();
+ String factoryClassName = properties.getProperty(factoryPropertyName);
+ return newInstance(factoryClassName);
+ }
+ } catch (Exception exception1) {
+ }
+
+ String factoryResource = "META-INF/services/" + factoryPropertyName;
+
+ try {
+ InputStream inputstream = getResource(factoryResource);
+ if (inputstream != null) {
+ BufferedReader bufferedreader =
+ new BufferedReader(new InputStreamReader(inputstream, "UTF-8"));
+ String factoryClassName = bufferedreader.readLine();
+ bufferedreader.close();
+ if ((factoryClassName != null) && !"".equals(factoryClassName)) {
+ try {
+ return newInstance(factoryClassName);
+ } catch (Exception e) {
+ throw new Exception(
+ "Provider for " + factoryPropertyName + " cannot be found");
+ }
+ }
+ }
+ } catch (Exception exception2) {
+ }
+
+ if (defaultFactoryClassName == null) {
+ throw new Exception(
+ "Provider for " + factoryPropertyName + " cannot be found");
+ } else {
+ return newInstance(defaultFactoryClassName);
+ }
+ }
+
+ private static InputStream getResource(String factoryResource) {
+ ClassLoader classloader = null;
+ try {
+ classloader = Thread.currentThread().getContextClassLoader();
+ } catch (SecurityException securityexception) {
+ }
+
+ InputStream inputstream;
+ if (classloader == null) {
+ inputstream =
+ ClassLoader.getSystemResourceAsStream(factoryResource);
+ } else {
+ inputstream = classloader.getResourceAsStream(factoryResource);
+ }
+
+ if (inputstream == null) {
+ inputstream =
+ FactoryFinder.class.getClassLoader().getResourceAsStream(factoryResource);
+ }
+ return inputstream;
+ }
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/spi/FactoryFinder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/spi/ManagedContextBuilder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/spi/ManagedContextBuilder.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/spi/ManagedContextBuilder.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/spi/ManagedContextBuilder.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.concurrent.spi;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedContextHandler;
+
+public abstract class ManagedContextBuilder {
+
+ public static final String MANAGED_CONTEXT_BUILDER_PROPERTY =
+ "org.apache.geronimo.concurrent.ManagedContextBuilder";
+
+ private static final String DEFAULT_MANAGED_CONTEXT_BUILDER =
+ "org.apache.geronimo.concurrent.impl.GeronimoManagedContextBuilder";
+
+ private static ManagedContextBuilder instance = null;
+
+ public static synchronized ManagedContextBuilder getManagedContextBuilder() {
+ if (instance == null) {
+ try {
+ instance =
+ (ManagedContextBuilder)FactoryFinder.find(MANAGED_CONTEXT_BUILDER_PROPERTY,
+ DEFAULT_MANAGED_CONTEXT_BUILDER);
+ } catch (Exception exception) {
+ throw new RuntimeException(exception.getMessage(), exception);
+ }
+ }
+ return instance;
+ }
+
+ protected ManagedContextBuilder() {
+ }
+
+ public abstract ManagedContext buildManagedContext(ManagedContextHandler contextHandler);
+
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/spi/ManagedContextBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/thread/AbstractManagedThreadFactory.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/thread/AbstractManagedThreadFactory.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/thread/AbstractManagedThreadFactory.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/thread/AbstractManagedThreadFactory.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.concurrent.thread;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.util.concurrent.ManagedThreadFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class AbstractManagedThreadFactory
+ implements ManagedThreadFactory, ThreadStoppedListener {
+
+ private final static Log LOG = LogFactory.getLog(AbstractManagedThreadFactory.class);
+
+ private static final AtomicInteger groupNumber = new AtomicInteger(1);
+
+ protected List<ManagedThread> threads =
+ Collections.synchronizedList(new ArrayList<ManagedThread>());
+ protected boolean stopped = false;
+
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private int threadPriority = Thread.NORM_PRIORITY;
+ private boolean daemonThread = false;
+ private long hungTaskThreshold = 0;
+ protected ThreadGroup threadGroup;
+
+ public AbstractManagedThreadFactory() {
+ setThreadGroup(null);
+ }
+
+ public ThreadGroup getThreadGroup() {
+ return this.threadGroup;
+ }
+
+ public void setThreadGroup(String groupName) {
+ this.threadGroup = new ThreadGroup(getTheadGroupName(groupName));
+ }
+
+ private static String getTheadGroupName(String groupName) {
+ if (groupName == null) {
+ return "ManagedThread-g" + groupNumber.getAndIncrement();
+ } else {
+ return groupName;
+ }
+ }
+
+ public int getThreadPriority() {
+ return this.threadPriority;
+ }
+
+ public void setThreadPriority(int threadPriority) {
+ if (threadPriority < Thread.MIN_PRIORITY ||
+ threadPriority > Thread.MAX_PRIORITY) {
+ throw new IllegalArgumentException("Invalid thread priority: " + threadPriority);
+ }
+ this.threadPriority = threadPriority;
+ }
+
+ public boolean isDaemonThread() {
+ return this.daemonThread;
+ }
+
+ public void setDaemonThread(boolean daemonThread) {
+ this.daemonThread = daemonThread;
+ }
+
+ public long getHungTaskThreshold() {
+ return this.hungTaskThreshold;
+ }
+
+ public void setHungTaskThreshold(long hungTaskThreshold) {
+ if (hungTaskThreshold < 0) {
+ throw new IllegalArgumentException("Threshold must be zero or greater");
+ }
+ this.hungTaskThreshold = hungTaskThreshold;
+ }
+
+ public Thread newThread(Runnable runnable) {
+ if (!isRunning()) {
+ throw new IllegalArgumentException();
+ }
+
+ String name = this.threadGroup.getName() + "-t" + this.threadNumber.getAndIncrement();
+ ManagedThread thread = createThread(this.threadGroup, runnable, name);
+ thread.setPriority(this.threadPriority);
+ thread.setDaemon(this.daemonThread);
+ thread.setHungTaskThreshold(this.hungTaskThreshold);
+
+ this.threads.add(thread);
+
+ LOG.debug("Added thread: " + thread);
+
+ return thread;
+ }
+
+ protected ManagedThread createThread(ThreadGroup group, Runnable runnable, String name) {
+ return new ManagedThread(group, runnable, name, this);
+ }
+
+ public void threadStopped(Thread thread) {
+ this.threads.remove(thread);
+
+ LOG.debug("Removed thread: " + thread);
+ }
+
+ protected void updateStatus() {
+ List<ManagedThread> threadList = getThreadList();
+ for (ManagedThread thread : threadList) {
+ thread.updateState();
+ }
+ }
+
+ protected List<ManagedThread> getThreadList() {
+ synchronized(this.threads) {
+ return new ArrayList<ManagedThread>(this.threads);
+ }
+ }
+
+ public boolean isRunning() {
+ return !this.stopped;
+ }
+
+ public void shutdown() {
+ this.stopped = true;
+ synchronized(this.threads) {
+ for (Thread thread : this.threads) {
+ thread.interrupt();
+ }
+ }
+ }
+
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/thread/AbstractManagedThreadFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/thread/AbstractManagedThreadFactory.java
------------------------------------------------------------------------------
svn:executable =