You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@harmony.apache.org by nd...@apache.org on 2006/08/24 05:42:33 UTC
svn commit: r434296 [6/19] - in /incubator/harmony/enhanced/classlib/trunk:
make/ modules/concurrent/ modules/concurrent/.settings/
modules/concurrent/META-INF/ modules/concurrent/make/
modules/concurrent/src/ modules/concurrent/src/main/ modules/concu...
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,1508 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+import java.util.*;
+
+/**
+ * An {@link ExecutorService} that executes each submitted task using
+ * one of possibly several pooled threads, normally configured
+ * using {@link Executors} factory methods.
+ *
+ * <p>Thread pools address two different problems: they usually
+ * provide improved performance when executing large numbers of
+ * asynchronous tasks, due to reduced per-task invocation overhead,
+ * and they provide a means of bounding and managing the resources,
+ * including threads, consumed when executing a collection of tasks.
+ * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
+ * statistics, such as the number of completed tasks.
+ *
+ * <p>To be useful across a wide range of contexts, this class
+ * provides many adjustable parameters and extensibility
+ * hooks. However, programmers are urged to use the more convenient
+ * {@link Executors} factory methods {@link
+ * Executors#newCachedThreadPool} (unbounded thread pool, with
+ * automatic thread reclamation), {@link Executors#newFixedThreadPool}
+ * (fixed size thread pool) and {@link
+ * Executors#newSingleThreadExecutor} (single background thread), that
+ * preconfigure settings for the most common usage
+ * scenarios. Otherwise, use the following guide when manually
+ * configuring and tuning this class:
+ *
+ * <dl>
+ *
+ * <dt>Core and maximum pool sizes</dt>
+ *
+ * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
+ * pool size
+ * (see {@link ThreadPoolExecutor#getPoolSize})
+ * according to the bounds set by corePoolSize
+ * (see {@link ThreadPoolExecutor#getCorePoolSize})
+ * and
+ * maximumPoolSize
+ * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
+ * When a new task is submitted in method {@link
+ * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
+ * are running, a new thread is created to handle the request, even if
+ * other worker threads are idle. If there are more than
+ * corePoolSize but less than maximumPoolSize threads running, a new
+ * thread will be created only if the queue is full. By setting
+ * corePoolSize and maximumPoolSize the same, you create a fixed-size
+ * thread pool. By setting maximumPoolSize to an essentially unbounded
+ * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
+ * accommodate an arbitrary number of concurrent tasks. Most typically,
+ * core and maximum pool sizes are set only upon construction, but they
+ * may also be changed dynamically using {@link
+ * ThreadPoolExecutor#setCorePoolSize} and {@link
+ * ThreadPoolExecutor#setMaximumPoolSize}. <dd>
+ *
+ * <dt> On-demand construction
+ *
+ * <dd> By default, even core threads are initially created and
+ * started only when needed by new tasks, but this can be overridden
+ * dynamically using method {@link
+ * ThreadPoolExecutor#prestartCoreThread} or
+ * {@link ThreadPoolExecutor#prestartAllCoreThreads}. </dd>
+ *
+ * <dt>Creating new threads</dt>
+ *
+ * <dd>New threads are created using a {@link
+ * java.util.concurrent.ThreadFactory}. If not otherwise specified, a
+ * {@link Executors#defaultThreadFactory} is used, that creates threads to all
+ * be in the same {@link ThreadGroup} and with the same
+ * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
+ * a different ThreadFactory, you can alter the thread's name, thread
+ * group, priority, daemon status, etc. </dd>
+ *
+ * <dt>Keep-alive times</dt>
+ *
+ * <dd>If the pool currently has more than corePoolSize threads,
+ * excess threads will be terminated if they have been idle for more
+ * than the keepAliveTime (see {@link
+ * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
+ * reducing resource consumption when the pool is not being actively
+ * used. If the pool becomes more active later, new threads will be
+ * constructed. This parameter can also be changed dynamically
+ * using method {@link ThreadPoolExecutor#setKeepAliveTime}. Using
+ * a value of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS}
+ * effectively disables idle threads from ever terminating prior
+ * to shut down.
+ * </dd>
+ *
+ * <dt>Queuing</dt>
+ *
+ * <dd>Any {@link BlockingQueue} may be used to transfer and hold
+ * submitted tasks. The use of this queue interacts with pool sizing:
+ *
+ * <ul>
+ *
+ * <li> If fewer than corePoolSize threads are running, the Executor
+ * always prefers adding a new thread
+ * rather than queuing.</li>
+ *
+ * <li> If corePoolSize or more threads are running, the Executor
+ * always prefers queuing a request rather than adding a new
+ * thread.</li>
+ *
+ * <li> If a request cannot be queued, a new thread is created unless
+ * this would exceed maximumPoolSize, in which case, the task will be
+ * rejected.</li>
+ *
+ * </ul>
+ *
+ * There are three general strategies for queuing:
+ * <ol>
+ *
+ * <li> <em> Direct handoffs.</em> A good default choice for a work
+ * queue is a {@link SynchronousQueue} that hands off tasks to threads
+ * without otherwise holding them. Here, an attempt to queue a task
+ * will fail if no threads are immediately available to run it, so a
+ * new thread will be constructed. This policy avoids lockups when
+ * handling sets of requests that might have internal dependencies.
+ * Direct handoffs generally require unbounded maximumPoolSizes to
+ * avoid rejection of new submitted tasks. This in turn admits the
+ * possibility of unbounded thread growth when commands continue to
+ * arrive on average faster than they can be processed. </li>
+ *
+ * <li><em> Unbounded queues.</em> Using an unbounded queue (for
+ * example a {@link LinkedBlockingQueue} without a predefined
+ * capacity) will cause new tasks to be queued in cases where all
+ * corePoolSize threads are busy. Thus, no more than corePoolSize
+ * threads will ever be created. (And the value of the maximumPoolSize
+ * therefore doesn't have any effect.) This may be appropriate when
+ * each task is completely independent of others, so tasks cannot
+ * affect each others execution; for example, in a web page server.
+ * While this style of queuing can be useful in smoothing out
+ * transient bursts of requests, it admits the possibility of
+ * unbounded work queue growth when commands continue to arrive on
+ * average faster than they can be processed. </li>
+ *
+ * <li><em>Bounded queues.</em> A bounded queue (for example, an
+ * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
+ * used with finite maximumPoolSizes, but can be more difficult to
+ * tune and control. Queue sizes and maximum pool sizes may be traded
+ * off for each other: Using large queues and small pools minimizes
+ * CPU usage, OS resources, and context-switching overhead, but can
+ * lead to artificially low throughput. If tasks frequently block (for
+ * example if they are I/O bound), a system may be able to schedule
+ * time for more threads than you otherwise allow. Use of small queues
+ * generally requires larger pool sizes, which keeps CPUs busier but
+ * may encounter unacceptable scheduling overhead, which also
+ * decreases throughput. </li>
+ *
+ * </ol>
+ *
+ * </dd>
+ *
+ * <dt>Rejected tasks</dt>
+ *
+ * <dd> New tasks submitted in method {@link
+ * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
+ * Executor has been shut down, and also when the Executor uses finite
+ * bounds for both maximum threads and work queue capacity, and is
+ * saturated. In either case, the <tt>execute</tt> method invokes the
+ * {@link RejectedExecutionHandler#rejectedExecution} method of its
+ * {@link RejectedExecutionHandler}. Four predefined handler policies
+ * are provided:
+ *
+ * <ol>
+ *
+ * <li> In the
+ * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
+ * runtime {@link RejectedExecutionException} upon rejection. </li>
+ *
+ * <li> In {@link
+ * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
+ * <tt>execute</tt> itself runs the task. This provides a simple
+ * feedback control mechanism that will slow down the rate that new
+ * tasks are submitted. </li>
+ *
+ * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
+ * a task that cannot be executed is simply dropped. </li>
+ *
+ * <li>In {@link
+ * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
+ * shut down, the task at the head of the work queue is dropped, and
+ * then execution is retried (which can fail again, causing this to be
+ * repeated.) </li>
+ *
+ * </ol>
+ *
+ * It is possible to define and use other kinds of {@link
+ * RejectedExecutionHandler} classes. Doing so requires some care
+ * especially when policies are designed to work only under particular
+ * capacity or queuing policies. </dd>
+ *
+ * <dt>Hook methods</dt>
+ *
+ * <dd>This class provides <tt>protected</tt> overridable {@link
+ * ThreadPoolExecutor#beforeExecute} and {@link
+ * ThreadPoolExecutor#afterExecute} methods that are called before and
+ * after execution of each task. These can be used to manipulate the
+ * execution environment, for example, reinitializing ThreadLocals,
+ * gathering statistics, or adding log entries. Additionally, method
+ * {@link ThreadPoolExecutor#terminated} can be overridden to perform
+ * any special processing that needs to be done once the Executor has
+ * fully terminated.</dd>
+ *
+ * <dt>Queue maintenance</dt>
+ *
+ * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
+ * the work queue for purposes of monitoring and debugging. Use of
+ * this method for any other purpose is strongly discouraged. Two
+ * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
+ * ThreadPoolExecutor#purge} are available to assist in storage
+ * reclamation when large numbers of queued tasks become
+ * cancelled.</dd> </dl>
+ *
+ * <p> <b>Extension example</b>. Most extensions of this class
+ * override one or more of the protected hook methods. For example,
+ * here is a subclass that adds a simple pause/resume feature:
+ *
+ * <pre>
+ * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
+ * private boolean isPaused;
+ * private ReentrantLock pauseLock = new ReentrantLock();
+ * private Condition unpaused = pauseLock.newCondition();
+ *
+ * public PausableThreadPoolExecutor(...) { super(...); }
+ *
+ * protected void beforeExecute(Thread t, Runnable r) {
+ * super.beforeExecute(t, r);
+ * pauseLock.lock();
+ * try {
+ * while (isPaused) unpaused.await();
+ * } catch(InterruptedException ie) {
+ * t.interrupt();
+ * } finally {
+ * pauseLock.unlock();
+ * }
+ * }
+ *
+ * public void pause() {
+ * pauseLock.lock();
+ * try {
+ * isPaused = true;
+ * } finally {
+ * pauseLock.unlock();
+ * }
+ * }
+ *
+ * public void resume() {
+ * pauseLock.lock();
+ * try {
+ * isPaused = false;
+ * unpaused.signalAll();
+ * } finally {
+ * pauseLock.unlock();
+ * }
+ * }
+ * }
+ * </pre>
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class ThreadPoolExecutor extends AbstractExecutorService {
+ /**
+ * Only used to force toArray() to produce a Runnable[].
+ */
+ private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];
+
+ /**
+ * Permission for checking shutdown
+ */
+ private static final RuntimePermission shutdownPerm =
+ new RuntimePermission("modifyThread");
+
+ /**
+ * Queue used for holding tasks and handing off to worker threads.
+ */
+ private final BlockingQueue<Runnable> workQueue;
+
+ /**
+ * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
+ * workers set.
+ */
+ private final ReentrantLock mainLock = new ReentrantLock();
+
+ /**
+ * Wait condition to support awaitTermination
+ */
+ private final Condition termination = mainLock.newCondition();
+
+ /**
+ * Set containing all worker threads in pool.
+ */
+ private final HashSet<Worker> workers = new HashSet<Worker>();
+
+ /**
+ * Timeout in nanoseconds for idle threads waiting for work.
+ * Threads use this timeout only when there are more than
+ * corePoolSize present. Otherwise they wait forever for new work.
+ */
+ private volatile long keepAliveTime;
+
+ /**
+ * Core pool size, updated only while holding mainLock,
+ * but volatile to allow concurrent readability even
+ * during updates.
+ */
+ private volatile int corePoolSize;
+
+ /**
+ * Maximum pool size, updated only while holding mainLock
+ * but volatile to allow concurrent readability even
+ * during updates.
+ */
+ private volatile int maximumPoolSize;
+
+ /**
+ * Current pool size, updated only while holding mainLock
+ * but volatile to allow concurrent readability even
+ * during updates.
+ */
+ private volatile int poolSize;
+
+ /**
+ * Lifecycle state
+ */
+ volatile int runState;
+
+ // Special values for runState
+ /** Normal, not-shutdown mode */
+ static final int RUNNING = 0;
+ /** Controlled shutdown mode */
+ static final int SHUTDOWN = 1;
+ /** Immediate shutdown mode */
+ static final int STOP = 2;
+ /** Final state */
+ static final int TERMINATED = 3;
+
+ /**
+ * Handler called when saturated or shutdown in execute.
+ */
+ private volatile RejectedExecutionHandler handler;
+
+ /**
+ * Factory for new threads.
+ */
+ private volatile ThreadFactory threadFactory;
+
+ /**
+ * Tracks largest attained pool size.
+ */
+ private int largestPoolSize;
+
+ /**
+ * Counter for completed tasks. Updated only on termination of
+ * worker threads.
+ */
+ private long completedTaskCount;
+
+ /**
+ * The default rejected execution handler
+ */
+ private static final RejectedExecutionHandler defaultHandler =
+ new AbortPolicy();
+
+ /**
+ * Invoke the rejected execution handler for the given command.
+ */
+ void reject(Runnable command) {
+ handler.rejectedExecution(command, this);
+ }
+
+ /**
+ * Create and return a new thread running firstTask as its first
+ * task. Call only while holding mainLock
+ * @param firstTask the task the new thread should run first (or
+ * null if none)
+ * @return the new thread
+ */
+ private Thread addThread(Runnable firstTask) {
+ Worker w = new Worker(firstTask);
+ Thread t = threadFactory.newThread(w);
+ w.thread = t;
+ workers.add(w);
+ int nt = ++poolSize;
+ if (nt > largestPoolSize)
+ largestPoolSize = nt;
+ return t;
+ }
+
+ /**
+ * Create and start a new thread running firstTask as its first
+ * task, only if fewer than corePoolSize threads are running.
+ * @param firstTask the task the new thread should run first (or
+ * null if none)
+ * @return true if successful.
+ */
+ private boolean addIfUnderCorePoolSize(Runnable firstTask) {
+ Thread t = null;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (poolSize < corePoolSize)
+ t = addThread(firstTask);
+ } finally {
+ mainLock.unlock();
+ }
+ if (t == null)
+ return false;
+ t.start();
+ return true;
+ }
+
+ /**
+ * Create and start a new thread only if fewer than maximumPoolSize
+ * threads are running. The new thread runs as its first task the
+ * next task in queue, or if there is none, the given task.
+ * @param firstTask the task the new thread should run first (or
+ * null if none)
+ * @return null on failure, else the first task to be run by new thread.
+ */
+ private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
+ Thread t = null;
+ Runnable next = null;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (poolSize < maximumPoolSize) {
+ next = workQueue.poll();
+ if (next == null)
+ next = firstTask;
+ t = addThread(next);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ if (t == null)
+ return null;
+ t.start();
+ return next;
+ }
+
+
+ /**
+ * Get the next task for a worker thread to run.
+ * @return the task
+ * @throws InterruptedException if interrupted while waiting for task
+ */
+ Runnable getTask() throws InterruptedException {
+ for (;;) {
+ switch(runState) {
+ case RUNNING: {
+ if (poolSize <= corePoolSize) // untimed wait if core
+ return workQueue.take();
+
+ long timeout = keepAliveTime;
+ if (timeout <= 0) // die immediately for 0 timeout
+ return null;
+ Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
+ if (r != null)
+ return r;
+ if (poolSize > corePoolSize) // timed out
+ return null;
+ // else, after timeout, pool shrank so shouldn't die, so retry
+ break;
+ }
+
+ case SHUTDOWN: {
+ // Help drain queue
+ Runnable r = workQueue.poll();
+ if (r != null)
+ return r;
+
+ // Check if can terminate
+ if (workQueue.isEmpty()) {
+ interruptIdleWorkers();
+ return null;
+ }
+
+ // There could still be delayed tasks in queue.
+ // Wait for one, re-checking state upon interruption
+ try {
+ return workQueue.take();
+ } catch(InterruptedException ignore) {}
+ break;
+ }
+
+ case STOP:
+ return null;
+ default:
+ assert false;
+ }
+ }
+ }
+
+ /**
+ * Wake up all threads that might be waiting for tasks.
+ */
+ void interruptIdleWorkers() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ for (Worker w : workers)
+ w.interruptIfIdle();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Perform bookkeeping for a terminated worker thread.
+ * @param w the worker
+ */
+ void workerDone(Worker w) {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ completedTaskCount += w.completedTasks;
+ workers.remove(w);
+ if (--poolSize > 0)
+ return;
+
+ // Else, this is the last thread. Deal with potential shutdown.
+
+ int state = runState;
+ assert state != TERMINATED;
+
+ if (state != STOP) {
+ // If there are queued tasks but no threads, create
+ // replacement.
+ Runnable r = workQueue.poll();
+ if (r != null) {
+ addThread(r).start();
+ return;
+ }
+
+ // If there are some (presumably delayed) tasks but
+ // none pollable, create an idle replacement to wait.
+ if (!workQueue.isEmpty()) {
+ addThread(null).start();
+ return;
+ }
+
+ // Otherwise, we can exit without replacement
+ if (state == RUNNING)
+ return;
+ }
+
+ // Either state is STOP, or state is SHUTDOWN and there is
+ // no work to do. So we can terminate.
+ termination.signalAll();
+ runState = TERMINATED;
+ // fall through to call terminate() outside of lock.
+ } finally {
+ mainLock.unlock();
+ }
+
+ assert runState == TERMINATED;
+ terminated();
+ }
+
+ /**
+ * Worker threads
+ */
+ private class Worker implements Runnable {
+
+ /**
+ * The runLock is acquired and released surrounding each task
+ * execution. It mainly protects against interrupts that are
+ * intended to cancel the worker thread from instead
+ * interrupting the task being run.
+ */
+ private final ReentrantLock runLock = new ReentrantLock();
+
+ /**
+ * Initial task to run before entering run loop
+ */
+ private Runnable firstTask;
+
+ /**
+ * Per thread completed task counter; accumulated
+ * into completedTaskCount upon termination.
+ */
+ volatile long completedTasks;
+
+ /**
+ * Thread this worker is running in. Acts as a final field,
+ * but cannot be set until thread is created.
+ */
+ Thread thread;
+
+ Worker(Runnable firstTask) {
+ this.firstTask = firstTask;
+ }
+
+ boolean isActive() {
+ return runLock.isLocked();
+ }
+
+ /**
+ * Interrupt thread if not running a task
+ */
+ void interruptIfIdle() {
+ final ReentrantLock runLock = this.runLock;
+ if (runLock.tryLock()) {
+ try {
+ thread.interrupt();
+ } finally {
+ runLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Cause thread to die even if running a task.
+ */
+ void interruptNow() {
+ thread.interrupt();
+ }
+
+ /**
+ * Run a single task between before/after methods.
+ */
+ private void runTask(Runnable task) {
+ final ReentrantLock runLock = this.runLock;
+ runLock.lock();
+ try {
+ // Abort now if immediate cancel. Otherwise, we have
+ // committed to run this task.
+ if (runState == STOP)
+ return;
+
+ Thread.interrupted(); // clear interrupt status on entry
+ boolean ran = false;
+ beforeExecute(thread, task);
+ try {
+ task.run();
+ ran = true;
+ afterExecute(task, null);
+ ++completedTasks;
+ } catch(RuntimeException ex) {
+ if (!ran)
+ afterExecute(task, ex);
+ // Else the exception occurred within
+ // afterExecute itself in which case we don't
+ // want to call it again.
+ throw ex;
+ }
+ } finally {
+ runLock.unlock();
+ }
+ }
+
+ /**
+ * Main run loop
+ */
+ public void run() {
+ try {
+ Runnable task = firstTask;
+ firstTask = null;
+ while (task != null || (task = getTask()) != null) {
+ runTask(task);
+ task = null; // unnecessary but can help GC
+ }
+ } catch(InterruptedException ie) {
+ // fall through
+ } finally {
+ workerDone(this);
+ }
+ }
+ }
+
+ // Public methods
+
+ /**
+ * Creates a new <tt>ThreadPoolExecutor</tt> with the given
+ * initial parameters and default thread factory and handler. It
+ * may be more convenient to use one of the {@link Executors}
+ * factory methods instead of this general purpose constructor.
+ *
+ * @param corePoolSize the number of threads to keep in the
+ * pool, even if they are idle.
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool.
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the keepAliveTime
+ * argument.
+ * @param workQueue the queue to use for holding tasks before they
+ * are executed. This queue will hold only the <tt>Runnable</tt>
+ * tasks submitted by the <tt>execute</tt> method.
+ * @throws IllegalArgumentException if corePoolSize, or
+ * keepAliveTime less than zero, or if maximumPoolSize less than or
+ * equal to zero, or if corePoolSize greater than maximumPoolSize.
+ * @throws NullPointerException if <tt>workQueue</tt> is null
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ Executors.defaultThreadFactory(), defaultHandler);
+ }
+
+ /**
+ * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
+ * parameters.
+ *
+ * @param corePoolSize the number of threads to keep in the
+ * pool, even if they are idle.
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool.
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the keepAliveTime
+ * argument.
+ * @param workQueue the queue to use for holding tasks before they
+ * are executed. This queue will hold only the <tt>Runnable</tt>
+ * tasks submitted by the <tt>execute</tt> method.
+ * @param threadFactory the factory to use when the executor
+ * creates a new thread.
+ * @throws IllegalArgumentException if corePoolSize, or
+ * keepAliveTime less than zero, or if maximumPoolSize less than or
+ * equal to zero, or if corePoolSize greater than maximumPoolSize.
+ * @throws NullPointerException if <tt>workQueue</tt>
+ * or <tt>threadFactory</tt> are null.
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ threadFactory, defaultHandler);
+ }
+
+ /**
+ * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
+ * parameters.
+ *
+ * @param corePoolSize the number of threads to keep in the
+ * pool, even if they are idle.
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool.
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the keepAliveTime
+ * argument.
+ * @param workQueue the queue to use for holding tasks before they
+ * are executed. This queue will hold only the <tt>Runnable</tt>
+ * tasks submitted by the <tt>execute</tt> method.
+ * @param handler the handler to use when execution is blocked
+ * because the thread bounds and queue capacities are reached.
+ * @throws IllegalArgumentException if corePoolSize, or
+ * keepAliveTime less than zero, or if maximumPoolSize less than or
+ * equal to zero, or if corePoolSize greater than maximumPoolSize.
+ * @throws NullPointerException if <tt>workQueue</tt>
+ * or <tt>handler</tt> are null.
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ RejectedExecutionHandler handler) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ Executors.defaultThreadFactory(), handler);
+ }
+
+ /**
+ * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
+ * parameters.
+ *
+ * @param corePoolSize the number of threads to keep in the
+ * pool, even if they are idle.
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool.
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the keepAliveTime
+ * argument.
+ * @param workQueue the queue to use for holding tasks before they
+ * are executed. This queue will hold only the <tt>Runnable</tt>
+ * tasks submitted by the <tt>execute</tt> method.
+ * @param threadFactory the factory to use when the executor
+ * creates a new thread.
+ * @param handler the handler to use when execution is blocked
+ * because the thread bounds and queue capacities are reached.
+ * @throws IllegalArgumentException if corePoolSize, or
+ * keepAliveTime less than zero, or if maximumPoolSize less than or
+ * equal to zero, or if corePoolSize greater than maximumPoolSize.
+ * @throws NullPointerException if <tt>workQueue</tt>
+ * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
+ */
+ public ThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ if (corePoolSize < 0 ||
+ maximumPoolSize <= 0 ||
+ maximumPoolSize < corePoolSize ||
+ keepAliveTime < 0)
+ throw new IllegalArgumentException();
+ if (workQueue == null || threadFactory == null || handler == null)
+ throw new NullPointerException();
+ this.corePoolSize = corePoolSize;
+ this.maximumPoolSize = maximumPoolSize;
+ this.workQueue = workQueue;
+ this.keepAliveTime = unit.toNanos(keepAliveTime);
+ this.threadFactory = threadFactory;
+ this.handler = handler;
+ }
+
+
+ /**
+ * Executes the given task sometime in the future. The task
+ * may execute in a new thread or in an existing pooled thread.
+ *
+ * If the task cannot be submitted for execution, either because this
+ * executor has been shutdown or because its capacity has been reached,
+ * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
+ *
+ * @param command the task to execute
+ * @throws RejectedExecutionException at discretion of
+ * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
+ * for execution
+ * @throws NullPointerException if command is null
+ */
+ public void execute(Runnable command) {
+ if (command == null)
+ throw new NullPointerException();
+ for (;;) {
+ if (runState != RUNNING) {
+ reject(command);
+ return;
+ }
+ if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
+ return;
+ if (workQueue.offer(command))
+ return;
+ Runnable r = addIfUnderMaximumPoolSize(command);
+ if (r == command)
+ return;
+ if (r == null) {
+ reject(command);
+ return;
+ }
+ // else retry
+ }
+ }
+
+ /**
+ * Initiates an orderly shutdown in which previously submitted
+ * tasks are executed, but no new tasks will be
+ * accepted. Invocation has no additional effect if already shut
+ * down.
+ * @throws SecurityException if a security manager exists and
+ * shutting down this ExecutorService may manipulate threads that
+ * the caller is not permitted to modify because it does not hold
+ * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+ * or the security manager's <tt>checkAccess</tt> method denies access.
+ */
+ public void shutdown() {
+ // Fail if caller doesn't have modifyThread permission
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ java.security.AccessController.checkPermission(shutdownPerm);
+
+ boolean fullyTerminated = false;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (workers.size() > 0) {
+ // Check if caller can modify worker threads. This
+ // might not be true even if passed above check, if
+ // the SecurityManager treats some threads specially.
+ if (security != null) {
+ for (Worker w: workers)
+ security.checkAccess(w.thread);
+ }
+
+ int state = runState;
+ if (state == RUNNING) // don't override shutdownNow
+ runState = SHUTDOWN;
+
+ try {
+ for (Worker w: workers)
+ w.interruptIfIdle();
+ } catch(SecurityException se) {
+ // If SecurityManager allows above checks, but
+ // then unexpectedly throws exception when
+ // interrupting threads (which it ought not do),
+ // back out as cleanly as we can. Some threads may
+ // have been killed but we remain in non-shutdown
+ // state.
+ runState = state;
+ throw se;
+ }
+ }
+ else { // If no workers, trigger full termination now
+ fullyTerminated = true;
+ runState = TERMINATED;
+ termination.signalAll();
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ if (fullyTerminated)
+ terminated();
+ }
+
+
+ /**
+ * Attempts to stop all actively executing tasks, halts the
+ * processing of waiting tasks, and returns a list of the tasks that were
+ * awaiting execution.
+ *
+ * <p>This implementation cancels tasks via {@link
+ * Thread#interrupt}, so if any tasks mask or fail to respond to
+ * interrupts, they may never terminate.
+ *
+ * @return list of tasks that never commenced execution
+ * @throws SecurityException if a security manager exists and
+ * shutting down this ExecutorService may manipulate threads that
+ * the caller is not permitted to modify because it does not hold
+ * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+ * or the security manager's <tt>checkAccess</tt> method denies access.
+ */
+ public List<Runnable> shutdownNow() {
+ // Almost the same code as shutdown()
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ java.security.AccessController.checkPermission(shutdownPerm);
+
+ boolean fullyTerminated = false;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ if (workers.size() > 0) {
+ if (security != null) {
+ for (Worker w: workers)
+ security.checkAccess(w.thread);
+ }
+
+ int state = runState;
+ if (state != TERMINATED)
+ runState = STOP;
+ try {
+ for (Worker w : workers)
+ w.interruptNow();
+ } catch(SecurityException se) {
+ runState = state; // back out;
+ throw se;
+ }
+ }
+ else { // If no workers, trigger full termination now
+ fullyTerminated = true;
+ runState = TERMINATED;
+ termination.signalAll();
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ if (fullyTerminated)
+ terminated();
+ return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
+ }
+
+ public boolean isShutdown() {
+ return runState != RUNNING;
+ }
+
+ /**
+ * Returns true if this executor is in the process of terminating
+ * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
+ * completely terminated. This method may be useful for
+ * debugging. A return of <tt>true</tt> reported a sufficient
+ * period after shutdown may indicate that submitted tasks have
+ * ignored or suppressed interruption, causing this executor not
+ * to properly terminate.
+ * @return true if terminating but not yet terminated.
+ */
+ public boolean isTerminating() {
+ return runState == STOP;
+ }
+
+ public boolean isTerminated() {
+ return runState == TERMINATED;
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ for (;;) {
+ if (runState == TERMINATED)
+ return true;
+ if (nanos <= 0)
+ return false;
+ nanos = termination.awaitNanos(nanos);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Invokes <tt>shutdown</tt> when this executor is no longer
+ * referenced.
+ */
+ protected void finalize() {
+ shutdown();
+ }
+
+ /**
+ * Sets the thread factory used to create new threads.
+ *
+ * @param threadFactory the new thread factory
+ * @throws NullPointerException if threadFactory is null
+ * @see #getThreadFactory
+ */
+ public void setThreadFactory(ThreadFactory threadFactory) {
+ if (threadFactory == null)
+ throw new NullPointerException();
+ this.threadFactory = threadFactory;
+ }
+
+ /**
+ * Returns the thread factory used to create new threads.
+ *
+ * @return the current thread factory
+ * @see #setThreadFactory
+ */
+ public ThreadFactory getThreadFactory() {
+ return threadFactory;
+ }
+
+ /**
+ * Sets a new handler for unexecutable tasks.
+ *
+ * @param handler the new handler
+ * @throws NullPointerException if handler is null
+ * @see #getRejectedExecutionHandler
+ */
+ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+ if (handler == null)
+ throw new NullPointerException();
+ this.handler = handler;
+ }
+
+ /**
+ * Returns the current handler for unexecutable tasks.
+ *
+ * @return the current handler
+ * @see #setRejectedExecutionHandler
+ */
+ public RejectedExecutionHandler getRejectedExecutionHandler() {
+ return handler;
+ }
+
+ /**
+ * Returns the task queue used by this executor. Access to the
+ * task queue is intended primarily for debugging and monitoring.
+ * This queue may be in active use. Retrieving the task queue
+ * does not prevent queued tasks from executing.
+ *
+ * @return the task queue
+ */
+ public BlockingQueue<Runnable> getQueue() {
+ return workQueue;
+ }
+
+ /**
+ * Removes this task from the executor's internal queue if it is
+ * present, thus causing it not to be run if it has not already
+ * started.
+ *
+ * <p> This method may be useful as one part of a cancellation
+ * scheme. It may fail to remove tasks that have been converted
+ * into other forms before being placed on the internal queue. For
+ * example, a task entered using <tt>submit</tt> might be
+ * converted into a form that maintains <tt>Future</tt> status.
+ * However, in such cases, method {@link ThreadPoolExecutor#purge}
+ * may be used to remove those Futures that have been cancelled.
+ *
+ *
+ * @param task the task to remove
+ * @return true if the task was removed
+ */
+ public boolean remove(Runnable task) {
+ return getQueue().remove(task);
+ }
+
+
+ /**
+ * Tries to remove from the work queue all {@link Future}
+ * tasks that have been cancelled. This method can be useful as a
+ * storage reclamation operation, that has no other impact on
+ * functionality. Cancelled tasks are never executed, but may
+ * accumulate in work queues until worker threads can actively
+ * remove them. Invoking this method instead tries to remove them now.
+ * However, this method may fail to remove tasks in
+ * the presence of interference by other threads.
+ */
+ public void purge() {
+ // Fail if we encounter interference during traversal
+ try {
+ Iterator<Runnable> it = getQueue().iterator();
+ while (it.hasNext()) {
+ Runnable r = it.next();
+ if (r instanceof Future<?>) {
+ Future<?> c = (Future<?>)r;
+ if (c.isCancelled())
+ it.remove();
+ }
+ }
+ }
+ catch(ConcurrentModificationException ex) {
+ return;
+ }
+ }
+
+ /**
+ * Sets the core number of threads. This overrides any value set
+ * in the constructor. If the new value is smaller than the
+ * current value, excess existing threads will be terminated when
+ * they next become idle. If larger, new threads will, if needed,
+ * be started to execute any queued tasks.
+ *
+ * @param corePoolSize the new core size
+ * @throws IllegalArgumentException if <tt>corePoolSize</tt>
+ * less than zero
+ * @see #getCorePoolSize
+ */
+ public void setCorePoolSize(int corePoolSize) {
+ if (corePoolSize < 0)
+ throw new IllegalArgumentException();
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ int extra = this.corePoolSize - corePoolSize;
+ this.corePoolSize = corePoolSize;
+ if (extra < 0) {
+ Runnable r;
+ while (extra++ < 0 && poolSize < corePoolSize &&
+ (r = workQueue.poll()) != null)
+ addThread(r).start();
+ }
+ else if (extra > 0 && poolSize > corePoolSize) {
+ Iterator<Worker> it = workers.iterator();
+ while (it.hasNext() &&
+ extra-- > 0 &&
+ poolSize > corePoolSize &&
+ workQueue.remainingCapacity() == 0)
+ it.next().interruptIfIdle();
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the core number of threads.
+ *
+ * @return the core number of threads
+ * @see #setCorePoolSize
+ */
+ public int getCorePoolSize() {
+ return corePoolSize;
+ }
+
+ /**
+ * Starts a core thread, causing it to idly wait for work. This
+ * overrides the default policy of starting core threads only when
+ * new tasks are executed. This method will return <tt>false</tt>
+ * if all core threads have already been started.
+ * @return true if a thread was started
+ */
+ public boolean prestartCoreThread() {
+ return addIfUnderCorePoolSize(null);
+ }
+
+ /**
+ * Starts all core threads, causing them to idly wait for work. This
+ * overrides the default policy of starting core threads only when
+ * new tasks are executed.
+ * @return the number of threads started.
+ */
+ public int prestartAllCoreThreads() {
+ int n = 0;
+ while (addIfUnderCorePoolSize(null))
+ ++n;
+ return n;
+ }
+
+ /**
+ * Sets the maximum allowed number of threads. This overrides any
+ * value set in the constructor. If the new value is smaller than
+ * the current value, excess existing threads will be
+ * terminated when they next become idle.
+ *
+ * @param maximumPoolSize the new maximum
+ * @throws IllegalArgumentException if maximumPoolSize less than zero or
+ * the {@link #getCorePoolSize core pool size}
+ * @see #getMaximumPoolSize
+ */
+ public void setMaximumPoolSize(int maximumPoolSize) {
+ if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
+ throw new IllegalArgumentException();
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ int extra = this.maximumPoolSize - maximumPoolSize;
+ this.maximumPoolSize = maximumPoolSize;
+ if (extra > 0 && poolSize > maximumPoolSize) {
+ Iterator<Worker> it = workers.iterator();
+ while (it.hasNext() &&
+ extra > 0 &&
+ poolSize > maximumPoolSize) {
+ it.next().interruptIfIdle();
+ --extra;
+ }
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the maximum allowed number of threads.
+ *
+ * @return the maximum allowed number of threads
+ * @see #setMaximumPoolSize
+ */
+ public int getMaximumPoolSize() {
+ return maximumPoolSize;
+ }
+
+ /**
+ * Sets the time limit for which threads may remain idle before
+ * being terminated. If there are more than the core number of
+ * threads currently in the pool, after waiting this amount of
+ * time without processing a task, excess threads will be
+ * terminated. This overrides any value set in the constructor.
+ * @param time the time to wait. A time value of zero will cause
+ * excess threads to terminate immediately after executing tasks.
+ * @param unit the time unit of the time argument
+ * @throws IllegalArgumentException if time less than zero
+ * @see #getKeepAliveTime
+ */
+ public void setKeepAliveTime(long time, TimeUnit unit) {
+ if (time < 0)
+ throw new IllegalArgumentException();
+ this.keepAliveTime = unit.toNanos(time);
+ }
+
+ /**
+ * Returns the thread keep-alive time, which is the amount of time
+ * which threads in excess of the core pool size may remain
+ * idle before being terminated.
+ *
+ * @param unit the desired time unit of the result
+ * @return the time limit
+ * @see #setKeepAliveTime
+ */
+ public long getKeepAliveTime(TimeUnit unit) {
+ return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
+ }
+
+ /* Statistics */
+
+ /**
+ * Returns the current number of threads in the pool.
+ *
+ * @return the number of threads
+ */
+ public int getPoolSize() {
+ return poolSize;
+ }
+
+ /**
+ * Returns the approximate number of threads that are actively
+ * executing tasks.
+ *
+ * @return the number of threads
+ */
+ public int getActiveCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ int n = 0;
+ for (Worker w : workers) {
+ if (w.isActive())
+ ++n;
+ }
+ return n;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the largest number of threads that have ever
+ * simultaneously been in the pool.
+ *
+ * @return the number of threads
+ */
+ public int getLargestPoolSize() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ return largestPoolSize;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the approximate total number of tasks that have been
+ * scheduled for execution. Because the states of tasks and
+ * threads may change dynamically during computation, the returned
+ * value is only an approximation, but one that does not ever
+ * decrease across successive calls.
+ *
+ * @return the number of tasks
+ */
+ public long getTaskCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ long n = completedTaskCount;
+ for (Worker w : workers) {
+ n += w.completedTasks;
+ if (w.isActive())
+ ++n;
+ }
+ return n + workQueue.size();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the approximate total number of tasks that have
+ * completed execution. Because the states of tasks and threads
+ * may change dynamically during computation, the returned value
+ * is only an approximation, but one that does not ever decrease
+ * across successive calls.
+ *
+ * @return the number of tasks
+ */
+ public long getCompletedTaskCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ long n = completedTaskCount;
+ for (Worker w : workers)
+ n += w.completedTasks;
+ return n;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Method invoked prior to executing the given Runnable in the
+ * given thread. This method is invoked by thread <tt>t</tt> that
+ * will execute task <tt>r</tt>, and may be used to re-initialize
+ * ThreadLocals, or to perform logging. Note: To properly nest
+ * multiple overridings, subclasses should generally invoke
+ * <tt>super.beforeExecute</tt> at the end of this method.
+ *
+ * @param t the thread that will run task r.
+ * @param r the task that will be executed.
+ */
+ protected void beforeExecute(Thread t, Runnable r) { }
+
+ /**
+ * Method invoked upon completion of execution of the given
+ * Runnable. This method is invoked by the thread that executed
+ * the task. If non-null, the Throwable is the uncaught exception
+ * that caused execution to terminate abruptly. Note: To properly
+ * nest multiple overridings, subclasses should generally invoke
+ * <tt>super.afterExecute</tt> at the beginning of this method.
+ *
+ * @param r the runnable that has completed.
+ * @param t the exception that caused termination, or null if
+ * execution completed normally.
+ */
+ protected void afterExecute(Runnable r, Throwable t) { }
+
+ /**
+ * Method invoked when the Executor has terminated. Default
+ * implementation does nothing. Note: To properly nest multiple
+ * overridings, subclasses should generally invoke
+ * <tt>super.terminated</tt> within this method.
+ */
+ protected void terminated() { }
+
+ /**
+ * A handler for rejected tasks that runs the rejected task
+ * directly in the calling thread of the <tt>execute</tt> method,
+ * unless the executor has been shut down, in which case the task
+ * is discarded.
+ */
+ public static class CallerRunsPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a <tt>CallerRunsPolicy</tt>.
+ */
+ public CallerRunsPolicy() { }
+
+ /**
+ * Executes task r in the caller's thread, unless the executor
+ * has been shut down, in which case the task is discarded.
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ if (!e.isShutdown()) {
+ r.run();
+ }
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that throws a
+ * <tt>RejectedExecutionException</tt>.
+ */
+ public static class AbortPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates an <tt>AbortPolicy</tt>.
+ */
+ public AbortPolicy() { }
+
+ /**
+ * Always throws RejectedExecutionException.
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ * @throws RejectedExecutionException always.
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that silently discards the
+ * rejected task.
+ */
+ public static class DiscardPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a <tt>DiscardPolicy</tt>.
+ */
+ public DiscardPolicy() { }
+
+ /**
+ * Does nothing, which has the effect of discarding task r.
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that discards the oldest unhandled
+ * request and then retries <tt>execute</tt>, unless the executor
+ * is shut down, in which case the task is discarded.
+ */
+ public static class DiscardOldestPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
+ */
+ public DiscardOldestPolicy() { }
+
+ /**
+ * Obtains and ignores the next task that the executor
+ * would otherwise execute, if one is immediately available,
+ * and then retries execution of task r, unless the executor
+ * is shut down, in which case task r is instead discarded.
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ if (!e.isShutdown()) {
+ e.getQueue().poll();
+ e.execute(r);
+ }
+ }
+ }
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/TimeUnit.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/TimeUnit.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/TimeUnit.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/TimeUnit.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,231 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * A <tt>TimeUnit</tt> represents time durations at a given unit of
+ * granularity and provides utility methods to convert across units,
+ * and to perform timing and delay operations in these units. A
+ * <tt>TimeUnit</tt> does not maintain time information, but only
+ * helps organize and use time representations that may be maintained
+ * separately across various contexts.
+ *
+ * <p>A <tt>TimeUnit</tt> is mainly used to inform time-based methods
+ * how a given timing parameter should be interpreted. For example,
+ * the following code will timeout in 50 milliseconds if the {@link
+ * java.util.concurrent.locks.Lock lock} is not available:
+ *
+ * <pre> Lock lock = ...;
+ * if ( lock.tryLock(50L, TimeUnit.MILLISECONDS) ) ...
+ * </pre>
+ * while this code will timeout in 50 seconds:
+ * <pre>
+ * Lock lock = ...;
+ * if ( lock.tryLock(50L, TimeUnit.SECONDS) ) ...
+ * </pre>
+ *
+ * Note however, that there is no guarantee that a particular timeout
+ * implementation will be able to notice the passage of time at the
+ * same granularity as the given <tt>TimeUnit</tt>.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public enum TimeUnit {
+ NANOSECONDS(0), MICROSECONDS(1), MILLISECONDS(2), SECONDS(3);
+
+ /** the index of this unit */
+ private final int index;
+
+ /** Internal constructor */
+ TimeUnit(int index) {
+ this.index = index;
+ }
+
+ /** Lookup table for conversion factors */
+ private static final int[] multipliers = {
+ 1,
+ 1000,
+ 1000 * 1000,
+ 1000 * 1000 * 1000
+ };
+
+ /**
+ * Lookup table to check saturation. Note that because we are
+ * dividing these down, we don't have to deal with asymmetry of
+ * MIN/MAX values.
+ */
+ private static final long[] overflows = {
+ 0, // unused
+ Long.MAX_VALUE / 1000,
+ Long.MAX_VALUE / (1000 * 1000),
+ Long.MAX_VALUE / (1000 * 1000 * 1000)
+ };
+
+ /**
+ * Perform conversion based on given delta representing the
+ * difference between units
+ * @param delta the difference in index values of source and target units
+ * @param duration the duration
+ * @return converted duration or saturated value
+ */
+ private static long doConvert(int delta, long duration) {
+ if (delta == 0)
+ return duration;
+ if (delta < 0)
+ return duration / multipliers[-delta];
+ if (duration > overflows[delta])
+ return Long.MAX_VALUE;
+ if (duration < -overflows[delta])
+ return Long.MIN_VALUE;
+ return duration * multipliers[delta];
+ }
+
+ /**
+ * Convert the given time duration in the given unit to this
+ * unit. Conversions from finer to coarser granularities
+ * truncate, so lose precision. For example converting
+ * <tt>999</tt> milliseconds to seconds results in
+ * <tt>0</tt>. Conversions from coarser to finer granularities
+ * with arguments that would numerically overflow saturate to
+ * <tt>Long.MIN_VALUE</tt> if negative or <tt>Long.MAX_VALUE</tt>
+ * if positive.
+ *
+ * @param duration the time duration in the given <tt>unit</tt>
+ * @param unit the unit of the <tt>duration</tt> argument
+ * @return the converted duration in this unit,
+ * or <tt>Long.MIN_VALUE</tt> if conversion would negatively
+ * overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
+ */
+ public long convert(long duration, TimeUnit unit) {
+ return doConvert(unit.index - index, duration);
+ }
+
+ /**
+ * Equivalent to <tt>NANOSECONDS.convert(duration, this)</tt>.
+ * @param duration the duration
+ * @return the converted duration,
+ * or <tt>Long.MIN_VALUE</tt> if conversion would negatively
+ * overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
+ * @see #convert
+ */
+ public long toNanos(long duration) {
+ return doConvert(index, duration);
+ }
+
+ /**
+ * Equivalent to <tt>MICROSECONDS.convert(duration, this)</tt>.
+ * @param duration the duration
+ * @return the converted duration,
+ * or <tt>Long.MIN_VALUE</tt> if conversion would negatively
+ * overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
+ * @see #convert
+ */
+ public long toMicros(long duration) {
+ return doConvert(index - MICROSECONDS.index, duration);
+ }
+
+ /**
+ * Equivalent to <tt>MILLISECONDS.convert(duration, this)</tt>.
+ * @param duration the duration
+ * @return the converted duration,
+ * or <tt>Long.MIN_VALUE</tt> if conversion would negatively
+ * overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
+ * @see #convert
+ */
+ public long toMillis(long duration) {
+ return doConvert(index - MILLISECONDS.index, duration);
+ }
+
+ /**
+ * Equivalent to <tt>SECONDS.convert(duration, this)</tt>.
+ * @param duration the duration
+ * @return the converted duration.
+ * @see #convert
+ */
+ public long toSeconds(long duration) {
+ return doConvert(index - SECONDS.index, duration);
+ }
+
+
+ /**
+ * Utility method to compute the excess-nanosecond argument to
+ * wait, sleep, join.
+ */
+ private int excessNanos(long time, long ms) {
+ if (this == NANOSECONDS)
+ return (int) (time - (ms * 1000 * 1000));
+ if (this == MICROSECONDS)
+ return (int) ((time * 1000) - (ms * 1000 * 1000));
+ return 0;
+ }
+
+ /**
+ * Perform a timed <tt>Object.wait</tt> using this time unit.
+ * This is a convenience method that converts timeout arguments
+ * into the form required by the <tt>Object.wait</tt> method.
+ *
+ * <p>For example, you could implement a blocking <tt>poll</tt>
+ * method (see {@link BlockingQueue#poll BlockingQueue.poll})
+ * using:
+ *
+ * <pre> public synchronized Object poll(long timeout, TimeUnit unit) throws InterruptedException {
+ * while (empty) {
+ * unit.timedWait(this, timeout);
+ * ...
+ * }
+ * }</pre>
+ *
+ * @param obj the object to wait on
+ * @param timeout the maximum time to wait.
+ * @throws InterruptedException if interrupted while waiting.
+ * @see Object#wait(long, int)
+ */
+ public void timedWait(Object obj, long timeout)
+ throws InterruptedException {
+ if (timeout > 0) {
+ long ms = toMillis(timeout);
+ int ns = excessNanos(timeout, ms);
+ obj.wait(ms, ns);
+ }
+ }
+
+ /**
+ * Perform a timed <tt>Thread.join</tt> using this time unit.
+ * This is a convenience method that converts time arguments into the
+ * form required by the <tt>Thread.join</tt> method.
+ * @param thread the thread to wait for
+ * @param timeout the maximum time to wait
+ * @throws InterruptedException if interrupted while waiting.
+ * @see Thread#join(long, int)
+ */
+ public void timedJoin(Thread thread, long timeout)
+ throws InterruptedException {
+ if (timeout > 0) {
+ long ms = toMillis(timeout);
+ int ns = excessNanos(timeout, ms);
+ thread.join(ms, ns);
+ }
+ }
+
+ /**
+ * Perform a <tt>Thread.sleep</tt> using this unit.
+ * This is a convenience method that converts time arguments into the
+ * form required by the <tt>Thread.sleep</tt> method.
+ * @param timeout the minimum time to sleep
+ * @throws InterruptedException if interrupted while sleeping.
+ * @see Thread#sleep
+ */
+ public void sleep(long timeout) throws InterruptedException {
+ if (timeout > 0) {
+ long ms = toMillis(timeout);
+ int ns = excessNanos(timeout, ms);
+ Thread.sleep(ms, ns);
+ }
+ }
+
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/TimeUnit.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/TimeoutException.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/TimeoutException.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/TimeoutException.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/TimeoutException.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,38 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * Exception thrown when a blocking operation times out. Blocking
+ * operations for which a timeout is specified need a means to
+ * indicate that the timeout has occurred. For many such operations it
+ * is possible to return a value that indicates timeout; when that is
+ * not possible or desirable then <tt>TimeoutException</tt> should be
+ * declared and thrown.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class TimeoutException extends Exception {
+ private static final long serialVersionUID = 1900926677490660714L;
+
+ /**
+ * Constructs a <tt>TimeoutException</tt> with no specified detail
+ * message.
+ */
+ public TimeoutException() {}
+
+ /**
+ * Constructs a <tt>TimeoutException</tt> with the specified detail
+ * message.
+ *
+ * @param message the detail message
+ */
+ public TimeoutException(String message) {
+ super(message);
+ }
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/TimeoutException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicBoolean.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicBoolean.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicBoolean.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicBoolean.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,123 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent.atomic;
+import sun.misc.Unsafe;
+
+/**
+ * A <tt>boolean</tt> value that may be updated atomically. See the
+ * {@link java.util.concurrent.atomic} package specification for
+ * description of the properties of atomic variables. An
+ * <tt>AtomicBoolean</tt> is used in applications such as atomically
+ * updated flags, and cannot be used as a replacement for a
+ * {@link java.lang.Boolean}.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class AtomicBoolean implements java.io.Serializable {
+ private static final long serialVersionUID = 4654671469794556979L;
+ // setup to use Unsafe.compareAndSwapInt for updates
+ private static final Unsafe unsafe = Unsafe.getUnsafe();
+ private static final long valueOffset;
+
+ static {
+ try {
+ valueOffset = unsafe.objectFieldOffset
+ (AtomicBoolean.class.getDeclaredField("value"));
+ } catch (Exception ex) { throw new Error(ex); }
+ }
+
+ private volatile int value;
+
+ /**
+ * Creates a new <tt>AtomicBoolean</tt> with the given initial value.
+ *
+ * @param initialValue the initial value
+ */
+ public AtomicBoolean(boolean initialValue) {
+ value = initialValue ? 1 : 0;
+ }
+
+ /**
+ * Creates a new <tt>AtomicBoolean</tt> with initial value <tt>false</tt>.
+ */
+ public AtomicBoolean() {
+ }
+
+ /**
+ * Returns the current value.
+ *
+ * @return the current value
+ */
+ public final boolean get() {
+ return value != 0;
+ }
+
+ /**
+ * Atomically sets the value to the given update value if the
+ * current value is equal to the expected value. Any given
+ * invocation of this operation may fail (return
+ * <tt>false</tt>) spuriously, but repeated invocation when
+ * the current value holds the expected value and no other thread
+ * is also attempting to set the value will eventually succeed.
+ *
+ * @param expect the expected value
+ * @param update the new value
+ * @return true if successful
+ */
+ public final boolean compareAndSet(boolean expect, boolean update) {
+ int e = expect ? 1 : 0;
+ int u = update ? 1 : 0;
+ return unsafe.compareAndSwapInt(this, valueOffset, e, u);
+ }
+
+ /**
+ * Atomically set the value to the given updated value
+ * if the current value <tt>==</tt> the expected value.
+ * May fail spuriously.
+ * @param expect the expected value
+ * @param update the new value
+ * @return true if successful.
+ */
+ public boolean weakCompareAndSet(boolean expect, boolean update) {
+ int e = expect ? 1 : 0;
+ int u = update ? 1 : 0;
+ return unsafe.compareAndSwapInt(this, valueOffset, e, u);
+ }
+
+ /**
+ * Unconditionally sets to the given value.
+ *
+ * @param newValue the new value
+ */
+ public final void set(boolean newValue) {
+ value = newValue ? 1 : 0;
+ }
+
+ /**
+ * Sets to the given value and returns the previous value.
+ *
+ * @param newValue the new value
+ * @return the previous value
+ */
+ public final boolean getAndSet(boolean newValue) {
+ for (;;) {
+ boolean current = get();
+ if (compareAndSet(current, newValue))
+ return current;
+ }
+ }
+
+ /**
+ * Returns the String representation of the current value.
+ * @return the String representation of the current value.
+ */
+ public String toString() {
+ return Boolean.toString(get());
+ }
+
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicBoolean.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicInteger.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicInteger.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicInteger.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicInteger.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,221 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent.atomic;
+import sun.misc.Unsafe;
+
+/**
+ * An <tt>int</tt> value that may be updated atomically. See the
+ * {@link java.util.concurrent.atomic} package specification for
+ * description of the properties of atomic variables. An
+ * <tt>AtomicInteger</tt> is used in applications such as atomically
+ * incremented counters, and cannot be used as a replacement for an
+ * {@link java.lang.Integer}. However, this class does extend
+ * <tt>Number</tt> to allow uniform access by tools and utilities that
+ * deal with numerically-based classes.
+ *
+ *
+ * @since 1.5
+ * @author Doug Lea
+*/
+public class AtomicInteger extends Number implements java.io.Serializable {
+ private static final long serialVersionUID = 6214790243416807050L;
+
+ // setup to use Unsafe.compareAndSwapInt for updates
+ private static final Unsafe unsafe = Unsafe.getUnsafe();
+ private static final long valueOffset;
+
+ static {
+ try {
+ valueOffset = unsafe.objectFieldOffset
+ (AtomicInteger.class.getDeclaredField("value"));
+ } catch(Exception ex) { throw new Error(ex); }
+ }
+
+ private volatile int value;
+
+ /**
+ * Create a new AtomicInteger with the given initial value.
+ *
+ * @param initialValue the initial value
+ */
+ public AtomicInteger(int initialValue) {
+ value = initialValue;
+ }
+
+ /**
+ * Create a new AtomicInteger with initial value <tt>0</tt>.
+ */
+ public AtomicInteger() {
+ }
+
+ /**
+ * Get the current value.
+ *
+ * @return the current value
+ */
+ public final int get() {
+ return value;
+ }
+
+ /**
+ * Set to the given value.
+ *
+ * @param newValue the new value
+ */
+ public final void set(int newValue) {
+ value = newValue;
+ }
+
+ /**
+ * Set to the give value and return the old value.
+ *
+ * @param newValue the new value
+ * @return the previous value
+ */
+ public final int getAndSet(int newValue) {
+ for (;;) {
+ int current = get();
+ if (compareAndSet(current, newValue))
+ return current;
+ }
+ }
+
+
+ /**
+ * Atomically set the value to the given updated value
+ * if the current value <tt>==</tt> the expected value.
+ * @param expect the expected value
+ * @param update the new value
+ * @return true if successful. False return indicates that
+ * the actual value was not equal to the expected value.
+ */
+ public final boolean compareAndSet(int expect, int update) {
+ return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
+ }
+
+ /**
+ * Atomically set the value to the given updated value
+ * if the current value <tt>==</tt> the expected value.
+ * May fail spuriously.
+ * @param expect the expected value
+ * @param update the new value
+ * @return true if successful.
+ */
+ public final boolean weakCompareAndSet(int expect, int update) {
+ return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
+ }
+
+
+ /**
+ * Atomically increment by one the current value.
+ * @return the previous value
+ */
+ public final int getAndIncrement() {
+ for (;;) {
+ int current = get();
+ int next = current + 1;
+ if (compareAndSet(current, next))
+ return current;
+ }
+ }
+
+
+ /**
+ * Atomically decrement by one the current value.
+ * @return the previous value
+ */
+ public final int getAndDecrement() {
+ for (;;) {
+ int current = get();
+ int next = current - 1;
+ if (compareAndSet(current, next))
+ return current;
+ }
+ }
+
+
+ /**
+ * Atomically add the given value to current value.
+ * @param delta the value to add
+ * @return the previous value
+ */
+ public final int getAndAdd(int delta) {
+ for (;;) {
+ int current = get();
+ int next = current + delta;
+ if (compareAndSet(current, next))
+ return current;
+ }
+ }
+
+ /**
+ * Atomically increment by one the current value.
+ * @return the updated value
+ */
+ public final int incrementAndGet() {
+ for (;;) {
+ int current = get();
+ int next = current + 1;
+ if (compareAndSet(current, next))
+ return next;
+ }
+ }
+
+ /**
+ * Atomically decrement by one the current value.
+ * @return the updated value
+ */
+ public final int decrementAndGet() {
+ for (;;) {
+ int current = get();
+ int next = current - 1;
+ if (compareAndSet(current, next))
+ return next;
+ }
+ }
+
+
+ /**
+ * Atomically add the given value to current value.
+ * @param delta the value to add
+ * @return the updated value
+ */
+ public final int addAndGet(int delta) {
+ for (;;) {
+ int current = get();
+ int next = current + delta;
+ if (compareAndSet(current, next))
+ return next;
+ }
+ }
+
+ /**
+ * Returns the String representation of the current value.
+ * @return the String representation of the current value.
+ */
+ public String toString() {
+ return Integer.toString(get());
+ }
+
+
+ public int intValue() {
+ return get();
+ }
+
+ public long longValue() {
+ return (long)get();
+ }
+
+ public float floatValue() {
+ return (float)get();
+ }
+
+ public double doubleValue() {
+ return (double)get();
+ }
+
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicInteger.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicIntegerArray.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicIntegerArray.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicIntegerArray.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicIntegerArray.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,243 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent.atomic;
+import sun.misc.Unsafe;
+import java.util.*;
+
+/**
+ * An <tt>int</tt> array in which elements may be updated atomically.
+ * See the {@link java.util.concurrent.atomic} package
+ * specification for description of the properties of atomic
+ * variables.
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class AtomicIntegerArray implements java.io.Serializable {
+ private static final long serialVersionUID = 2862133569453604235L;
+
+ // setup to use Unsafe.compareAndSwapInt for updates
+ private static final Unsafe unsafe = Unsafe.getUnsafe();
+ private static final int base = unsafe.arrayBaseOffset(int[].class);
+ private static final int scale = unsafe.arrayIndexScale(int[].class);
+ private final int[] array;
+
+ private long rawIndex(int i) {
+ if (i < 0 || i >= array.length)
+ throw new IndexOutOfBoundsException("index " + i);
+ return base + i * scale;
+ }
+
+ /**
+ * Create a new AtomicIntegerArray of given length.
+ *
+ * @param length the length of the array
+ */
+ public AtomicIntegerArray(int length) {
+ array = new int[length];
+ // must perform at least one volatile write to conform to JMM
+ if (length > 0)
+ unsafe.putIntVolatile(array, rawIndex(0), 0);
+ }
+
+ /**
+ * Create a new AtomicIntegerArray with the same length as, and
+ * all elements copied from, the given array.
+ *
+ * @param array the array to copy elements from
+ * @throws NullPointerException if array is null
+ */
+ public AtomicIntegerArray(int[] array) {
+ if (array == null)
+ throw new NullPointerException();
+ int length = array.length;
+ this.array = new int[length];
+ if (length > 0) {
+ int last = length-1;
+ for (int i = 0; i < last; ++i)
+ this.array[i] = array[i];
+ // Do the last write as volatile
+ unsafe.putIntVolatile(this.array, rawIndex(last), array[last]);
+ }
+ }
+
+ /**
+ * Returns the length of the array.
+ *
+ * @return the length of the array
+ */
+ public final int length() {
+ return array.length;
+ }
+
+ /**
+ * Get the current value at position <tt>i</tt>.
+ *
+ * @param i the index
+ * @return the current value
+ */
+ public final int get(int i) {
+ return unsafe.getIntVolatile(array, rawIndex(i));
+ }
+
+ /**
+ * Set the element at position <tt>i</tt> to the given value.
+ *
+ * @param i the index
+ * @param newValue the new value
+ */
+ public final void set(int i, int newValue) {
+ unsafe.putIntVolatile(array, rawIndex(i), newValue);
+ }
+
+ /**
+ * Set the element at position <tt>i</tt> to the given value and return the
+ * old value.
+ *
+ * @param i the index
+ * @param newValue the new value
+ * @return the previous value
+ */
+ public final int getAndSet(int i, int newValue) {
+ while (true) {
+ int current = get(i);
+ if (compareAndSet(i, current, newValue))
+ return current;
+ }
+ }
+
+ /**
+ * Atomically set the value to the given updated value
+ * if the current value <tt>==</tt> the expected value.
+ *
+ * @param i the index
+ * @param expect the expected value
+ * @param update the new value
+ * @return true if successful. False return indicates that
+ * the actual value was not equal to the expected value.
+ */
+ public final boolean compareAndSet(int i, int expect, int update) {
+ return unsafe.compareAndSwapInt(array, rawIndex(i),
+ expect, update);
+ }
+
+ /**
+ * Atomically set the value to the given updated value
+ * if the current value <tt>==</tt> the expected value.
+ * May fail spuriously.
+ *
+ * @param i the index
+ * @param expect the expected value
+ * @param update the new value
+ * @return true if successful.
+ */
+ public final boolean weakCompareAndSet(int i, int expect, int update) {
+ return compareAndSet(i, expect, update);
+ }
+
+ /**
+ * Atomically increment by one the element at index <tt>i</tt>.
+ *
+ * @param i the index
+ * @return the previous value;
+ */
+ public final int getAndIncrement(int i) {
+ while (true) {
+ int current = get(i);
+ int next = current + 1;
+ if (compareAndSet(i, current, next))
+ return current;
+ }
+ }
+
+ /**
+ * Atomically decrement by one the element at index <tt>i</tt>.
+ *
+ * @param i the index
+ * @return the previous value;
+ */
+ public final int getAndDecrement(int i) {
+ while (true) {
+ int current = get(i);
+ int next = current - 1;
+ if (compareAndSet(i, current, next))
+ return current;
+ }
+ }
+
+ /**
+ * Atomically add the given value to element at index <tt>i</tt>.
+ *
+ * @param i the index
+ * @param delta the value to add
+ * @return the previous value;
+ */
+ public final int getAndAdd(int i, int delta) {
+ while (true) {
+ int current = get(i);
+ int next = current + delta;
+ if (compareAndSet(i, current, next))
+ return current;
+ }
+ }
+
+ /**
+ * Atomically increment by one the element at index <tt>i</tt>.
+ *
+ * @param i the index
+ * @return the updated value;
+ */
+ public final int incrementAndGet(int i) {
+ while (true) {
+ int current = get(i);
+ int next = current + 1;
+ if (compareAndSet(i, current, next))
+ return next;
+ }
+ }
+
+ /**
+ * Atomically decrement by one the element at index <tt>i</tt>.
+ *
+ * @param i the index
+ * @return the updated value;
+ */
+ public final int decrementAndGet(int i) {
+ while (true) {
+ int current = get(i);
+ int next = current - 1;
+ if (compareAndSet(i, current, next))
+ return next;
+ }
+ }
+
+ /**
+ * Atomically add the given value to element at index <tt>i</tt>.
+ *
+ * @param i the index
+ * @param delta the value to add
+ * @return the updated value;
+ */
+ public final int addAndGet(int i, int delta) {
+ while (true) {
+ int current = get(i);
+ int next = current + delta;
+ if (compareAndSet(i, current, next))
+ return next;
+ }
+ }
+
+ /**
+ * Returns the String representation of the current values of array.
+ * @return the String representation of the current values of array.
+ */
+ public String toString() {
+ if (array.length > 0) // force volatile read
+ get(0);
+ return Arrays.toString(array);
+ }
+
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/atomic/AtomicIntegerArray.java
------------------------------------------------------------------------------
svn:eol-style = native