You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@harmony.apache.org by nd...@apache.org on 2006/08/24 05:42:33 UTC
svn commit: r434296 [3/19] - in /incubator/harmony/enhanced/classlib/trunk:
make/ modules/concurrent/ modules/concurrent/.settings/
modules/concurrent/META-INF/ modules/concurrent/make/
modules/concurrent/src/ modules/concurrent/src/main/ modules/concu...
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,280 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * A synchronization aid that allows one or more threads to wait until
+ * a set of operations being performed in other threads completes.
+ *
+ * <p>A <tt>CountDownLatch</tt> is initialized with a given
+ * <em>count</em>. The {@link #await await} methods block until the current
+ * {@link #getCount count} reaches zero due to invocations of the
+ * {@link #countDown} method, after which all waiting threads are
+ * released and any subsequent invocations of {@link #await await} return
+ * immediately. This is a one-shot phenomenon -- the count cannot be
+ * reset. If you need a version that resets the count, consider using
+ * a {@link CyclicBarrier}.
+ *
+ * <p>A <tt>CountDownLatch</tt> is a versatile synchronization tool
+ * and can be used for a number of purposes. A
+ * <tt>CountDownLatch</tt> initialized with a count of one serves as a
+ * simple on/off latch, or gate: all threads invoking {@link #await await}
+ * wait at the gate until it is opened by a thread invoking {@link
+ * #countDown}. A <tt>CountDownLatch</tt> initialized to <em>N</em>
+ * can be used to make one thread wait until <em>N</em> threads have
+ * completed some action, or some action has been completed N times.
+ * <p>A useful property of a <tt>CountDownLatch</tt> is that it
+ * doesn't require that threads calling <tt>countDown</tt> wait for
+ * the count to reach zero before proceeding, it simply prevents any
+ * thread from proceeding past an {@link #await await} until all
+ * threads could pass.
+ *
+ * <p><b>Sample usage:</b> Here is a pair of classes in which a group
+ * of worker threads use two countdown latches:
+ * <ul>
+ * <li>The first is a start signal that prevents any worker from proceeding
+ * until the driver is ready for them to proceed;
+ * <li>The second is a completion signal that allows the driver to wait
+ * until all workers have completed.
+ * </ul>
+ *
+ * <pre>
+ * class Driver { // ...
+ * void main() throws InterruptedException {
+ * CountDownLatch startSignal = new CountDownLatch(1);
+ * CountDownLatch doneSignal = new CountDownLatch(N);
+ *
+ * for (int i = 0; i < N; ++i) // create and start threads
+ * new Thread(new Worker(startSignal, doneSignal)).start();
+ *
+ * doSomethingElse(); // don't let run yet
+ * startSignal.countDown(); // let all threads proceed
+ * doSomethingElse();
+ * doneSignal.await(); // wait for all to finish
+ * }
+ * }
+ *
+ * class Worker implements Runnable {
+ * private final CountDownLatch startSignal;
+ * private final CountDownLatch doneSignal;
+ * Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
+ * this.startSignal = startSignal;
+ * this.doneSignal = doneSignal;
+ * }
+ * public void run() {
+ * try {
+ * startSignal.await();
+ * doWork();
+ * doneSignal.countDown();
+ * } catch (InterruptedException ex) {} // return;
+ * }
+ *
+ * void doWork() { ... }
+ * }
+ *
+ * </pre>
+ *
+ * <p>Another typical usage would be to divide a problem into N parts,
+ * describe each part with a Runnable that executes that portion and
+ * counts down on the latch, and queue all the Runnables to an
+ * Executor. When all sub-parts are complete, the coordinating thread
+ * will be able to pass through await. (When threads must repeatedly
+ * count down in this way, instead use a {@link CyclicBarrier}.)
+ *
+ * <pre>
+ * class Driver2 { // ...
+ * void main() throws InterruptedException {
+ * CountDownLatch doneSignal = new CountDownLatch(N);
+ * Executor e = ...
+ *
+ * for (int i = 0; i < N; ++i) // create and start threads
+ * e.execute(new WorkerRunnable(doneSignal, i));
+ *
+ * doneSignal.await(); // wait for all to finish
+ * }
+ * }
+ *
+ * class WorkerRunnable implements Runnable {
+ * private final CountDownLatch doneSignal;
+ * private final int i;
+ * WorkerRunnable(CountDownLatch doneSignal, int i) {
+ * this.doneSignal = doneSignal;
+ * this.i = i;
+ * }
+ * public void run() {
+ * try {
+ * doWork(i);
+ * doneSignal.countDown();
+ * } catch (InterruptedException ex) {} // return;
+ * }
+ *
+ * void doWork(int i) { ... }
+ * }
+ *
+ * </pre>
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class CountDownLatch {
+ /**
+ * Synchronization control for CountDownLatch.
+ * Uses AQS state to represent count.
+ */
+ private static final class Sync extends AbstractQueuedSynchronizer {
+     /** Seeds the AQS state with the initial latch count. */
+     Sync(int count) {
+         setState(count);
+     }
+
+     /** Returns the remaining count, i.e. the current AQS state. */
+     int getCount() {
+         return getState();
+     }
+
+     /**
+      * Shared acquisition succeeds (positive result) only once the count
+      * is zero; the argument is not consulted.
+      */
+     public int tryAcquireShared(int acquires) {
+         if (getState() == 0) {
+             return 1;
+         }
+         return -1;
+     }
+
+     /**
+      * Lowers the count by one with a CAS retry loop; the argument is
+      * not consulted.
+      *
+      * @return true only for the call that moves the count to zero;
+      *         false if the count was already zero or is still positive
+      */
+     public boolean tryReleaseShared(int releases) {
+         int current;
+         int next;
+         do {
+             current = getState();
+             if (current == 0) {
+                 return false;   // already at zero: nothing to decrement
+             }
+             next = current - 1;
+         } while (!compareAndSetState(current, next));
+         return next == 0;
+     }
+ }
+
+ /** Synchronizer whose AQS state holds the remaining count; assigned once in the constructor. */
+ private final Sync sync;
+ /**
+ * Constructs a <tt>CountDownLatch</tt> initialized with the given
+ * count.
+ *
+ * @param count the number of times {@link #countDown} must be invoked
+ * before threads can pass through {@link #await}.
+ *
+ * @throws IllegalArgumentException if <tt>count</tt> is less than zero.
+ */
+ public CountDownLatch(int count) {
+     // Reject negative counts before the synchronizer is created.
+     if (count < 0) {
+         throw new IllegalArgumentException("count < 0");
+     }
+     this.sync = new Sync(count);
+ }
+
+ /**
+ * Causes the current thread to wait until the latch has counted down to
+ * zero, unless the thread is {@link Thread#interrupt interrupted}.
+ *
+ * <p>If the current {@link #getCount count} is zero then this method
+ * returns immediately.
+ * <p>If the current {@link #getCount count} is greater than zero then
+ * the current thread becomes disabled for thread scheduling
+ * purposes and lies dormant until one of two things happens:
+ * <ul>
+ * <li>The count reaches zero due to invocations of the
+ * {@link #countDown} method; or
+ * <li>Some other thread {@link Thread#interrupt interrupts} the current
+ * thread.
+ * </ul>
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@link Thread#interrupt interrupted} while waiting,
+ * </ul>
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting.
+ */
+ public void await() throws InterruptedException {
+ // Shared, interruptible acquire; the argument 1 is ignored by Sync.tryAcquireShared.
+ sync.acquireSharedInterruptibly(1);
+ }
+
+ /**
+ * Causes the current thread to wait until the latch has counted down to
+ * zero, unless the thread is {@link Thread#interrupt interrupted},
+ * or the specified waiting time elapses.
+ *
+ * <p>If the current {@link #getCount count} is zero then this method
+ * returns immediately with the value <tt>true</tt>.
+ *
+ * <p>If the current {@link #getCount count} is greater than zero then
+ * the current thread becomes disabled for thread scheduling
+ * purposes and lies dormant until one of three things happens:
+ * <ul>
+ * <li>The count reaches zero due to invocations of the
+ * {@link #countDown} method; or
+ * <li>Some other thread {@link Thread#interrupt interrupts} the current
+ * thread; or
+ * <li>The specified waiting time elapses.
+ * </ul>
+ * <p>If the count reaches zero then the method returns with the
+ * value <tt>true</tt>.
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@link Thread#interrupt interrupted} while waiting,
+ * </ul>
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * <p>If the specified waiting time elapses then the value <tt>false</tt>
+ * is returned.
+ * If the time is
+ * less than or equal to zero, the method will not wait at all.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the <tt>timeout</tt> argument.
+ * @return <tt>true</tt> if the count reached zero and <tt>false</tt>
+ * if the waiting time elapsed before the count reached zero.
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting.
+ */
+ public boolean await(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ // The timeout is converted to nanoseconds; the acquire argument 1 is ignored by Sync.tryAcquireShared.
+ return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+ }
+
+ /**
+ * Decrements the count of the latch, releasing all waiting threads if
+ * the count reaches zero.
+ * <p>If the current {@link #getCount count} is greater than zero then
+ * it is decremented. If the new count is zero then all waiting threads
+ * are re-enabled for thread scheduling purposes.
+ * <p>If the current {@link #getCount count} equals zero then nothing
+ * happens.
+ */
+ public void countDown() {
+ // Shared release; the decrement itself is implemented in Sync.tryReleaseShared,
+ // which ignores the argument 1.
+ sync.releaseShared(1);
+ }
+
+ /**
+ * Returns the current count.
+ * <p>This method is typically used for debugging and testing purposes.
+ * @return the current count.
+ */
+ public long getCount() {
+ // Sync tracks the count as an int; it is widened to long on return.
+ return sync.getCount();
+ }
+
+ /**
+ * Returns a string identifying this latch, as well as its state.
+ * The state, in brackets, includes the String
+ * "Count =" followed by the current count.
+ * @return a string identifying this latch, as well as its
+ * state
+ */
+ public String toString() {
+ // Appends the current count to the inherited Object.toString() identity string.
+ return super.toString() + "[Count = " + sync.getCount() + "]";
+ }
+
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,430 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+
+/**
+ * A synchronization aid that allows a set of threads to all wait for
+ * each other to reach a common barrier point. CyclicBarriers are
+ * useful in programs involving a fixed sized party of threads that
+ * must occasionally wait for each other. The barrier is called
+ * <em>cyclic</em> because it can be re-used after the waiting threads
+ * are released.
+ *
+ * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
+ * that is run once per barrier point, after the last thread in the party
+ * arrives, but before any threads are released.
+ * This <em>barrier action</em> is useful
+ * for updating shared-state before any of the parties continue.
+ *
+ * <p><b>Sample usage:</b> Here is an example of
+ * using a barrier in a parallel decomposition design:
+ * <pre>
+ * class Solver {
+ * final int N;
+ * final float[][] data;
+ * final CyclicBarrier barrier;
+ *
+ * class Worker implements Runnable {
+ * int myRow;
+ * Worker(int row) { myRow = row; }
+ * public void run() {
+ * while (!done()) {
+ * processRow(myRow);
+ *
+ * try {
+ * barrier.await();
+ * } catch (InterruptedException ex) {
+ * return;
+ * } catch (BrokenBarrierException ex) {
+ * return;
+ * }
+ * }
+ * }
+ * }
+ *
+ * public Solver(float[][] matrix) {
+ * data = matrix;
+ * N = matrix.length;
+ * barrier = new CyclicBarrier(N,
+ * new Runnable() {
+ * public void run() {
+ * mergeRows(...);
+ * }
+ * });
+ * for (int i = 0; i < N; ++i)
+ * new Thread(new Worker(i)).start();
+ *
+ * waitUntilDone();
+ * }
+ * }
+ * </pre>
+ * Here, each worker thread processes a row of the matrix then waits at the
+ * barrier until all rows have been processed. When all rows are processed
+ * the supplied {@link Runnable} barrier action is executed and merges the
+ * rows. If the merger
+ * determines that a solution has been found then <tt>done()</tt> will return
+ * <tt>true</tt> and each worker will terminate.
+ *
+ * <p>If the barrier action does not rely on the parties being suspended when
+ * it is executed, then any of the threads in the party could execute that
+ * action when it is released. To facilitate this, each invocation of
+ * {@link #await} returns the arrival index of that thread at the barrier.
+ * You can then choose which thread should execute the barrier action, for
+ * example:
+ * <pre> if (barrier.await() == 0) {
+ * // log the completion of this iteration
+ * }</pre>
+ *
+ * <p>The <tt>CyclicBarrier</tt> uses a fast-fail all-or-none breakage
+ * model for failed synchronization attempts: If a thread leaves a
+ * barrier point prematurely because of interruption, failure, or
+ * timeout, all other threads, even those that have not yet resumed
+ * from a previous {@link #await}, will also leave abnormally via
+ * {@link BrokenBarrierException} (or <tt>InterruptedException</tt> if
+ * they too were interrupted at about the same time).
+ *
+ * @since 1.5
+ * @see CountDownLatch
+ *
+ * @author Doug Lea
+ */
+public class CyclicBarrier {
+ /** The lock for guarding barrier entry; all mutable state below is read and written while holding it. */
+ private final ReentrantLock lock = new ReentrantLock();
+ /** Condition to wait on until tripped; also signalled when the barrier is broken or reset. */
+ private final Condition trip = lock.newCondition();
+ /** The number of parties; fixed at construction. */
+ private final int parties;
+ /** The command to run when tripped, or null if no action was supplied. */
+ private final Runnable barrierCommand;
+
+ /**
+ * The generation number. Incremented upon barrier trip.
+ * Retracted upon reset. A waiter compares the value it saw on
+ * arrival with the current value to tell a trip from a reset.
+ */
+ private long generation;
+
+ /**
+ * Breakage indicator. Set by breakBarrier, cleared by reset.
+ */
+ private boolean broken;
+
+ /**
+ * Number of parties that have not yet arrived in the current cycle.
+ * Counts down from parties to 0 on each cycle; each call to dowait
+ * decrements it.
+ */
+ private int count;
+
+ /**
+ * Updates state on barrier trip and wakes up everyone.
+ */
+ private void nextGeneration() {
+     generation += 1;     // stamp the new cycle so waiters can detect the trip
+     count = parties;     // re-arm: every party is outstanding again
+     trip.signalAll();
+ }
+
+ /**
+ * Sets barrier as broken and wakes up everyone.
+ */
+ private void breakBarrier() {
+ broken = true;
+ // Wake every waiter so that each one re-checks the broken flag in dowait.
+ trip.signalAll();
+ }
+
+ /**
+ * Main barrier code, covering the various policies.
+ */
+ private int dowait(boolean timed, long nanos)
+ throws InterruptedException, BrokenBarrierException, TimeoutException {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ // Register this arrival and remember the generation it belongs to.
+ // NOTE(review): count is decremented before the broken/interrupt checks
+ // below, so a call that throws immediately has still consumed a slot.
+ int index = --count;
+ long g = generation;
+
+ if (broken)
+ throw new BrokenBarrierException();
+
+ if (Thread.interrupted()) {
+ breakBarrier();
+ throw new InterruptedException();
+ }
+
+ if (index == 0) { // tripped
+ // Last arrival: advance the generation and signal the waiters, then run
+ // the barrier action. The lock is still held here, so signalled waiters
+ // cannot resume until this method returns.
+ nextGeneration();
+ boolean ranAction = false;
+ try {
+ Runnable command = barrierCommand;
+ if (command != null)
+ command.run();
+ ranAction = true;
+ return 0;
+ } finally {
+ // ranAction is still false only if command.run() threw.
+ if (!ranAction)
+ breakBarrier();
+ }
+ }
+
+ // Not the last arrival: wait until tripped, broken, reset, interrupted or timed out.
+ for (;;) {
+ try {
+ if (!timed)
+ trip.await();
+ else if (nanos > 0L)
+ nanos = trip.awaitNanos(nanos);
+ } catch (InterruptedException ie) {
+ breakBarrier();
+ throw ie;
+ }
+
+ if (broken ||
+ g > generation) // true if a reset occurred while waiting
+ throw new BrokenBarrierException();
+
+ // A larger generation means the barrier tripped while this thread waited.
+ if (g < generation)
+ return index;
+
+ // Same generation and not broken: give up only once a timed wait has run out.
+ if (timed && nanos <= 0L) {
+ breakBarrier();
+ throw new TimeoutException();
+ }
+ }
+
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Creates a new <tt>CyclicBarrier</tt> that will trip when the
+ * given number of parties (threads) are waiting upon it, and which
+ * will execute the given barrier action when the barrier is tripped,
+ * performed by the last thread entering the barrier.
+ *
+ * @param parties the number of threads that must invoke {@link #await}
+ * before the barrier is tripped.
+ * @param barrierAction the command to execute when the barrier is
+ * tripped, or <tt>null</tt> if there is no action.
+ *
+ * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
+ */
+ public CyclicBarrier(int parties, Runnable barrierAction) {
+     // A barrier needs at least one party.
+     if (parties <= 0) {
+         throw new IllegalArgumentException();
+     }
+     this.barrierCommand = barrierAction;   // may be null: nothing runs on trip
+     this.parties = parties;
+     this.count = parties;                  // nobody has arrived yet
+ }
+
+ /**
+ * Creates a new <tt>CyclicBarrier</tt> that will trip when the
+ * given number of parties (threads) are waiting upon it, and
+ * does not perform a predefined action upon each barrier.
+ *
+ * @param parties the number of threads that must invoke {@link #await}
+ * before the barrier is tripped.
+ *
+ * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
+ */
+ public CyclicBarrier(int parties) {
+ // A null action means nothing is run when the barrier trips.
+ this(parties, null);
+ }
+
+ /**
+ * Returns the number of parties required to trip this barrier.
+ * @return the number of parties required to trip this barrier.
+ **/
+ public int getParties() {
+ // parties is final, so it is read without taking the lock.
+ return parties;
+ }
+
+ /**
+ * Waits until all {@link #getParties parties} have invoked <tt>await</tt>
+ * on this barrier.
+ *
+ * <p>If the current thread is not the last to arrive then it is
+ * disabled for thread scheduling purposes and lies dormant until
+ * one of the following things happens:
+ * <ul>
+ * <li>The last thread arrives; or
+ * <li>Some other thread {@link Thread#interrupt interrupts} the current
+ * thread; or
+ * <li>Some other thread {@link Thread#interrupt interrupts} one of the
+ * other waiting threads; or
+ * <li>Some other thread times out while waiting for barrier; or
+ * <li>Some other thread invokes {@link #reset} on this barrier.
+ * </ul>
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@link Thread#interrupt interrupted} while waiting
+ * </ul>
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * <p>If the barrier is {@link #reset} while any thread is waiting, or if
+ * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
+ * or while any thread is waiting,
+ * then {@link BrokenBarrierException} is thrown.
+ *
+ * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
+ * then all other waiting threads will throw
+ * {@link BrokenBarrierException} and the barrier is placed in the broken
+ * state.
+ *
+ * <p>If the current thread is the last thread to arrive, and a
+ * non-null barrier action was supplied in the constructor, then the
+ * current thread runs the action before allowing the other threads to
+ * continue.
+ * If an exception occurs during the barrier action then that exception
+ * will be propagated in the current thread and the barrier is placed in
+ * the broken state.
+ *
+ * @return the arrival index of the current thread, where index
+ * <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
+ * zero indicates the last to arrive.
+ *
+ * @throws InterruptedException if the current thread was interrupted
+ * while waiting
+ * @throws BrokenBarrierException if <em>another</em> thread was
+ * interrupted while the current thread was waiting, or the barrier was
+ * reset, or the barrier was broken when <tt>await</tt> was called,
+ * or the barrier action (if present) failed due to an exception.
+ */
+ public int await() throws InterruptedException, BrokenBarrierException {
+ try {
+ return dowait(false, 0L);
+ } catch (TimeoutException toe) {
+ throw new Error(toe); // cannot happen;
+ }
+ }
+
+ /**
+ * Waits until all {@link #getParties parties} have invoked <tt>await</tt>
+ * on this barrier.
+ *
+ * <p>If the current thread is not the last to arrive then it is
+ * disabled for thread scheduling purposes and lies dormant until
+ * one of the following things happens:
+ * <ul>
+ * <li>The last thread arrives; or
+ * <li>The specified timeout elapses; or
+ * <li>Some other thread {@link Thread#interrupt interrupts} the current
+ * thread; or
+ * <li>Some other thread {@link Thread#interrupt interrupts} one of the
+ * other waiting threads; or
+ * <li>Some other thread times out while waiting for barrier; or
+ * <li>Some other thread invokes {@link #reset} on this barrier.
+ * </ul>
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@link Thread#interrupt interrupted} while waiting
+ * </ul>
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * <p>If the barrier is {@link #reset} while any thread is waiting, or if
+ * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
+ * or while any thread is waiting,
+ * then {@link BrokenBarrierException} is thrown.
+ *
+ * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
+ * then all other waiting threads will throw
+ * {@link BrokenBarrierException} and the barrier is placed in the broken
+ * state.
+ *
+ * <p>If the current thread is the last thread to arrive, and a
+ * non-null barrier action was supplied in the constructor, then the
+ * current thread runs the action before allowing the other threads to
+ * continue.
+ * If an exception occurs during the barrier action then that exception
+ * will be propagated in the current thread and the barrier is placed in
+ * the broken state.
+ *
+ * @param timeout the time to wait for the barrier
+ * @param unit the time unit of the timeout parameter
+ * @return the arrival index of the current thread, where index
+ * <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
+ * zero indicates the last to arrive.
+ *
+ * @throws InterruptedException if the current thread was interrupted
+ * while waiting
+ * @throws TimeoutException if the specified timeout elapses.
+ * @throws BrokenBarrierException if <em>another</em> thread was
+ * interrupted while the current thread was waiting, or the barrier was
+ * reset, or the barrier was broken when <tt>await</tt> was called,
+ * or the barrier action (if present) failed due to an exception.
+ */
+ public int await(long timeout, TimeUnit unit)
+ throws InterruptedException,
+ BrokenBarrierException,
+ TimeoutException {
+ // Timed variant of dowait; the timeout is converted to nanoseconds first.
+ return dowait(true, unit.toNanos(timeout));
+ }
+
+ /**
+ * Queries if this barrier is in a broken state.
+ * @return <tt>true</tt> if one or more parties broke out of this
+ * barrier due to interruption or timeout since construction or
+ * the last reset, or a barrier action failed due to an exception;
+ * and <tt>false</tt> otherwise.
+ */
+ public boolean isBroken() {
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         // The flag is only written under this lock, so read it the same way.
+         final boolean brokenNow = broken;
+         return brokenNow;
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ /**
+ * Resets the barrier to its initial state. If any parties are
+ * currently waiting at the barrier, they will return with a
+ * {@link BrokenBarrierException}. Note that resets <em>after</em>
+ * a breakage has occurred for other reasons can be complicated to
+ * carry out; threads need to re-synchronize in some other way,
+ * and choose one to perform the reset. It may be preferable to
+ * instead create a new barrier for subsequent use.
+ */
+ public void reset() {
+     final ReentrantLock lock = this.lock;
+     lock.lock();
+     try {
+         /*
+          * Retract generation number enough to cover threads
+          * currently waiting on current and still resuming from
+          * previous generation, plus similarly accommodating spans
+          * after the reset.
+          */
+         generation -= 4;
+         // Forget the arrivals of the abandoned cycle. Without this the
+         // next cycle would trip after fewer than `parties` arrivals and
+         // getNumberWaiting() would keep counting threads that have left.
+         count = parties;
+         broken = false;
+         // Woken waiters find their remembered generation ahead of the
+         // current one and leave with BrokenBarrierException.
+         trip.signalAll();
+     } finally {
+         lock.unlock();
+     }
+ }
+
+ /**
+ * Returns the number of parties currently waiting at the barrier.
+ * This method is primarily useful for debugging and assertions.
+ *
+ * @return the number of parties currently blocked in {@link #await}
+ **/
+ public int getNumberWaiting() {
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         // count holds the arrivals still outstanding, so the difference
+         // is the number that have already arrived in this cycle.
+         final int arrived = parties - count;
+         return arrived;
+     } finally {
+         guard.unlock();
+     }
+ }
+
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,366 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+import java.util.*;
+
+/**
+ * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
+ * elements, in which an element can only be taken when its delay has expired.
+ * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
+ * expired furthest in the past - if no delay has expired there is no head and
+ * <tt>poll</tt> will return <tt>null</tt>.
+ * This queue does not permit <tt>null</tt> elements.
+ * <p>This class implements all of the <em>optional</em> methods
+ * of the {@link Collection} and {@link Iterator} interfaces.
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../guide/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+
+public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
+ implements BlockingQueue<E> {
+
+ /** Lock taken by the queue operations of this class before they touch {@link #q}. */
+ private transient final ReentrantLock lock = new ReentrantLock();
+ /** Signalled when a new earliest element is offered or elements remain after a removal. */
+ private transient final Condition available = lock.newCondition();
+ /** Backing priority queue; no comparator is supplied, so the elements' own compareTo orders it. */
+ private final PriorityQueue<E> q = new PriorityQueue<E>();
+
+ /**
+ * Creates a new <tt>DelayQueue</tt> that is initially empty.
+ */
+ public DelayQueue() {} // all fields are set by their initializers
+
+ /**
+ * Creates a <tt>DelayQueue</tt> initially containing the elements of the
+ * given collection of {@link Delayed} instances.
+ *
+ * @param c the collection
+ * @throws NullPointerException if <tt>c</tt> or any element within it
+ * is <tt>null</tt>
+ *
+ */
+ public DelayQueue(Collection<? extends E> c) {
+ // NOTE(review): addAll is not declared in this class; presumably the inherited
+ // implementation adds each element through add(E) -- confirm. It is also an
+ // overridable method called from a constructor.
+ this.addAll(c);
+ }
+
+ /**
+ * Inserts the specified element into this delay queue.
+ *
+ * @param o the element to add
+ * @return <tt>true</tt>
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ public boolean offer(E o) {
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         // Capture the head before inserting so we can tell whether the
+         // new element becomes the earliest one.
+         final E previousHead = q.peek();
+         q.offer(o);
+         final boolean wasEmpty = (previousHead == null);
+         if (wasEmpty || o.compareTo(previousHead) < 0) {
+             available.signalAll();   // waiters must recompute how long to sleep
+         }
+         return true;
+     } finally {
+         guard.unlock();
+     }
+ }
+
+
+ /**
+ * Adds the specified element to this delay queue. As the queue is
+ * unbounded this method will never block.
+ * @param o the element to add
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ public void put(E o) {
+ // Simply offer(o) with the (always true) result dropped.
+ offer(o);
+ }
+
+ /**
+ * Inserts the specified element into this delay queue. As the queue is
+ * unbounded this method will never block.
+ * @param o the element to add
+ * @param timeout This parameter is ignored as the method never blocks
+ * @param unit This parameter is ignored as the method never blocks
+ * @return <tt>true</tt>
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ public boolean offer(E o, long timeout, TimeUnit unit) {
+ // timeout and unit are deliberately unused; unit is never dereferenced.
+ return offer(o);
+ }
+
+ /**
+ * Adds the specified element to this queue.
+ * @param o the element to add
+ * @return <tt>true</tt> (as per the general contract of
+ * <tt>Collection.add</tt>).
+ *
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ public boolean add(E o) {
+ // Same as offer(o), which always returns true.
+ return offer(o);
+ }
+
+ /**
+  * Retrieves and removes the head of this queue, waiting if necessary
+  * until an element with an expired delay is available.
+  *
+  * @return the head of this queue
+  * @throws InterruptedException if interrupted while waiting
+  */
+ public E take() throws InterruptedException {
+     final ReentrantLock lock = this.lock;
+     lock.lockInterruptibly();
+     try {
+         for (;;) {
+             E first = q.peek();
+             if (first == null) {
+                 // Empty: wait for a signal from an insertion.
+                 available.await();
+             } else {
+                 long delay = first.getDelay(TimeUnit.NANOSECONDS);
+                 if (delay > 0) {
+                     // Sleep until the head is due or an earlier element
+                     // is offered. The remaining time is recomputed from
+                     // the head on the next pass, so the value returned
+                     // by awaitNanos is not needed.
+                     available.awaitNanos(delay);
+                 } else {
+                     E x = q.poll();
+                     assert x != null;
+                     if (q.size() != 0)
+                         available.signalAll(); // wake up other takers
+                     return x;
+                 }
+             }
+         }
+     } finally {
+         lock.unlock();
+     }
+ }
+
+ /**
+  * Retrieves and removes the head of this queue, waiting if necessary
+  * until an element with an expired delay is available or the given
+  * wait time elapses.
+  *
+  * @param time how long to wait before giving up, in units of <tt>unit</tt>
+  * @param unit the time unit of the <tt>time</tt> argument
+  * @return the head of this queue, or <tt>null</tt> if the wait time
+  *         elapsed before an element with an expired delay became available
+  * @throws InterruptedException if interrupted while waiting
+  * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
+  */
+ public E poll(long time, TimeUnit unit) throws InterruptedException {
+     // Convert before locking: a null unit must not leave the lock held.
+     long nanos = unit.toNanos(time);
+     final ReentrantLock lock = this.lock;
+     lock.lockInterruptibly();
+     try {
+         for (;;) {
+             E first = q.peek();
+             if (first == null) {
+                 if (nanos <= 0)
+                     return null;
+                 else
+                     nanos = available.awaitNanos(nanos);
+             } else {
+                 long delay = first.getDelay(TimeUnit.NANOSECONDS);
+                 if (delay > 0) {
+                     // Time budget used up while the head is still not
+                     // due: report a timeout. Without this check the loop
+                     // would spin on awaitNanos(<= 0) until the head
+                     // expired and then return it, ignoring the timeout.
+                     if (nanos <= 0)
+                         return null;
+                     // Never sleep past the caller's remaining budget.
+                     if (delay > nanos)
+                         delay = nanos;
+                     long timeLeft = available.awaitNanos(delay);
+                     // Charge only the time actually spent waiting.
+                     nanos -= delay - timeLeft;
+                 } else {
+                     E x = q.poll();
+                     assert x != null;
+                     if (q.size() != 0)
+                         available.signalAll(); // wake up other waiters
+                     return x;
+                 }
+             }
+         }
+     } finally {
+         lock.unlock();
+     }
+ }
+
+
+ public E poll() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ E first = q.peek();
+ if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+ return null;
+ else {
+ E x = q.poll();
+ assert x != null;
+ if (q.size() != 0)
+ available.signalAll();
+ return x;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E peek() {
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         // No delay check here: the head is returned even if it is not yet due.
+         final E head = q.peek();
+         return head;
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ public int size() {
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         // Counts every queued element, whether or not its delay has expired.
+         final int elements = q.size();
+         return elements;
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ public int drainTo(Collection<? super E> c) {
+     if (c == null) {
+         throw new NullPointerException();
+     }
+     if (c == this) {
+         throw new IllegalArgumentException();
+     }
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         int moved = 0;
+         E head;
+         // Transfer heads for as long as the current head's delay has expired.
+         while ((head = q.peek()) != null
+                && head.getDelay(TimeUnit.NANOSECONDS) <= 0) {
+             c.add(q.poll());
+             ++moved;
+         }
+         if (moved > 0) {
+             available.signalAll();
+         }
+         return moved;
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ public int drainTo(Collection<? super E> c, int maxElements) {
+     if (c == null) {
+         throw new NullPointerException();
+     }
+     if (c == this) {
+         throw new IllegalArgumentException();
+     }
+     if (maxElements <= 0) {
+         return 0;   // nothing requested; the lock is not taken
+     }
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         int moved = 0;
+         E head;
+         // Stop at the limit, on an empty queue, or at the first head not yet due.
+         while (moved < maxElements
+                && (head = q.peek()) != null
+                && head.getDelay(TimeUnit.NANOSECONDS) <= 0) {
+             c.add(q.poll());
+             ++moved;
+         }
+         if (moved > 0) {
+             available.signalAll();
+         }
+         return moved;
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ /**
+ * Atomically removes all of the elements from this delay queue.
+ * The queue will be empty after this call returns.
+ */
+ public void clear() {
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         // Empties the backing queue in one step; no waiter is signalled.
+         q.clear();
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ /**
+ * Always returns <tt>Integer.MAX_VALUE</tt> because
+ * a <tt>DelayQueue</tt> is not capacity constrained.
+ * @return <tt>Integer.MAX_VALUE</tt>
+ */
+ public int remainingCapacity() {
+ // Constant answer, so the lock is not taken.
+ return Integer.MAX_VALUE;
+ }
+
+ public Object[] toArray() {
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         // Copy taken under the lock; includes elements whose delay has not expired.
+         final Object[] snapshot = q.toArray();
+         return snapshot;
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ public <T> T[] toArray(T[] array) {
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         // Delegates to the backing queue while holding the lock.
+         final T[] filled = q.toArray(array);
+         return filled;
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ public boolean remove(Object o) {
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         // Removal ignores delays: an element can be removed before it is due.
+         final boolean removed = q.remove(o);
+         return removed;
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ /**
+ * Returns an iterator over the elements in this queue. The iterator
+ * does not return the elements in any particular order. The
+ * returned iterator is a thread-safe "fast-fail" iterator that will
+ * throw {@link java.util.ConcurrentModificationException}
+ * upon detected interference.
+ *
+ * @return an iterator over the elements in this queue.
+ */
+ public Iterator<E> iterator() {
+     final ReentrantLock guard = this.lock;
+     guard.lock();
+     try {
+         // The backing iterator is created under the lock and wrapped so
+         // that next() and remove() take the lock again on each call.
+         return new Itr(q.iterator());
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ /**
+  * Iterator wrapper that takes the queue lock around <tt>next</tt> and
+  * <tt>remove</tt> on the backing iterator.
+  *
+  * <p>The class is deliberately not generic: it uses the enclosing
+  * queue's element type <tt>E</tt>. A type parameter of its own named
+  * <tt>E</tt> would hide the outer one and force a raw instantiation.
+  */
+ private class Itr implements Iterator<E> {
+     /** Iterator over the backing priority queue. */
+     private final Iterator<E> iter;
+
+     Itr(Iterator<E> i) {
+         iter = i;
+     }
+
+     // NOTE(review): unlike next() and remove(), hasNext() does not take
+     // the queue lock -- confirm this is intended.
+     public boolean hasNext() {
+         return iter.hasNext();
+     }
+
+     public E next() {
+         final ReentrantLock lock = DelayQueue.this.lock;
+         lock.lock();
+         try {
+             return iter.next();
+         } finally {
+             lock.unlock();
+         }
+     }
+
+     public void remove() {
+         final ReentrantLock lock = DelayQueue.this.lock;
+         lock.lock();
+         try {
+             iter.remove();
+         } finally {
+             lock.unlock();
+         }
+     }
+ }
+
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,32 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.*;
+
+/**
+ * A mix-in style interface for marking objects that should be
+ * acted upon after a given delay.
+ *
+ * <p>An implementation of this interface must define a
+ * <tt>compareTo</tt> method that provides an ordering consistent with
+ * its <tt>getDelay</tt> method.
+ *
+ * <p>Because this interface extends the raw <tt>Comparable</tt> type,
+ * the <tt>compareTo</tt> method takes an <tt>Object</tt> argument.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public interface Delayed extends Comparable {
+
+ /**
+ * Returns the delay associated with this object, in the given time unit.
+ *
+ * @param unit the time unit in which the result is expressed
+ * @return the delay; zero or negative values indicate that the
+ * delay has already elapsed
+ */
+ long getDelay(TimeUnit unit);
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,247 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+
+/**
+ * A synchronization point at which two threads can exchange objects.
+ * Each thread presents some object on entry to the {@link #exchange
+ * exchange} method, and receives the object presented by the other
+ * thread on return.
+ *
+ * <p><b>Sample Usage:</b>
+ * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
+ * swap buffers between threads so that the thread filling the
+ * buffer gets a freshly
+ * emptied one when it needs it, handing off the filled one to
+ * the thread emptying the buffer.
+ * <pre>
+ * class FillAndEmpty {
+ * Exchanger<DataBuffer> exchanger = new Exchanger();
+ * DataBuffer initialEmptyBuffer = ... a made-up type
+ * DataBuffer initialFullBuffer = ...
+ *
+ * class FillingLoop implements Runnable {
+ * public void run() {
+ * DataBuffer currentBuffer = initialEmptyBuffer;
+ * try {
+ * while (currentBuffer != null) {
+ * addToBuffer(currentBuffer);
+ * if (currentBuffer.full())
+ * currentBuffer = exchanger.exchange(currentBuffer);
+ * }
+ * } catch (InterruptedException ex) { ... handle ... }
+ * }
+ * }
+ *
+ * class EmptyingLoop implements Runnable {
+ * public void run() {
+ * DataBuffer currentBuffer = initialFullBuffer;
+ * try {
+ * while (currentBuffer != null) {
+ * takeFromBuffer(currentBuffer);
+ * if (currentBuffer.empty())
+ * currentBuffer = exchanger.exchange(currentBuffer);
+ * }
+ * } catch (InterruptedException ex) { ... handle ...}
+ * }
+ * }
+ *
+ * void start() {
+ * new Thread(new FillingLoop()).start();
+ * new Thread(new EmptyingLoop()).start();
+ * }
+ * }
+ * </pre>
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <V> The type of objects that may be exchanged
+ */
+public class Exchanger<V> {
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * Condition awaited by every wait in <tt>doExchange</tt>: both by
+ * threads waiting for a previous pair to finish and by the thread
+ * waiting for its partner to arrive.
+ */
+ private final Condition taken = lock.newCondition();
+
+ /** Holder for the item being exchanged */
+ private V item;
+
+ /**
+ * Arrival count transitions from 0 to 1 to 2 then back to 0
+ * during an exchange.
+ */
+ private int arrivalCount;
+
+ /**
+ * Main exchange function, handling the different policy variants.
+ *
+ * <p>Because the single condition <tt>taken</tt> is shared by two
+ * different wait loops, every state change wakes all waiters with
+ * <tt>signalAll</tt> and each waiter rechecks its own loop condition.
+ * Waking only one waiter could select a thread whose condition still
+ * does not hold, leaving the thread that could proceed blocked.
+ *
+ * @param x the object to exchange
+ * @param timed true if waiting is bounded by <tt>nanos</tt>
+ * @param nanos the maximum time to wait, in nanoseconds, when timed
+ * @return the object provided by the other thread
+ * @throws InterruptedException if interrupted while waiting before
+ * another thread has completed the exchange
+ * @throws TimeoutException if timed and the waiting time elapsed
+ * before another thread completed the exchange
+ */
+ private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
+ lock.lock();
+ try {
+ V other;
+
+ // If arrival count already at two, we must wait for
+ // a previous pair to finish and reset the count;
+ while (arrivalCount == 2) {
+ if (!timed)
+ taken.await();
+ else if (nanos > 0)
+ nanos = taken.awaitNanos(nanos);
+ else
+ throw new TimeoutException();
+ }
+
+ int count = ++arrivalCount;
+
+ // If item is already waiting, replace it and wake the waiters.
+ // signalAll is required: a plain signal could wake a thread
+ // parked in the loop above instead of the waiting partner.
+ if (count == 2) {
+ other = item;
+ item = x;
+ taken.signalAll();
+ return other;
+ }
+
+ // Otherwise, set item and wait for another thread to
+ // replace it and signal us.
+
+ item = x;
+ InterruptedException interrupted = null;
+ try {
+ while (arrivalCount != 2) {
+ if (!timed)
+ taken.await();
+ else if (nanos > 0)
+ nanos = taken.awaitNanos(nanos);
+ else
+ break; // timed out
+ }
+ } catch (InterruptedException ie) {
+ interrupted = ie;
+ }
+
+ // Get and reset item and count after the wait.
+ // (We need to do this even if wait was aborted.)
+ other = item;
+ item = null;
+ count = arrivalCount;
+ arrivalCount = 0;
+ // Wake every thread waiting for the count to be reset.
+ taken.signalAll();
+
+ // If the other thread replaced item, then we must
+ // continue even if cancelled.
+ if (count == 2) {
+ if (interrupted != null)
+ Thread.currentThread().interrupt();
+ return other;
+ }
+
+ // If no one is waiting for us, we can back out
+ if (interrupted != null)
+ throw interrupted;
+ else // must be timeout
+ throw new TimeoutException();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Create a new Exchanger.
+ **/
+ public Exchanger() {
+ }
+
+ /**
+ * Waits for another thread to arrive at this exchange point (unless
+ * it is {@link Thread#interrupt interrupted}),
+ * and then transfers the given object to it, receiving its object
+ * in return.
+ * <p>If another thread is already waiting at the exchange point then
+ * it is resumed for thread scheduling purposes and receives the object
+ * passed in by the current thread. The current thread returns immediately,
+ * receiving the object passed to the exchange by that other thread.
+ * <p>If no other thread is already waiting at the exchange then the
+ * current thread is disabled for thread scheduling purposes and lies
+ * dormant until one of two things happens:
+ * <ul>
+ * <li>Some other thread enters the exchange; or
+ * <li>Some other thread {@link Thread#interrupt interrupts} the current
+ * thread.
+ * </ul>
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@link Thread#interrupt interrupted} while waiting
+ * for the exchange,
+ * </ul>
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * @param x the object to exchange
+ * @return the object provided by the other thread.
+ * @throws InterruptedException if current thread was interrupted
+ * while waiting
+ **/
+ public V exchange(V x) throws InterruptedException {
+ try {
+ return doExchange(x, false, 0);
+ } catch (TimeoutException cannotHappen) {
+ // The untimed variant never takes a timeout path in doExchange.
+ throw new Error(cannotHappen);
+ }
+ }
+
+ /**
+ * Waits for another thread to arrive at this exchange point (unless
+ * it is {@link Thread#interrupt interrupted}, or the specified waiting
+ * time elapses),
+ * and then transfers the given object to it, receiving its object
+ * in return.
+ *
+ * <p>If another thread is already waiting at the exchange point then
+ * it is resumed for thread scheduling purposes and receives the object
+ * passed in by the current thread. The current thread returns immediately,
+ * receiving the object passed to the exchange by that other thread.
+ *
+ * <p>If no other thread is already waiting at the exchange then the
+ * current thread is disabled for thread scheduling purposes and lies
+ * dormant until one of three things happens:
+ * <ul>
+ * <li>Some other thread enters the exchange; or
+ * <li>Some other thread {@link Thread#interrupt interrupts} the current
+ * thread; or
+ * <li>The specified waiting time elapses.
+ * </ul>
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@link Thread#interrupt interrupted} while waiting
+ * for the exchange,
+ * </ul>
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * <p>If the specified waiting time elapses then {@link TimeoutException}
+ * is thrown.
+ * If the time is
+ * less than or equal to zero, the method will not wait at all.
+ *
+ * @param x the object to exchange
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the <tt>timeout</tt> argument.
+ * @return the object provided by the other thread.
+ * @throws InterruptedException if current thread was interrupted
+ * while waiting
+ * @throws TimeoutException if the specified waiting time elapses before
+ * another thread enters the exchange.
+ **/
+ public V exchange(V x, long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException {
+ return doExchange(x, true, unit.toNanos(timeout));
+ }
+
+}
+
+
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,65 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * Exception thrown when attempting to retrieve the result of a task
+ * that aborted by throwing an exception. The underlying exception is
+ * available through the {@link #getCause()} method.
+ *
+ * @see Future
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class ExecutionException extends Exception {
+ private static final long serialVersionUID = 7830266012832686185L;
+
+ /**
+ * Constructs an <tt>ExecutionException</tt> with no detail message.
+ * The cause is left uninitialized and may be supplied later by a
+ * call to {@link #initCause(Throwable) initCause}.
+ */
+ protected ExecutionException() {
+ super();
+ }
+
+ /**
+ * Constructs an <tt>ExecutionException</tt> with the specified
+ * detail message. The cause is left uninitialized and may be
+ * supplied later by a call to
+ * {@link #initCause(Throwable) initCause}.
+ *
+ * @param message the detail message
+ */
+ protected ExecutionException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an <tt>ExecutionException</tt> with the specified
+ * detail message and cause.
+ *
+ * @param message the detail message
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method)
+ */
+ public ExecutionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs an <tt>ExecutionException</tt> with the specified
+ * cause. The detail message is set to:
+ * <pre>
+ * (cause == null ? null : cause.toString())</pre>
+ * (which typically contains the class and detail message of
+ * <tt>cause</tt>).
+ *
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method)
+ */
+ public ExecutionException(Throwable cause) {
+ super(cause);
+ }
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,107 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * An object that executes submitted {@link Runnable} tasks. This
+ * interface provides a way of decoupling task submission from the
+ * mechanics of how each task will be run, including details of thread
+ * use, scheduling, etc. An <tt>Executor</tt> is normally used
+ * instead of explicitly creating threads. For example, rather than
+ * invoking <tt>new Thread(new RunnableTask()).start()</tt> for each
+ * of a set of tasks, you might use:
+ *
+ * <pre>
+ * Executor executor = <em>anExecutor</em>;
+ * executor.execute(new RunnableTask1());
+ * executor.execute(new RunnableTask2());
+ * ...
+ * </pre>
+ *
+ * However, the <tt>Executor</tt> interface does not strictly
+ * require that execution be asynchronous. In the simplest case, an
+ * executor can run the submitted task immediately in the caller's
+ * thread:
+ *
+ * <pre>
+ * class DirectExecutor implements Executor {
+ * public void execute(Runnable r) {
+ * r.run();
+ * }
+ * }</pre>
+ *
+ * More typically, tasks are executed in some thread other
+ * than the caller's thread. The executor below spawns a new thread
+ * for each task.
+ *
+ * <pre>
+ * class ThreadPerTaskExecutor implements Executor {
+ * public void execute(Runnable r) {
+ * new Thread(r).start();
+ * }
+ * }</pre>
+ *
+ * Many <tt>Executor</tt> implementations impose some sort of
+ * limitation on how and when tasks are scheduled. The executor below
+ * serializes the submission of tasks to a second executor,
+ * illustrating a composite executor.
+ *
+ * <pre>
+ * class SerialExecutor implements Executor {
+ * final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
+ * final Executor executor;
+ * Runnable active;
+ *
+ * SerialExecutor(Executor executor) {
+ * this.executor = executor;
+ * }
+ *
+ * public synchronized void execute(final Runnable r) {
+ * tasks.offer(new Runnable() {
+ * public void run() {
+ * try {
+ * r.run();
+ * } finally {
+ * scheduleNext();
+ * }
+ * }
+ * });
+ * if (active == null) {
+ * scheduleNext();
+ * }
+ * }
+ *
+ * protected synchronized void scheduleNext() {
+ * if ((active = tasks.poll()) != null) {
+ * executor.execute(active);
+ * }
+ * }
+ * }</pre>
+ *
+ * The <tt>Executor</tt> implementations provided in this package
+ * implement {@link ExecutorService}, which is a more extensive
+ * interface. The {@link ThreadPoolExecutor} class provides an
+ * extensible thread pool implementation. The {@link Executors} class
+ * provides convenient factory methods for these Executors.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public interface Executor {
+
+ /**
+ * Executes the given command at some time in the future. The command
+ * may execute in a new thread, in a pooled thread, or in the calling
+ * thread, at the discretion of the <tt>Executor</tt> implementation.
+ *
+ * @param command the runnable task
+ * @throws RejectedExecutionException if this task cannot be
+ * accepted for execution.
+ * @throws NullPointerException if command is null
+ */
+ void execute(Runnable command);
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,148 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+
+/**
+ * A {@link CompletionService} that runs submitted tasks on a supplied
+ * {@link Executor} and places each task, upon completion, on a queue
+ * from which it can be retrieved using <tt>take</tt> or <tt>poll</tt>.
+ * The class is lightweight enough to be suitable for transient use
+ * when processing groups of tasks.
+ *
+ * <p>
+ *
+ * <b>Usage Examples.</b>
+ *
+ * Suppose you have a set of solvers for a certain problem, each
+ * returning a value of some type <tt>Result</tt>, and would like to
+ * run them concurrently, processing the results of each of them that
+ * return a non-null value, in some method <tt>use(Result r)</tt>. You
+ * could write this as:
+ *
+ * <pre>
+ * void solve(Executor e, Collection<Callable<Result>> solvers)
+ * throws InterruptedException, ExecutionException {
+ * CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
+ * for (Callable<Result> s : solvers)
+ * ecs.submit(s);
+ * int n = solvers.size();
+ * for (int i = 0; i < n; ++i) {
+ * Result r = ecs.take().get();
+ * if (r != null)
+ * use(r);
+ * }
+ * }
+ * </pre>
+ *
+ * Suppose instead that you would like to use the first non-null result
+ * of the set of tasks, ignoring any that encounter exceptions,
+ * and cancelling all other tasks when the first one is ready:
+ *
+ * <pre>
+ * void solve(Executor e, Collection<Callable<Result>> solvers)
+ * throws InterruptedException {
+ * CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
+ * int n = solvers.size();
+ * List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
+ * Result result = null;
+ * try {
+ * for (Callable<Result> s : solvers)
+ * futures.add(ecs.submit(s));
+ * for (int i = 0; i < n; ++i) {
+ * try {
+ * Result r = ecs.take().get();
+ * if (r != null) {
+ * result = r;
+ * break;
+ * }
+ * } catch(ExecutionException ignore) {}
+ * }
+ * }
+ * finally {
+ * for (Future<Result> f : futures)
+ * f.cancel(true);
+ * }
+ *
+ * if (result != null)
+ * use(result);
+ * }
+ * </pre>
+ */
+public class ExecutorCompletionService<V> implements CompletionService<V> {
+ private final Executor executor;
+ private final BlockingQueue<Future<V>> completionQueue;
+
+ /**
+ * FutureTask extension that adds itself to the completion queue
+ * when it is done.
+ */
+ private class QueueingFuture extends FutureTask<V> {
+ QueueingFuture(Callable<V> callable) {
+ super(callable);
+ }
+
+ QueueingFuture(Runnable runnable, V value) {
+ super(runnable, value);
+ }
+
+ protected void done() {
+ completionQueue.add(this);
+ }
+ }
+
+ /**
+ * Creates an ExecutorCompletionService that executes tasks with the
+ * supplied executor and collects completed tasks in a new
+ * {@link LinkedBlockingQueue}.
+ * @param executor the executor to use
+ * @throws NullPointerException if executor is <tt>null</tt>
+ */
+ public ExecutorCompletionService(Executor executor) {
+ if (executor == null) {
+ throw new NullPointerException();
+ }
+ this.executor = executor;
+ this.completionQueue = new LinkedBlockingQueue<Future<V>>();
+ }
+
+ /**
+ * Creates an ExecutorCompletionService that executes tasks with the
+ * supplied executor and collects completed tasks in the supplied
+ * queue.
+ * @param executor the executor to use
+ * @param completionQueue the queue to use as the completion queue
+ * normally one dedicated for use by this service
+ * @throws NullPointerException if executor or completionQueue are <tt>null</tt>
+ */
+ public ExecutorCompletionService(Executor executor,
+ BlockingQueue<Future<V>> completionQueue) {
+ if (executor == null) {
+ throw new NullPointerException();
+ }
+ if (completionQueue == null) {
+ throw new NullPointerException();
+ }
+ this.executor = executor;
+ this.completionQueue = completionQueue;
+ }
+
+ /**
+ * Wraps the task in a future that enqueues itself on completion and
+ * hands that future to the executor.
+ * @param task the task to submit
+ * @return the future that was handed to the executor
+ * @throws NullPointerException if task is <tt>null</tt>
+ */
+ public Future<V> submit(Callable<V> task) {
+ if (task == null) {
+ throw new NullPointerException();
+ }
+ QueueingFuture future = new QueueingFuture(task);
+ executor.execute(future);
+ return future;
+ }
+
+ /**
+ * Wraps the task and its result in a future that enqueues itself on
+ * completion and hands that future to the executor.
+ * @param task the task to submit
+ * @param result the result passed to the wrapping future
+ * @return the future that was handed to the executor
+ * @throws NullPointerException if task is <tt>null</tt>
+ */
+ public Future<V> submit(Runnable task, V result) {
+ if (task == null) {
+ throw new NullPointerException();
+ }
+ QueueingFuture future = new QueueingFuture(task, result);
+ executor.execute(future);
+ return future;
+ }
+
+ /**
+ * Delegates to the completion queue's <tt>take</tt> method.
+ * @return the future taken from the completion queue
+ * @throws InterruptedException if thrown by the completion queue
+ */
+ public Future<V> take() throws InterruptedException {
+ return completionQueue.take();
+ }
+
+ /**
+ * Delegates to the completion queue's untimed <tt>poll</tt> method.
+ * @return whatever the completion queue's <tt>poll</tt> returns
+ */
+ public Future<V> poll() {
+ return completionQueue.poll();
+ }
+
+ /**
+ * Delegates to the completion queue's timed <tt>poll</tt> method.
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return whatever the completion queue's timed <tt>poll</tt> returns
+ * @throws InterruptedException if thrown by the completion queue
+ */
+ public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return completionQueue.poll(timeout, unit);
+ }
+
+}
+
+
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,286 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.List;
+import java.util.Collection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * An {@link Executor} that provides methods to manage termination and
+ * methods that can produce a {@link Future} for tracking progress of
+ * one or more asynchronous tasks.
+ *
+ * <p>
+ * An <tt>ExecutorService</tt> can be shut down, which will cause it
+ * to stop accepting new tasks. After being shut down, the executor
+ * will eventually terminate, at which point no tasks are actively
+ * executing, no tasks are awaiting execution, and no new tasks can be
+ * submitted.
+ *
+ * <p> Method <tt>submit</tt> extends base method {@link
+ * Executor#execute} by creating and returning a {@link Future} that
+ * can be used to cancel execution and/or wait for completion.
+ * Methods <tt>invokeAny</tt> and <tt>invokeAll</tt> perform the most
+ * commonly useful forms of bulk execution, executing a collection of
+ * tasks and then waiting for at least one, or all, to
+ * complete. (Class {@link ExecutorCompletionService} can be used to
+ * write customized variants of these methods.)
+ *
+ * <p>The {@link Executors} class provides factory methods for the
+ * executor services provided in this package.
+ *
+ * <h3>Usage Example</h3>
+ *
+ * Here is a sketch of a network service in which threads in a thread
+ * pool service incoming requests. It uses the preconfigured {@link
+ * Executors#newFixedThreadPool} factory method:
+ *
+ * <pre>
+ * class NetworkService {
+ * private final ServerSocket serverSocket;
+ * private final ExecutorService pool;
+ *
+ * public NetworkService(int port, int poolSize) throws IOException {
+ * serverSocket = new ServerSocket(port);
+ * pool = Executors.newFixedThreadPool(poolSize);
+ * }
+ *
+ * public void serve() {
+ * try {
+ * for (;;) {
+ * pool.execute(new Handler(serverSocket.accept()));
+ * }
+ * } catch (IOException ex) {
+ * pool.shutdown();
+ * }
+ * }
+ * }
+ *
+ * class Handler implements Runnable {
+ * private final Socket socket;
+ * Handler(Socket socket) { this.socket = socket; }
+ * public void run() {
+ * // read and service request
+ * }
+ * }
+ * </pre>
+ * @since 1.5
+ * @author Doug Lea
+ */
+public interface ExecutorService extends Executor {
+
+ /**
+ * Initiates an orderly shutdown in which previously submitted
+ * tasks are executed, but no new tasks will be
+ * accepted. Invocation has no additional effect if already shut
+ * down.
+ * @throws SecurityException if a security manager exists and
+ * shutting down this ExecutorService may manipulate threads that
+ * the caller is not permitted to modify because it does not hold
+ * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+ * or the security manager's <tt>checkAccess</tt> method denies access.
+ */
+ void shutdown();
+
+ /**
+ * Attempts to stop all actively executing tasks, halts the
+ * processing of waiting tasks, and returns a list of the tasks that were
+ * awaiting execution.
+ *
+ * <p>There are no guarantees beyond best-effort attempts to stop
+ * processing actively executing tasks. For example, typical
+ * implementations will cancel via {@link Thread#interrupt}, so if any
+ * tasks mask or fail to respond to interrupts, they may never terminate.
+ *
+ * @return list of tasks that never commenced execution
+ * @throws SecurityException if a security manager exists and
+ * shutting down this ExecutorService may manipulate threads that
+ * the caller is not permitted to modify because it does not hold
+ * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+ * or the security manager's <tt>checkAccess</tt> method denies access.
+ */
+ List<Runnable> shutdownNow();
+
+ /**
+ * Returns <tt>true</tt> if this executor has been shut down.
+ *
+ * @return <tt>true</tt> if this executor has been shut down
+ */
+ boolean isShutdown();
+
+ /**
+ * Returns <tt>true</tt> if all tasks have completed following shut down.
+ * Note that <tt>isTerminated</tt> is never <tt>true</tt> unless
+ * either <tt>shutdown</tt> or <tt>shutdownNow</tt> was called first.
+ *
+ * @return <tt>true</tt> if all tasks have completed following shut down
+ */
+ boolean isTerminated();
+
+ /**
+ * Blocks until all tasks have completed execution after a shutdown
+ * request, or the timeout occurs, or the current thread is
+ * interrupted, whichever happens first.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return <tt>true</tt> if this executor terminated and <tt>false</tt>
+ * if the timeout elapsed before termination
+ * @throws InterruptedException if interrupted while waiting
+ */
+ boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+
+ /**
+ * Submits a value-returning task for execution and returns a Future
+ * representing the pending results of the task.
+ *
+ * <p>
+ * If you would like to immediately block waiting
+ * for a task, you can use constructions of the form
+ * <tt>result = exec.submit(aCallable).get();</tt>
+ *
+ * <p> Note: The {@link Executors} class includes a set of methods
+ * that can convert some other common closure-like objects,
+ * for example, {@link java.security.PrivilegedAction} to
+ * {@link Callable} form so they can be submitted.
+ *
+ * @param task the task to submit
+ * @return a Future representing pending completion of the task
+ * @throws RejectedExecutionException if task cannot be scheduled
+ * for execution
+ * @throws NullPointerException if task is null
+ */
+ <T> Future<T> submit(Callable<T> task);
+
+ /**
+ * Submits a Runnable task for execution and returns a Future
+ * representing that task that will upon completion return
+ * the given result.
+ *
+ * @param task the task to submit
+ * @param result the result to return
+ * @return a Future representing pending completion of the task,
+ * and whose <tt>get()</tt> method will return the given result
+ * upon completion.
+ * @throws RejectedExecutionException if task cannot be scheduled
+ * for execution
+ * @throws NullPointerException if task is null
+ */
+ <T> Future<T> submit(Runnable task, T result);
+
+ /**
+ * Submits a Runnable task for execution and returns a Future
+ * representing that task.
+ *
+ * @param task the task to submit
+ * @return a Future representing pending completion of the task,
+ * and whose <tt>get()</tt> method will return <tt>null</tt>
+ * upon completion.
+ * @throws RejectedExecutionException if task cannot be scheduled
+ * for execution
+ * @throws NullPointerException if task is null
+ */
+ Future<?> submit(Runnable task);
+
+ /**
+ * Executes the given tasks, returning their results
+ * when all complete.
+ * Note that a <em>completed</em> task could have
+ * terminated either normally or by throwing an exception.
+ * The results of this method are undefined if the given
+ * collection is modified while this operation is in progress.
+ * @param tasks the collection of tasks
+ * @return A list of Futures representing the tasks, in the same
+ * sequential order as produced by the iterator for the given task
+ * list, each of which has completed.
+ * @throws InterruptedException if interrupted while waiting, in
+ * which case unfinished tasks are cancelled.
+ * @throws NullPointerException if tasks or any of its elements are <tt>null</tt>
+ * @throws RejectedExecutionException if any task cannot be scheduled
+ * for execution
+ */
+
+ <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
+ throws InterruptedException;
+
+ /**
+ * Executes the given tasks, returning their results
+ * when all complete or the timeout expires, whichever happens first.
+ * Upon return, tasks that have not completed are cancelled.
+ * Note that a <em>completed</em> task could have
+ * terminated either normally or by throwing an exception.
+ * The results of this method are undefined if the given
+ * collection is modified while this operation is in progress.
+ * @param tasks the collection of tasks
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return A list of Futures representing the tasks, in the same
+ * sequential order as produced by the iterator for the given
+ * task list. If the operation did not time out, each task will
+ * have completed. If it did time out, some of these tasks will
+ * not have completed.
+ * @throws InterruptedException if interrupted while waiting, in
+ * which case unfinished tasks are cancelled.
+ * @throws NullPointerException if tasks, any of its elements, or
+ * unit are <tt>null</tt>
+ * @throws RejectedExecutionException if any task cannot be scheduled
+ * for execution
+ */
+ <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+ long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+ /**
+ * Executes the given tasks, returning the result
+ * of one that has completed successfully (i.e., without throwing
+ * an exception), if any do. Upon normal or exceptional return,
+ * tasks that have not completed are cancelled.
+ * The results of this method are undefined if the given
+ * collection is modified while this operation is in progress.
+ * @param tasks the collection of tasks
+ * @return The result returned by one of the tasks.
+ * @throws InterruptedException if interrupted while waiting
+ * @throws NullPointerException if tasks or any of its elements
+ * are <tt>null</tt>
+ * @throws IllegalArgumentException if tasks is empty
+ * @throws ExecutionException if no task successfully completes
+ * @throws RejectedExecutionException if tasks cannot be scheduled
+ * for execution
+ */
+ <T> T invokeAny(Collection<Callable<T>> tasks)
+ throws InterruptedException, ExecutionException;
+
+ /**
+ * Executes the given tasks, returning the result
+ * of one that has completed successfully (i.e., without throwing
+ * an exception), if any do before the given timeout elapses.
+ * Upon normal or exceptional return, tasks that have not
+ * completed are cancelled.
+ * The results of this method are undefined if the given
+ * collection is modified while this operation is in progress.
+ * @param tasks the collection of tasks
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return The result returned by one of the tasks.
+ * @throws InterruptedException if interrupted while waiting
+ * @throws NullPointerException if tasks, any of its elements, or
+ * unit are <tt>null</tt>
+ * @throws TimeoutException if the given timeout elapses before
+ * any task successfully completes
+ * @throws ExecutionException if no task successfully completes
+ * @throws RejectedExecutionException if tasks cannot be scheduled
+ * for execution
+ */
+ <T> T invokeAny(Collection<Callable<T>> tasks,
+ long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException;
+
+}
Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java
------------------------------------------------------------------------------
svn:eol-style = native