You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@harmony.apache.org by nd...@apache.org on 2006/08/24 05:42:33 UTC

svn commit: r434296 [3/19] - in /incubator/harmony/enhanced/classlib/trunk: make/ modules/concurrent/ modules/concurrent/.settings/ modules/concurrent/META-INF/ modules/concurrent/make/ modules/concurrent/src/ modules/concurrent/src/main/ modules/concu...

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,280 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * A synchronization aid that allows one or more threads to wait until
+ * a set of operations being performed in other threads completes.
+ *
+ * <p>A <tt>CountDownLatch</tt> is initialized with a given
+ * <em>count</em>.  The {@link #await await} methods block until the current
+ * {@link #getCount count} reaches zero due to invocations of the
+ * {@link #countDown} method, after which all waiting threads are
+ * released and any subsequent invocations of {@link #await await} return
+ * immediately. This is a one-shot phenomenon -- the count cannot be
+ * reset.  If you need a version that resets the count, consider using
+ * a {@link CyclicBarrier}.
+ *
+ * <p>A <tt>CountDownLatch</tt> is a versatile synchronization tool
+ * and can be used for a number of purposes.  A
+ * <tt>CountDownLatch</tt> initialized with a count of one serves as a
+ * simple on/off latch, or gate: all threads invoking {@link #await await}
+ * wait at the gate until it is opened by a thread invoking {@link
+ * #countDown}.  A <tt>CountDownLatch</tt> initialized to <em>N</em>
+ * can be used to make one thread wait until <em>N</em> threads have
+ * completed some action, or some action has been completed N times.
+ * <p>A useful property of a <tt>CountDownLatch</tt> is that it
+ * doesn't require that threads calling <tt>countDown</tt> wait for
+ * the count to reach zero before proceeding, it simply prevents any
+ * thread from proceeding past an {@link #await await} until all
+ * threads could pass.
+ *
+ * <p><b>Sample usage:</b> Here is a pair of classes in which a group
+ * of worker threads use two countdown latches:
+ * <ul>
+ * <li>The first is a start signal that prevents any worker from proceeding
+ * until the driver is ready for them to proceed;
+ * <li>The second is a completion signal that allows the driver to wait
+ * until all workers have completed.
+ * </ul>
+ *
+ * <pre>
+ * class Driver { // ...
+ *   void main() throws InterruptedException {
+ *     CountDownLatch startSignal = new CountDownLatch(1);
+ *     CountDownLatch doneSignal = new CountDownLatch(N);
+ *
+ *     for (int i = 0; i < N; ++i) // create and start threads
+ *       new Thread(new Worker(startSignal, doneSignal)).start();
+ *
+ *     doSomethingElse();            // don't let run yet
+ *     startSignal.countDown();      // let all threads proceed
+ *     doSomethingElse();
+ *     doneSignal.await();           // wait for all to finish
+ *   }
+ * }
+ *
+ * class Worker implements Runnable {
+ *   private final CountDownLatch startSignal;
+ *   private final CountDownLatch doneSignal;
+ *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
+ *      this.startSignal = startSignal;
+ *      this.doneSignal = doneSignal;
+ *   }
+ *   public void run() {
+ *      try {
+ *        startSignal.await();
+ *        doWork();
+ *        doneSignal.countDown();
+ *      } catch (InterruptedException ex) {} // return;
+ *   }
+ *
+ *   void doWork() { ... }
+ * }
+ *
+ * </pre>
+ *
+ * <p>Another typical usage would be to divide a problem into N parts,
+ * describe each part with a Runnable that executes that portion and
+ * counts down on the latch, and queue all the Runnables to an
+ * Executor.  When all sub-parts are complete, the coordinating thread
+ * will be able to pass through await. (When threads must repeatedly
+ * count down in this way, instead use a {@link CyclicBarrier}.)
+ *
+ * <pre>
+ * class Driver2 { // ...
+ *   void main() throws InterruptedException {
+ *     CountDownLatch doneSignal = new CountDownLatch(N);
+ *     Executor e = ...
+ *
+ *     for (int i = 0; i < N; ++i) // create and start threads
+ *       e.execute(new WorkerRunnable(doneSignal, i));
+ *
+ *     doneSignal.await();           // wait for all to finish
+ *   }
+ * }
+ *
+ * class WorkerRunnable implements Runnable {
+ *   private final CountDownLatch doneSignal;
+ *   private final int i;
+ *   WorkerRunnable(CountDownLatch doneSignal, int i) {
+ *      this.doneSignal = doneSignal;
+ *      this.i = i;
+ *   }
+ *   public void run() {
+ *      try {
+ *        doWork(i);
+ *        doneSignal.countDown();
+ *      } catch (InterruptedException ex) {} // return;
+ *   }
+ *
+ *   void doWork(int i) { ... }
+ * }
+ *
+ * </pre>
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class CountDownLatch {
+    /**
+     * Synchronization control for CountDownLatch.
+     * Uses AQS state to represent count.
+     */
+    private static final class Sync extends AbstractQueuedSynchronizer {
+        Sync(int count) {
+            setState(count); // the AQS state starts out as the latch count
+        }
+        
+        int getCount() {
+            return getState(); // the current count is whatever the AQS state holds
+        }
+
+        public int tryAcquireShared(int acquires) {
+            return getState() == 0? 1 : -1; // 1 when the count is zero, -1 otherwise; acquires is not consulted
+        }
+        
+        public boolean tryReleaseShared(int releases) {
+            // Decrement count by one (releases is not consulted); signal on the transition to zero
+            for (;;) {
+                int c = getState();
+                if (c == 0)
+                    return false; // already zero: nothing to decrement
+                int nextc = c-1;
+                if (compareAndSetState(c, nextc)) // on CAS failure, loop and re-read the state
+                    return nextc == 0; // true only for the call whose CAS moved the count to zero
+            }
+        }
+    }
+
+    private final Sync sync; // holds the count; every public method delegates to it
+    /**
+     * Constructs a <tt>CountDownLatch</tt> initialized with the given
+     * count.
+     * 
+     * @param count the number of times {@link #countDown} must be invoked
+     * before threads can pass through {@link #await}.
+     *
+     * @throws IllegalArgumentException if <tt>count</tt> is less than zero.
+     */
+    public CountDownLatch(int count) { 
+        if (count < 0) throw new IllegalArgumentException("count < 0");
+        this.sync = new Sync(count);
+    }
+
+    /**
+     * Causes the current thread to wait until the latch has counted down to 
+     * zero, unless the thread is {@link Thread#interrupt interrupted}.
+     *
+     * <p>If the current {@link #getCount count} is zero then this method
+     * returns immediately.
+     * <p>If the current {@link #getCount count} is greater than zero then
+     * the current thread becomes disabled for thread scheduling 
+     * purposes and lies dormant until one of two things happens:
+     * <ul>
+     * <li>The count reaches zero due to invocations of the
+     * {@link #countDown} method; or
+     * <li>Some other thread {@link Thread#interrupt interrupts} the current
+     * thread.
+     * </ul>
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or 
+     * <li>is {@link Thread#interrupt interrupted} while waiting, 
+     * </ul>
+     * then {@link InterruptedException} is thrown and the current thread's 
+     * interrupted status is cleared. 
+     *
+     * @throws InterruptedException if the current thread is interrupted
+     * while waiting.
+     */
+    public void await() throws InterruptedException {
+        sync.acquireSharedInterruptibly(1);
+    }
+
+    /**
+     * Causes the current thread to wait until the latch has counted down to 
+     * zero, unless the thread is {@link Thread#interrupt interrupted},
+     * or the specified waiting time elapses.
+     *
+     * <p>If the current {@link #getCount count} is zero then this method
+     * returns immediately with the value <tt>true</tt>.
+     *
+     * <p>If the current {@link #getCount count} is greater than zero then
+     * the current thread becomes disabled for thread scheduling 
+     * purposes and lies dormant until one of three things happens:
+     * <ul>
+     * <li>The count reaches zero due to invocations of the
+     * {@link #countDown} method; or
+     * <li>Some other thread {@link Thread#interrupt interrupts} the current
+     * thread; or
+     * <li>The specified waiting time elapses.
+     * </ul>
+     * <p>If the count reaches zero then the method returns with the
+     * value <tt>true</tt>.
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or 
+     * <li>is {@link Thread#interrupt interrupted} while waiting, 
+     * </ul>
+     * then {@link InterruptedException} is thrown and the current thread's 
+     * interrupted status is cleared. 
+     *
+     * <p>If the specified waiting time elapses then the value <tt>false</tt>
+     * is returned.
+     * If the time is 
+     * less than or equal to zero, the method will not wait at all.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the <tt>timeout</tt> argument.
+     * @return <tt>true</tt> if the count reached zero and <tt>false</tt>
+     * if the waiting time elapsed before the count reached zero.
+     *
+     * @throws InterruptedException if the current thread is interrupted
+     * while waiting.
+     */
+    public boolean await(long timeout, TimeUnit unit) 
+        throws InterruptedException {
+        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+    }
+
+    /**
+     * Decrements the count of the latch, releasing all waiting threads if
+     * the count reaches zero.
+     * <p>If the current {@link #getCount count} is greater than zero then
+     * it is decremented. If the new count is zero then all waiting threads
+     * are re-enabled for thread scheduling purposes.
+     * <p>If the current {@link #getCount count} equals zero then nothing
+     * happens.
+     */
+    public void countDown() {
+        sync.releaseShared(1);
+    }
+
+    /**
+     * Returns the current count.
+     * <p>This method is typically used for debugging and testing purposes.
+     * @return the current count.
+     */
+    public long getCount() {
+        return sync.getCount(); // int count from Sync, widened to the declared long return type
+    }
+
+    /**
+     * Returns a string identifying this latch, as well as its state.
+     * The state, in brackets, includes the String 
+     * &quot;Count =&quot; followed by the current count.
+     * @return a string identifying this latch, as well as its
+     * state
+     */
+    public String toString() {
+        return super.toString() + "[Count = " + sync.getCount() + "]";
+    }
+
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,430 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+
+/**
+ * A synchronization aid that allows a set of threads to all wait for
+ * each other to reach a common barrier point.  CyclicBarriers are
+ * useful in programs involving a fixed sized party of threads that
+ * must occasionally wait for each other. The barrier is called
+ * <em>cyclic</em> because it can be re-used after the waiting threads
+ * are released.
+ *
+ * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
+ * that is run once per barrier point, after the last thread in the party
+ * arrives, but before any threads are released. 
+ * This <em>barrier action</em> is useful
+ * for updating shared-state before any of the parties continue.
+ * 
+ * <p><b>Sample usage:</b> Here is an example of
+ *  using a barrier in a parallel decomposition design:
+ * <pre>
+ * class Solver {
+ *   final int N;
+ *   final float[][] data;
+ *   final CyclicBarrier barrier;
+ *   
+ *   class Worker implements Runnable {
+ *     int myRow;
+ *     Worker(int row) { myRow = row; }
+ *     public void run() {
+ *       while (!done()) {
+ *         processRow(myRow);
+ *
+ *         try {
+ *           barrier.await(); 
+ *         } catch (InterruptedException ex) { 
+ *           return; 
+ *         } catch (BrokenBarrierException ex) { 
+ *           return; 
+ *         }
+ *       }
+ *     }
+ *   }
+ *
+ *   public Solver(float[][] matrix) {
+ *     data = matrix;
+ *     N = matrix.length;
+ *     barrier = new CyclicBarrier(N, 
+ *                                 new Runnable() {
+ *                                   public void run() { 
+ *                                     mergeRows(...); 
+ *                                   }
+ *                                 });
+ *     for (int i = 0; i < N; ++i) 
+ *       new Thread(new Worker(i)).start();
+ *
+ *     waitUntilDone();
+ *   }
+ * }
+ * </pre>
+ * Here, each worker thread processes a row of the matrix then waits at the 
+ * barrier until all rows have been processed. When all rows are processed
+ * the supplied {@link Runnable} barrier action is executed and merges the 
+ * rows. If the merger
+ * determines that a solution has been found then <tt>done()</tt> will return
+ * <tt>true</tt> and each worker will terminate.
+ *
+ * <p>If the barrier action does not rely on the parties being suspended when
+ * it is executed, then any of the threads in the party could execute that
+ * action when it is released. To facilitate this, each invocation of
+ * {@link #await} returns the arrival index of that thread at the barrier.
+ * You can then choose which thread should execute the barrier action, for 
+ * example:
+ * <pre>  if (barrier.await() == 0) {
+ *     // log the completion of this iteration
+ *   }</pre>
+ *
+ * <p>The <tt>CyclicBarrier</tt> uses a fast-fail all-or-none breakage
+ * model for failed synchronization attempts: If a thread leaves a
+ * barrier point prematurely because of interruption, failure, or
+ * timeout, all other threads, even those that have not yet resumed
+ * from a previous {@link #await}, will also leave abnormally via
+ * {@link BrokenBarrierException} (or <tt>InterruptedException</tt> if
+ * they too were interrupted at about the same time).
+ *
+ * @since 1.5
+ * @see CountDownLatch
+ *
+ * @author Doug Lea
+ */
+public class CyclicBarrier {
+    /** The lock for guarding barrier entry */
+    private final ReentrantLock lock = new ReentrantLock();
+    /** Condition to wait on until tripped */
+    private final Condition trip = lock.newCondition();
+    /** The number of parties */
+    private final int parties;
+    /** The command to run when tripped */
+    private final Runnable barrierCommand;
+
+    /**
+     * The generation number. Incremented upon barrier trip.
+     * Retracted upon reset.
+     */
+    private long generation; 
+
+    /** 
+     * Breakage indicator.
+     */
+    private boolean broken; 
+
+    /**
+     * Number of parties still waiting. Counts down from parties to 0
+     * on each cycle.
+     */
+    private int count; 
+
+    /**
+     * Updates state on barrier trip and wakes up everyone.
+     */  
+    private void nextGeneration() {
+        count = parties;
+        ++generation;
+        trip.signalAll();
+    }
+
+    /**
+     * Sets barrier as broken and wakes up everyone.
+     */
+    private void breakBarrier() {
+        broken = true;
+        trip.signalAll();
+    }
+
+    /**
+     * Main barrier code, covering the various policies.
+     */
+    private int dowait(boolean timed, long nanos) 
+        throws InterruptedException, BrokenBarrierException, TimeoutException {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            int index = --count; // arrival index; decremented before the broken/interrupt checks below
+            long g = generation; // generation observed on arrival
+
+            if (broken) 
+                throw new BrokenBarrierException();
+
+            if (Thread.interrupted()) {
+                breakBarrier();
+                throw new InterruptedException();
+            }
+
+            if (index == 0) {  // tripped
+                nextGeneration(); // restores count, bumps generation and signals waiters before the command runs
+                boolean ranAction = false;
+                try {
+                    Runnable command = barrierCommand;
+                    if (command != null) 
+                        command.run();
+                    ranAction = true; // reached only if the command returned normally (or there was none)
+                    return 0;
+                } finally {
+                    if (!ranAction)
+                        breakBarrier();
+                }
+            }
+
+            for (;;) {
+                try {
+                    if (!timed) 
+                        trip.await();
+                    else if (nanos > 0L) // with no time remaining, skip the wait
+                        nanos = trip.awaitNanos(nanos);
+                } catch (InterruptedException ie) {
+                    breakBarrier();
+                    throw ie;
+                }
+                
+                if (broken || 
+                    g > generation) // true if a reset occurred while waiting
+                    throw new BrokenBarrierException();
+
+                if (g < generation) // generation advanced past the one seen on arrival
+                    return index;
+
+                if (timed && nanos <= 0L) { // timed wait ran out with no trip or breakage observed
+                    breakBarrier();
+                    throw new TimeoutException();
+                }
+            }
+
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
+     * given number of parties (threads) are waiting upon it, and which
+     * will execute the given barrier action when the barrier is tripped,
+     * performed by the last thread entering the barrier.
+     *
+     * @param parties the number of threads that must invoke {@link #await}
+     * before the barrier is tripped.
+     * @param barrierAction the command to execute when the barrier is
+     * tripped, or <tt>null</tt> if there is no action.
+     *
+     * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
+     */
+    public CyclicBarrier(int parties, Runnable barrierAction) {
+        if (parties <= 0) throw new IllegalArgumentException();
+        this.parties = parties; 
+        this.count = parties;
+        this.barrierCommand = barrierAction;
+    }
+
+    /**
+     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
+     * given number of parties (threads) are waiting upon it, and
+     * does not perform a predefined action upon each barrier.
+     *
+     * @param parties the number of threads that must invoke {@link #await}
+     * before the barrier is tripped.
+     *
+     * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
+     */
+    public CyclicBarrier(int parties) {
+        this(parties, null);
+    }
+
+    /**
+     * Returns the number of parties required to trip this barrier.
+     * @return the number of parties required to trip this barrier.
+     **/
+    public int getParties() {
+        return parties;
+    }
+
+    /**
+     * Waits until all {@link #getParties parties} have invoked <tt>await</tt>
+     * on this barrier.
+     *
+     * <p>If the current thread is not the last to arrive then it is
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of the following things happens:
+     * <ul>
+     * <li>The last thread arrives; or
+     * <li>Some other thread {@link Thread#interrupt interrupts} the current
+     * thread; or
+     * <li>Some other thread  {@link Thread#interrupt interrupts} one of the
+     * other waiting threads; or
+     * <li>Some other thread times out while waiting for barrier; or
+     * <li>Some other thread invokes {@link #reset} on this barrier.
+     * </ul>
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@link Thread#interrupt interrupted} while waiting
+     * </ul>
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * <p>If the barrier is {@link #reset} while any thread is waiting, or if 
+     * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
+     * or while any thread is waiting,
+     * then {@link BrokenBarrierException} is thrown.
+     *
+     * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
+     * then all other waiting threads will throw 
+     * {@link BrokenBarrierException} and the barrier is placed in the broken
+     * state.
+     *
+     * <p>If the current thread is the last thread to arrive, and a
+     * non-null barrier action was supplied in the constructor, then the
+     * current thread runs the action before allowing the other threads to 
+     * continue.
+     * If an exception occurs during the barrier action then that exception
+     * will be propagated in the current thread and the barrier is placed in
+     * the broken state.
+     *
+     * @return the arrival index of the current thread, where index
+     *  <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and 
+     * zero indicates the last to arrive.
+     *
+     * @throws InterruptedException if the current thread was interrupted 
+     * while waiting
+     * @throws BrokenBarrierException if <em>another</em> thread was
+     * interrupted while the current thread was waiting, or the barrier was
+     * reset, or the barrier was broken when <tt>await</tt> was called,
+     * or the barrier action (if present) failed due to an exception.
+     */
+    public int await() throws InterruptedException, BrokenBarrierException {
+        try {
+            return dowait(false, 0L);
+        } catch (TimeoutException toe) {
+            throw new Error(toe); // cannot happen: dowait throws TimeoutException only when timed is true
+        }
+    }
+
+    /**
+     * Waits until all {@link #getParties parties} have invoked <tt>await</tt>
+     * on this barrier.
+     *
+     * <p>If the current thread is not the last to arrive then it is
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of the following things happens:
+     * <ul>
+     * <li>The last thread arrives; or
+     * <li>The specified timeout elapses; or
+     * <li>Some other thread {@link Thread#interrupt interrupts} the current
+     * thread; or
+     * <li>Some other thread  {@link Thread#interrupt interrupts} one of the
+     * other waiting threads; or
+     * <li>Some other thread times out while waiting for barrier; or
+     * <li>Some other thread invokes {@link #reset} on this barrier.
+     * </ul>
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@link Thread#interrupt interrupted} while waiting
+     * </ul>
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * <p>If the barrier is {@link #reset} while any thread is waiting, or if 
+     * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
+     * or while any thread is waiting,
+     * then {@link BrokenBarrierException} is thrown.
+     *
+     * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
+     * then all other waiting threads will throw 
+     * {@link BrokenBarrierException} and the barrier is placed in the broken
+     * state.
+     *
+     * <p>If the current thread is the last thread to arrive, and a
+     * non-null barrier action was supplied in the constructor, then the
+     * current thread runs the action before allowing the other threads to 
+     * continue.
+     * If an exception occurs during the barrier action then that exception
+     * will be propagated in the current thread and the barrier is placed in
+     * the broken state.
+     *
+     * @param timeout the time to wait for the barrier
+     * @param unit the time unit of the timeout parameter
+     * @return the arrival index of the current thread, where index
+     *  <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and 
+     * zero indicates the last to arrive.
+     *
+     * @throws InterruptedException if the current thread was interrupted 
+     * while waiting
+     * @throws TimeoutException if the specified timeout elapses.
+     * @throws BrokenBarrierException if <em>another</em> thread was
+     * interrupted while the current thread was waiting, or the barrier was
+     * reset, or the barrier was broken when <tt>await</tt> was called,
+     * or the barrier action (if present) failed due to an exception.
+     */
+    public int await(long timeout, TimeUnit unit) 
+        throws InterruptedException, 
+        BrokenBarrierException, 
+        TimeoutException {
+        return dowait(true, unit.toNanos(timeout));
+    }
+
+    /**
+     * Queries if this barrier is in a broken state.
+     * @return <tt>true</tt> if one or more parties broke out of this
+     * barrier due to interruption or timeout since construction or
+     * the last reset, or a barrier action failed due to an exception; 
+     * and <tt>false</tt> otherwise.
+     */
+    public boolean isBroken() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return broken;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Resets the barrier to its initial state.  If any parties are
+     * currently waiting at the barrier, they will return with a
+     * {@link BrokenBarrierException}. Note that resets <em>after</em>
+     * a breakage has occurred for other reasons can be complicated to
+     * carry out; threads need to re-synchronize in some other way,
+     * and choose one to perform the reset.  It may be preferable to
+     * instead create a new barrier for subsequent use.
+     */
+    public void reset() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            /*
+             * Retract generation number enough to cover threads
+             * currently waiting on current and still resuming from
+             * previous generation, plus similarly accommodating spans
+             * after the reset.
+             */
+            generation -= 4;
+            broken = false; // NOTE(review): count is not reassigned to parties here -- confirm that is intended
+            trip.signalAll(); // wake waiters so they re-check and observe g > generation
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the number of parties currently waiting at the barrier.
+     * This method is primarily useful for debugging and assertions.
+     *
+     * @return the number of parties currently blocked in {@link #await}
+     **/
+    public int getNumberWaiting() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return parties - count; // count runs down from parties as threads arrive
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,366 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+import java.util.*;
+
+/**
+ * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
+ * elements, in which an element can only be taken when its delay has expired.
+ * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
+ * expired furthest in the past - if no delay has expired there is no head and
+ * <tt>poll</tt> will return <tt>null</tt>.
+ * This queue does not permit <tt>null</tt> elements.
+ * <p>This class implements all of the <em>optional</em> methods
+ * of the {@link Collection} and {@link Iterator} interfaces.
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../guide/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+
+public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
+    implements BlockingQueue<E> {
+
+    /** Lock guarding every access to the backing heap {@link #q}. */
+    private transient final ReentrantLock lock = new ReentrantLock();
+    /**
+     * Signalled when a new head is inserted, and after removals that
+     * leave further elements behind.
+     */
+    private transient final Condition available = lock.newCondition();
+    /** Backing heap, ordered by the elements' <tt>compareTo</tt>. */
+    private final PriorityQueue<E> q = new PriorityQueue<E>();
+
+    /**
+     * Creates a new <tt>DelayQueue</tt> that is initially empty.
+     */
+    public DelayQueue() {}
+
+    /**
+     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
+     * given collection of {@link Delayed} instances.
+     *
+     * @param c the collection
+     * @throws NullPointerException if <tt>c</tt> or any element within it
+     * is <tt>null</tt>
+     *
+     */
+    public DelayQueue(Collection<? extends E> c) {
+        this.addAll(c);
+    }
+
+    /**
+     * Inserts the specified element into this delay queue.
+     *
+     * @param o the element to add
+     * @return <tt>true</tt>
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    public boolean offer(E o) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            E first = q.peek();
+            q.offer(o);
+            // Waiters only need waking when the head they are timing
+            // against has changed: the queue was empty, or o sorts first.
+            if (first == null || o.compareTo(first) < 0)
+                available.signalAll();
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    /**
+     * Adds the specified element to this delay queue. As the queue is
+     * unbounded this method will never block.
+     * @param o the element to add
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    public void put(E o) {
+        offer(o);
+    }
+
+    /**
+     * Inserts the specified element into this delay queue. As the queue is
+     * unbounded this method will never block.
+     * @param o the element to add
+     * @param timeout This parameter is ignored as the method never blocks
+     * @param unit This parameter is ignored as the method never blocks
+     * @return <tt>true</tt>
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    public boolean offer(E o, long timeout, TimeUnit unit) {
+        return offer(o);
+    }
+
+    /**
+     * Adds the specified element to this queue.
+     * @param o the element to add
+     * @return <tt>true</tt> (as per the general contract of
+     * <tt>Collection.add</tt>).
+     *
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    public boolean add(E o) {
+        return offer(o);
+    }
+
+    /**
+     * Removes and returns the head of the backing heap, which the caller
+     * has already found to be expired, and wakes the remaining waiters
+     * if further elements are left. The caller must hold {@link #lock}.
+     *
+     * @return the removed head
+     */
+    private E removeExpiredHead() {
+        E x = q.poll();
+        assert x != null;
+        if (q.size() != 0)
+            available.signalAll(); // wake up other takers
+        return x;
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, waiting until an
+     * element with an expired delay is available.
+     *
+     * @return the expired head of this queue
+     * @throws InterruptedException if interrupted while acquiring the
+     * lock or while waiting
+     */
+    public E take() throws InterruptedException {
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            for (;;) {
+                E first = q.peek();
+                if (first == null) {
+                    available.await();
+                } else {
+                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
+                    if (delay <= 0)
+                        return removeExpiredHead();
+                    // The remaining time is not needed: the next pass
+                    // re-reads the head and asks it for its delay again.
+                    available.awaitNanos(delay);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, waiting up to the
+     * given time for an element with an expired delay to become available.
+     *
+     * @param time how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit the time unit of the <tt>time</tt> argument
+     * @return the expired head of this queue, or <tt>null</tt> if the
+     * waiting time elapses before an expired element is available
+     * @throws InterruptedException if interrupted while acquiring the
+     * lock or while waiting
+     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
+     */
+    public E poll(long time, TimeUnit unit) throws InterruptedException {
+        // Converted before locking so that a null unit cannot throw
+        // while the lock is held outside the try/finally.
+        long nanos = unit.toNanos(time);
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            for (;;) {
+                E first = q.peek();
+                if (first == null) {
+                    if (nanos <= 0)
+                        return null;
+                    else
+                        nanos = available.awaitNanos(nanos);
+                } else {
+                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
+                    if (delay > 0) {
+                        // Out of time while the head is still pending:
+                        // give up rather than spin until it expires.
+                        if (nanos <= 0)
+                            return null;
+                        if (delay > nanos)
+                            delay = nanos;
+                        long timeLeft = available.awaitNanos(delay);
+                        // Charge only the time actually spent waiting.
+                        nanos -= delay - timeLeft;
+                    } else {
+                        return removeExpiredHead();
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    /**
+     * Retrieves and removes the head of this queue, or returns
+     * <tt>null</tt> if the queue is empty or its head has not expired yet.
+     *
+     * @return the expired head of this queue, or <tt>null</tt>
+     */
+    public E poll() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            E first = q.peek();
+            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+                return null;
+            return removeExpiredHead();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns, without removing it, the element that sorts first in this
+     * queue, whether or not its delay has expired.
+     *
+     * @return the first element, or <tt>null</tt> if this queue is empty
+     */
+    public E peek() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.peek();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the number of elements in this queue, expired or not.
+     *
+     * @return the number of elements in this queue
+     */
+    public int size() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes every expired element from this queue and adds each one
+     * to the given collection.
+     *
+     * @param c the collection to transfer elements into
+     * @return the number of elements transferred
+     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
+     * @throws IllegalArgumentException if <tt>c</tt> is this queue
+     */
+    public int drainTo(Collection<? super E> c) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            int n = 0;
+            for (;;) {
+                E first = q.peek();
+                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+                    break;
+                c.add(q.poll());
+                ++n;
+            }
+            if (n > 0)
+                available.signalAll();
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes at most <tt>maxElements</tt> expired elements from this
+     * queue and adds each one to the given collection.
+     *
+     * @param c the collection to transfer elements into
+     * @param maxElements the maximum number of elements to transfer;
+     * nothing is transferred if it is zero or negative
+     * @return the number of elements transferred
+     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
+     * @throws IllegalArgumentException if <tt>c</tt> is this queue
+     */
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        if (maxElements <= 0)
+            return 0;
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            int n = 0;
+            while (n < maxElements) {
+                E first = q.peek();
+                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+                    break;
+                c.add(q.poll());
+                ++n;
+            }
+            if (n > 0)
+                available.signalAll();
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Atomically removes all of the elements from this delay queue.
+     * The queue will be empty after this call returns.
+     */
+    public void clear() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            q.clear();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Always returns <tt>Integer.MAX_VALUE</tt> because
+     * a <tt>DelayQueue</tt> is not capacity constrained.
+     * @return <tt>Integer.MAX_VALUE</tt>
+     */
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Returns an array containing all of the elements in this queue,
+     * expired or not, copied while holding the queue lock.
+     *
+     * @return an array containing all of the elements in this queue
+     */
+    public Object[] toArray() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.toArray();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an array containing all of the elements in this queue,
+     * expired or not, using the given array or one of its runtime type
+     * as decided by the backing <tt>PriorityQueue</tt>.
+     *
+     * @param array the array to fill if it is large enough
+     * @return an array containing all of the elements in this queue
+     */
+    public <T> T[] toArray(T[] array) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.toArray(array);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes a single instance of the specified element from this
+     * queue, if it is present, whether or not it has expired.
+     *
+     * @param o the element to remove
+     * @return <tt>true</tt> if an element was removed
+     */
+    public boolean remove(Object o) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.remove(o);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an iterator over the elements in this queue. The iterator
+     * does not return the elements in any particular order. The
+     * returned iterator is a "fail-fast" iterator that will
+     * throw {@link java.util.ConcurrentModificationException}
+     * upon detected interference. Its <tt>next</tt> and <tt>remove</tt>
+     * operations run while holding the queue lock.
+     *
+     * @return an iterator over the elements in this queue.
+     */
+    public Iterator<E> iterator() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return new Itr(q.iterator());
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Wrapper around the backing heap's iterator that takes the queue
+     * lock for <tt>next</tt> and <tt>remove</tt>. It uses the enclosing
+     * queue's element type rather than declaring one of its own.
+     */
+    private class Itr implements Iterator<E> {
+        private final Iterator<E> iter;
+        Itr(Iterator<E> i) {
+            iter = i;
+        }
+
+        // Delegates directly; this call does not take the queue lock.
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        public E next() {
+            final ReentrantLock lock = DelayQueue.this.lock;
+            lock.lock();
+            try {
+                return iter.next();
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public void remove() {
+            final ReentrantLock lock = DelayQueue.this.lock;
+            lock.lock();
+            try {
+                iter.remove();
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,32 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.*;
+
+/**
+ * A mix-in style interface for marking objects that should be
+ * acted upon after a given delay.
+ *
+ * <p>An implementation of this interface must define a
+ * <tt>compareTo</tt> method that provides an ordering consistent with
+ * its <tt>getDelay</tt> method.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public interface Delayed extends Comparable {
+
+    // The supertype above is the raw Comparable, so the inherited
+    // compareTo method takes a plain Object argument.
+
+    /**
+     * Returns the delay associated with this object, in the given time unit.
+     *
+     * @param unit the time unit in which the result is expressed
+     * @return the delay; zero or negative values indicate that the
+     * delay has already elapsed
+     */
+    long getDelay(TimeUnit unit);
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,247 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+
+/**
+ * A synchronization point at which two threads can exchange objects.
+ * Each thread presents some object on entry to the {@link #exchange
+ * exchange} method, and receives the object presented by the other
+ * thread on return.
+ *
+ * <p><b>Sample Usage:</b>
+ * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
+ * swap buffers between threads so that the thread filling the
+ * buffer gets a freshly
+ * emptied one when it needs it, handing off the filled one to
+ * the thread emptying the buffer.
+ * <pre>
+ * class FillAndEmpty {
+ *   Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger&lt;DataBuffer&gt;();
+ *   DataBuffer initialEmptyBuffer = ... a made-up type
+ *   DataBuffer initialFullBuffer = ...
+ *
+ *   class FillingLoop implements Runnable {
+ *     public void run() {
+ *       DataBuffer currentBuffer = initialEmptyBuffer;
+ *       try {
+ *         while (currentBuffer != null) {
+ *           addToBuffer(currentBuffer);
+ *           if (currentBuffer.full())
+ *             currentBuffer = exchanger.exchange(currentBuffer);
+ *         }
+ *       } catch (InterruptedException ex) { ... handle ... }
+ *     }
+ *   }
+ *
+ *   class EmptyingLoop implements Runnable {
+ *     public void run() {
+ *       DataBuffer currentBuffer = initialFullBuffer;
+ *       try {
+ *         while (currentBuffer != null) {
+ *           takeFromBuffer(currentBuffer);
+ *           if (currentBuffer.empty())
+ *             currentBuffer = exchanger.exchange(currentBuffer);
+ *         }
+ *       } catch (InterruptedException ex) { ... handle ...}
+ *     }
+ *   }
+ *
+ *   void start() {
+ *     new Thread(new FillingLoop()).start();
+ *     new Thread(new EmptyingLoop()).start();
+ *   }
+ * }
+ * </pre>
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <V> The type of objects that may be exchanged
+ */
+public class Exchanger<V> {
+    /** Lock guarding {@link #item} and {@link #arrivalCount}. */
+    private final ReentrantLock lock = new ReentrantLock();
+    /** Signalled whenever the item slot or the arrival count changes. */
+    private final Condition taken = lock.newCondition();
+
+    /** Holder for the item being exchanged */
+    private V item;
+
+    /**
+     * Arrival count transitions from 0 to 1 to 2 then back to 0
+     * during an exchange.
+     */
+    private int arrivalCount;
+
+    /**
+     * Main exchange function, handling the different policy variants.
+     *
+     * @param x the object offered by the calling thread
+     * @param timed <tt>true</tt> if the wait is bounded by <tt>nanos</tt>
+     * @param nanos the remaining wait in nanoseconds; only read when
+     * <tt>timed</tt> is <tt>true</tt>
+     * @return the object offered by the partner thread
+     * @throws InterruptedException if the caller's interrupted status is
+     * set on entry, or it is interrupted before a partner arrives
+     * @throws TimeoutException if a timed wait runs out before a
+     * partner arrives
+     */
+    private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
+        // Honour the documented contract of both exchange methods: an
+        // interrupt that is already pending on entry aborts the call and
+        // clears the status, even when no waiting would have been needed.
+        if (Thread.interrupted())
+            throw new InterruptedException();
+
+        lock.lock();
+        try {
+            V other;
+
+            // If arrival count already at two, we must wait for
+            // a previous pair to finish and reset the count;
+            while (arrivalCount == 2) {
+                if (!timed)
+                    taken.await();
+                else if (nanos > 0)
+                    nanos = taken.awaitNanos(nanos);
+                else
+                    throw new TimeoutException();
+            }
+
+            int count = ++arrivalCount;
+
+            // If item is already waiting, replace it and signal other thread
+            if (count == 2) {
+                other = item;
+                item = x;
+                taken.signal();
+                return other;
+            }
+
+            // Otherwise, set item and wait for another thread to
+            // replace it and signal us.
+
+            item = x;
+            InterruptedException interrupted = null;
+            try {
+                while (arrivalCount != 2) {
+                    if (!timed)
+                        taken.await();
+                    else if (nanos > 0)
+                        nanos = taken.awaitNanos(nanos);
+                    else
+                        break; // timed out
+                }
+            } catch (InterruptedException ie) {
+                // Held back until the slot and count have been reset below.
+                interrupted = ie;
+            }
+
+            // Get and reset item and count after the wait.
+            // (We need to do this even if wait was aborted.)
+            other = item;
+            item = null;
+            count = arrivalCount;
+            arrivalCount = 0;
+            taken.signal();
+
+            // If the other thread replaced item, then we must
+            // continue even if cancelled.
+            if (count == 2) {
+                if (interrupted != null)
+                    Thread.currentThread().interrupt();
+                return other;
+            }
+
+            // If no one is waiting for us, we can back out
+            if (interrupted != null)
+                throw interrupted;
+            else  // must be timeout
+                throw new TimeoutException();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Create a new Exchanger.
+     */
+    public Exchanger() {
+    }
+
+    /**
+     * Waits for another thread to arrive at this exchange point (unless
+     * it is {@link Thread#interrupt interrupted}),
+     * and then transfers the given object to it, receiving its object
+     * in return.
+     * <p>If another thread is already waiting at the exchange point then
+     * it is resumed for thread scheduling purposes and receives the object
+     * passed in by the current thread. The current thread returns immediately,
+     * receiving the object passed to the exchange by that other thread.
+     * <p>If no other thread is already waiting at the exchange then the
+     * current thread is disabled for thread scheduling purposes and lies
+     * dormant until one of two things happens:
+     * <ul>
+     * <li>Some other thread enters the exchange; or
+     * <li>Some other thread {@link Thread#interrupt interrupts} the current
+     * thread.
+     * </ul>
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@link Thread#interrupt interrupted} while waiting
+     * for the exchange,
+     * </ul>
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * @param x the object to exchange
+     * @return the object provided by the other thread.
+     * @throws InterruptedException if current thread was interrupted
+     * while waiting
+     */
+    public V exchange(V x) throws InterruptedException {
+        try {
+            return doExchange(x, false, 0);
+        } catch (TimeoutException cannotHappen) {
+            // An untimed exchange never takes the timeout paths.
+            throw new Error(cannotHappen);
+        }
+    }
+
+    /**
+     * Waits for another thread to arrive at this exchange point (unless
+     * it is {@link Thread#interrupt interrupted}, or the specified waiting
+     * time elapses),
+     * and then transfers the given object to it, receiving its object
+     * in return.
+     *
+     * <p>If another thread is already waiting at the exchange point then
+     * it is resumed for thread scheduling purposes and receives the object
+     * passed in by the current thread. The current thread returns immediately,
+     * receiving the object passed to the exchange by that other thread.
+     *
+     * <p>If no other thread is already waiting at the exchange then the
+     * current thread is disabled for thread scheduling purposes and lies
+     * dormant until one of three things happens:
+     * <ul>
+     * <li>Some other thread enters the exchange; or
+     * <li>Some other thread {@link Thread#interrupt interrupts} the current
+     * thread; or
+     * <li>The specified waiting time elapses.
+     * </ul>
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@link Thread#interrupt interrupted} while waiting
+     * for the exchange,
+     * </ul>
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * <p>If the specified waiting time elapses then {@link TimeoutException}
+     * is thrown.
+     * If the time is
+     * less than or equal to zero, the method will not wait at all.
+     *
+     * @param x the object to exchange
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the <tt>timeout</tt> argument.
+     * @return the object provided by the other thread.
+     * @throws InterruptedException if current thread was interrupted
+     * while waiting
+     * @throws TimeoutException if the specified waiting time elapses before
+     * another thread enters the exchange.
+     */
+    public V exchange(V x, long timeout, TimeUnit unit)
+        throws InterruptedException, TimeoutException {
+        return doExchange(x, true, unit.toNanos(timeout));
+    }
+
+}
+
+
+
+

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,65 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * Exception thrown when attempting to retrieve the result of a task
+ * that aborted by throwing an exception. This exception can be
+ * inspected using the {@link #getCause()} method.
+ *
+ * @see Future
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class ExecutionException extends Exception {
+    private static final long serialVersionUID = 7830266012832686185L;
+
+    /**
+     * Creates an <tt>ExecutionException</tt> that has neither a detail
+     * message nor a cause. A cause may be attached afterwards through
+     * {@link #initCause(Throwable) initCause}.
+     */
+    protected ExecutionException() {
+        super();
+    }
+
+    /**
+     * Creates an <tt>ExecutionException</tt> carrying the given detail
+     * message and no cause. A cause may be attached afterwards through
+     * {@link #initCause(Throwable) initCause}.
+     *
+     * @param message the detail message
+     */
+    protected ExecutionException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an <tt>ExecutionException</tt> carrying both the given
+     * detail message and the given cause.
+     *
+     * @param  message the detail message
+     * @param  cause the cause, later available from the
+     *         {@link #getCause()} method
+     */
+    public ExecutionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an <tt>ExecutionException</tt> wrapping the given cause.
+     * The detail message becomes:
+     * <pre>
+     *  (cause == null ? null : cause.toString())</pre>
+     * which normally shows the class and detail message of
+     * <tt>cause</tt>.
+     *
+     * @param  cause the cause, later available from the
+     *         {@link #getCause()} method
+     */
+    public ExecutionException(Throwable cause) {
+        super(cause);
+    }
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,107 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * An object that executes submitted {@link Runnable} tasks. This
+ * interface provides a way of decoupling task submission from the
+ * mechanics of how each task will be run, including details of thread
+ * use, scheduling, etc.  An <tt>Executor</tt> is normally used
+ * instead of explicitly creating threads. For example, rather than
+ * invoking <tt>new Thread(new RunnableTask()).start()</tt> for each
+ * of a set of tasks, you might use:
+ *
+ * <pre>
+ * Executor executor = <em>anExecutor</em>;
+ * executor.execute(new RunnableTask1());
+ * executor.execute(new RunnableTask2());
+ * ...
+ * </pre>
+ * 
+ * However, the <tt>Executor</tt> interface does not strictly
+ * require that execution be asynchronous. In the simplest case, an
+ * executor can run the submitted task immediately in the caller's
+ * thread:
+ *
+ * <pre>
+ * class DirectExecutor implements Executor {
+ *     public void execute(Runnable r) {
+ *         r.run();
+ *     }
+ * }</pre>
+ *
+ * More typically, tasks are executed in some thread other
+ * than the caller's thread.  The executor below spawns a new thread
+ * for each task.
+ *
+ * <pre>
+ * class ThreadPerTaskExecutor implements Executor {
+ *     public void execute(Runnable r) {
+ *         new Thread(r).start();
+ *     }
+ * }</pre>
+ *
+ * Many <tt>Executor</tt> implementations impose some sort of
+ * limitation on how and when tasks are scheduled.  The executor below
+ * serializes the submission of tasks to a second executor,
+ * illustrating a composite executor.
+ *
+ * <pre>
+ * class SerialExecutor implements Executor {
+ *     final Queue&lt;Runnable&gt; tasks = new LinkedBlockingQueue&lt;Runnable&gt;();
+ *     final Executor executor;
+ *     Runnable active;
+ *
+ *     SerialExecutor(Executor executor) {
+ *         this.executor = executor;
+ *     }
+ *
+ *     public synchronized void execute(final Runnable r) {
+ *         tasks.offer(new Runnable() {
+ *             public void run() {
+ *                 try {
+ *                     r.run();
+ *                 } finally {
+ *                     scheduleNext();
+ *                 }
+ *             }
+ *         });
+ *         if (active == null) {
+ *             scheduleNext();
+ *         }
+ *     }
+ *
+ *     protected synchronized void scheduleNext() {
+ *         if ((active = tasks.poll()) != null) {
+ *             executor.execute(active);
+ *         }
+ *     }
+ * }</pre>
+ *
+ * The <tt>Executor</tt> implementations provided in this package
+ * implement {@link ExecutorService}, which is a more extensive
+ * interface.  The {@link ThreadPoolExecutor} class provides an
+ * extensible thread pool implementation. The {@link Executors} class
+ * provides convenient factory methods for these Executors.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public interface Executor {
+
+    /**
+     * Executes the given command at some time in the future.  The command
+     * may execute in a new thread, in a pooled thread, or in the calling
+     * thread, at the discretion of the <tt>Executor</tt> implementation.
+     * No result is returned to the caller.
+     *
+     * @param command the runnable task
+     * @throws RejectedExecutionException if this task cannot be
+     * accepted for execution.
+     * @throws NullPointerException if command is null
+     */
+    void execute(Runnable command);
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,148 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+
+/**
+ * A {@link CompletionService} that uses a supplied {@link Executor}
+ * to execute tasks.  This class arranges that submitted tasks are,
+ * upon completion, placed on a queue accessible using <tt>take</tt>.
+ * The class is lightweight enough to be suitable for transient use
+ * when processing groups of tasks.
+ *
+ * <p>
+ *
+ * <b>Usage Examples.</b>
+ *
+ * Suppose you have a set of solvers for a certain problem, each
+ * returning a value of some type <tt>Result</tt>, and would like to
+ * run them concurrently, processing the results of each of them that
+ * returns a non-null value, in some method <tt>use(Result r)</tt>. You
+ * could write this as:
+ *
+ * <pre>
+ *    void solve(Executor e, Collection&lt;Callable&lt;Result&gt;&gt; solvers)
+ *      throws InterruptedException, ExecutionException {
+ *        CompletionService&lt;Result&gt; ecs = new ExecutorCompletionService&lt;Result&gt;(e);
+ *        for (Callable&lt;Result&gt; s : solvers)
+ *            ecs.submit(s);
+ *        int n = solvers.size();
+ *        for (int i = 0; i &lt; n; ++i) {
+ *            Result r = ecs.take().get();
+ *            if (r != null) 
+ *                use(r);
+ *        }
+ *    }
+ * </pre>
+ *
+ * Suppose instead that you would like to use the first non-null result
+ * of the set of tasks, ignoring any that encounter exceptions,
+ * and cancelling all other tasks when the first one is ready:
+ *
+ * <pre>
+ *    void solve(Executor e, Collection&lt;Callable&lt;Result&gt;&gt; solvers) 
+ *      throws InterruptedException {
+ *        CompletionService&lt;Result&gt; ecs = new ExecutorCompletionService&lt;Result&gt;(e);
+ *        int n = solvers.size();
+ *        List&lt;Future&lt;Result&gt;&gt; futures = new ArrayList&lt;Future&lt;Result&gt;&gt;(n);
+ *        Result result = null;
+ *        try {
+ *            for (Callable&lt;Result&gt; s : solvers)
+ *                futures.add(ecs.submit(s));
+ *            for (int i = 0; i &lt; n; ++i) {
+ *                try {
+ *                    Result r = ecs.take().get();
+ *                    if (r != null) {
+ *                        result = r;
+ *                        break;
+ *                    }
+ *                } catch(ExecutionException ignore) {}
+ *            }
+ *        }
+ *        finally {
+ *            for (Future&lt;Result&gt; f : futures)
+ *                f.cancel(true);
+ *        }
+ *
+ *        if (result != null)
+ *            use(result);
+ *    }
+ * </pre>
+ */
+public class ExecutorCompletionService<V> implements CompletionService<V> {
+    private final Executor executor;
+    private final BlockingQueue<Future<V>> completionQueue;
+
+    /**
+     * FutureTask extension that adds itself to the completion queue when done.
+     */
+    private class QueueingFuture extends FutureTask<V> {
+        QueueingFuture(Callable<V> c) { super(c); }
+        QueueingFuture(Runnable t, V r) { super(t, r); }
+        protected void done() { completionQueue.add(this); }
+    }
+
+    /**
+     * Creates an ExecutorCompletionService using the supplied
+     * executor for base task execution and a
+     * {@link LinkedBlockingQueue} as a completion queue.
+     * @param executor the executor to use
+     * @throws NullPointerException if executor is <tt>null</tt>
+     */
+    public ExecutorCompletionService(Executor executor) {
+        if (executor == null) 
+            throw new NullPointerException();
+        this.executor = executor;
+        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
+    }
+
+    /**
+     * Creates an ExecutorCompletionService using the supplied
+     * executor for base task execution and the supplied queue as its
+     * completion queue.
+     * @param executor the executor to use
+     * @param completionQueue the queue to use as the completion queue,
+     * normally one dedicated for use by this service
+     * @throws NullPointerException if executor or completionQueue is <tt>null</tt>
+     */
+    public ExecutorCompletionService(Executor executor,
+                                     BlockingQueue<Future<V>> completionQueue) {
+        if (executor == null || completionQueue == null) 
+            throw new NullPointerException();
+        this.executor = executor;
+        this.completionQueue = completionQueue;
+    }
+
+    public Future<V> submit(Callable<V> task) {
+        if (task == null) throw new NullPointerException();
+        QueueingFuture f = new QueueingFuture(task); // its done() hook enqueues f on the completion queue
+        executor.execute(f);
+        return f;
+    }
+
+    public Future<V> submit(Runnable task, V result) {
+        if (task == null) throw new NullPointerException();
+        QueueingFuture f = new QueueingFuture(task, result); // result is the value the future is constructed with
+        executor.execute(f);
+        return f;
+    }
+
+    public Future<V> take() throws InterruptedException {
+        return completionQueue.take(); // delegates to the queue's blocking take()
+    }
+
+    public Future<V> poll() {
+        return completionQueue.poll(); // delegates to the queue's non-blocking poll()
+    }
+
+    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+        return completionQueue.poll(timeout, unit); // delegates to the queue's timed poll()
+    }
+
+}
+
+

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,286 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.List;
+import java.util.Collection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * An {@link Executor} that provides methods to manage termination and
+ * methods that can produce a {@link Future} for tracking progress of
+ * one or more asynchronous tasks.  
+ *
+ * <p>
+ * An <tt>ExecutorService</tt> can be shut down, which will cause it
+ * to stop accepting new tasks.  After being shut down, the executor
+ * will eventually terminate, at which point no tasks are actively
+ * executing, no tasks are awaiting execution, and no new tasks can be
+ * submitted.
+ *
+ * <p> Method <tt>submit</tt> extends base method {@link
+ * Executor#execute} by creating and returning a {@link Future} that
+ * can be used to cancel execution and/or wait for completion.
+ * Methods <tt>invokeAny</tt> and <tt>invokeAll</tt> perform the most
+ * commonly useful forms of bulk execution, executing a collection of
+ * tasks and then waiting for at least one, or all, to
+ * complete. (Class {@link ExecutorCompletionService} can be used to
+ * write customized variants of these methods.)
+ *
+ * <p>The {@link Executors} class provides factory methods for the
+ * executor services provided in this package.
+ *
+ * <h3>Usage Example</h3>
+ *
+ * Here is a sketch of a network service in which threads in a thread
+ * pool service incoming requests. It uses the preconfigured {@link
+ * Executors#newFixedThreadPool} factory method:
+ *
+ * <pre>
+ * class NetworkService {
+ *    private final ServerSocket serverSocket;
+ *    private final ExecutorService pool;
+ *
+ *    public NetworkService(int port, int poolSize) throws IOException {
+ *      serverSocket = new ServerSocket(port);
+ *      pool = Executors.newFixedThreadPool(poolSize);
+ *    }
+ * 
+ *    public void serve() {
+ *      try {
+ *        for (;;) {
+ *          pool.execute(new Handler(serverSocket.accept()));
+ *        }
+ *      } catch (IOException ex) {
+ *        pool.shutdown();
+ *      }
+ *    }
+ * }
+ *
+ * class Handler implements Runnable {
+ *    private final Socket socket;
+ *    Handler(Socket socket) { this.socket = socket; }
+ *    public void run() {
+ *      // read and service request
+ *    }
+ * }
+ * </pre>
+ * @since 1.5
+ * @author Doug Lea
+ */
+public interface ExecutorService extends Executor {
+
+    /**
+     * Initiates an orderly shutdown in which previously submitted
+     * tasks are executed, but no new tasks will be
+     * accepted. Invocation has no additional effect if already shut
+     * down.
+     * @throws SecurityException if a security manager exists and
+     * shutting down this ExecutorService may manipulate threads that
+     * the caller is not permitted to modify because it does not hold
+     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+     * or the security manager's <tt>checkAccess</tt>  method denies access.
+     */
+    void shutdown();
+
+    /**
+     * Attempts to stop all actively executing tasks, halts the
+     * processing of waiting tasks, and returns a list of the tasks that were
+     * awaiting execution.
+     *
+     * <p>There are no guarantees beyond best-effort attempts to stop
+     * processing actively executing tasks.  For example, typical
+     * implementations will cancel via {@link Thread#interrupt}, so if any
+     * tasks mask or fail to respond to interrupts, they may never terminate.
+     *
+     * @return list of tasks that never commenced execution
+     * @throws SecurityException if a security manager exists and
+     * shutting down this ExecutorService may manipulate threads that
+     * the caller is not permitted to modify because it does not hold
+     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+     * or the security manager's <tt>checkAccess</tt> method denies access.
+     */
+    List<Runnable> shutdownNow();
+
+    /**
+     * Returns <tt>true</tt> if this executor has been shut down.
+     *
+     * @return <tt>true</tt> if this executor has been shut down
+     */
+    boolean isShutdown();
+
+    /**
+     * Returns <tt>true</tt> if all tasks have completed following shut down.
+     * Note that <tt>isTerminated</tt> is never <tt>true</tt> unless
+     * either <tt>shutdown</tt> or <tt>shutdownNow</tt> was called first.
+     *
+     * @return <tt>true</tt> if all tasks have completed following shut down
+     */
+    boolean isTerminated();
+
+    /**
+     * Blocks until all tasks have completed execution after a shutdown
+     * request, or the timeout occurs, or the current thread is
+     * interrupted, whichever happens first.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return <tt>true</tt> if this executor terminated and <tt>false</tt>
+     * if the timeout elapsed before termination
+     * @throws InterruptedException if interrupted while waiting
+     */
+    boolean awaitTermination(long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+
+    /**
+     * Submits a value-returning task for execution and returns a Future
+     * representing the pending results of the task.
+     *
+     * <p>
+     * If you would like to immediately block waiting
+     * for a task, you can use constructions of the form
+     * <tt>result = exec.submit(aCallable).get();</tt>
+     *
+     * <p> Note: The {@link Executors} class includes a set of methods
+     * that can convert some other common closure-like objects,
+     * for example, {@link java.security.PrivilegedAction} to
+     * {@link Callable} form so they can be submitted.
+     *
+     * @param task the task to submit
+     * @return a Future representing pending completion of the task
+     * @throws RejectedExecutionException if task cannot be scheduled
+     * for execution
+     * @throws NullPointerException if task is null
+     */
+    <T> Future<T> submit(Callable<T> task);
+
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task that will upon completion return
+     * the given result.
+     *
+     * @param task the task to submit
+     * @param result the result to return
+     * @return a Future representing pending completion of the task,
+     * and whose <tt>get()</tt> method will return the given result
+     * upon completion.
+     * @throws RejectedExecutionException if task cannot be scheduled
+     * for execution
+     * @throws NullPointerException if task is null
+     */
+    <T> Future<T> submit(Runnable task, T result);
+
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task.
+     *
+     * @param task the task to submit
+     * @return a Future representing pending completion of the task,
+     * and whose <tt>get()</tt> method will return <tt>null</tt>
+     * upon completion.
+     * @throws RejectedExecutionException if task cannot be scheduled
+     * for execution
+     * @throws NullPointerException if task is null
+     */
+    Future<?> submit(Runnable task);
+
+    /**
+     * Executes the given tasks, returning their results
+     * when all complete.
+     * Note that a <em>completed</em> task could have
+     * terminated either normally or by throwing an exception.
+     * The results of this method are undefined if the given
+     * collection is modified while this operation is in progress.
+     * @param tasks the collection of tasks
+     * @return A list of Futures representing the tasks, in the same
+     * sequential order as produced by the iterator for the given task
+     * list, each of which has completed.
+     * @throws InterruptedException if interrupted while waiting, in
+     * which case unfinished tasks are cancelled.
+     * @throws NullPointerException if tasks or any of its elements are <tt>null</tt>
+     * @throws RejectedExecutionException if any task cannot be scheduled
+     * for execution
+     */
+
+    <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
+        throws InterruptedException;
+
+    /**
+     * Executes the given tasks, returning their results
+     * when all complete or the timeout expires, whichever happens first.
+     * Upon return, tasks that have not completed are cancelled.
+     * Note that a <em>completed</em> task could have
+     * terminated either normally or by throwing an exception.
+     * The results of this method are undefined if the given
+     * collection is modified while this operation is in progress.
+     * @param tasks the collection of tasks
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return A list of Futures representing the tasks, in the same
+     * sequential order as produced by the iterator for the given
+     * task list. If the operation did not time out, each task will
+     * have completed. If it did time out, some of these tasks will
+     * not have completed.
+     * @throws InterruptedException if interrupted while waiting, in
+     * which case unfinished tasks are cancelled.
+     * @throws NullPointerException if tasks, any of its elements, or
+     * unit are <tt>null</tt>
+     * @throws RejectedExecutionException if any task cannot be scheduled
+     * for execution
+     */
+    <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks, 
+                                  long timeout, TimeUnit unit) 
+        throws InterruptedException;
+
+    /**
+     * Executes the given tasks, returning the result
+     * of one that has completed successfully (i.e., without throwing
+     * an exception), if any do. Upon normal or exceptional return,
+     * tasks that have not completed are cancelled.
+     * The results of this method are undefined if the given
+     * collection is modified while this operation is in progress.
+     * @param tasks the collection of tasks
+     * @return The result returned by one of the tasks.
+     * @throws InterruptedException if interrupted while waiting
+     * @throws NullPointerException if tasks or any of its elements
+     * are <tt>null</tt>
+     * @throws IllegalArgumentException if tasks is empty
+     * @throws ExecutionException if no task successfully completes
+     * @throws RejectedExecutionException if tasks cannot be scheduled
+     * for execution
+     */
+    <T> T invokeAny(Collection<Callable<T>> tasks)
+        throws InterruptedException, ExecutionException;
+
+    /**
+     * Executes the given tasks, returning the result
+     * of one that has completed successfully (i.e., without throwing
+     * an exception), if any do before the given timeout elapses.
+     * Upon normal or exceptional return, tasks that have not
+     * completed are cancelled.
+     * The results of this method are undefined if the given
+     * collection is modified while this operation is in progress.
+     * @param tasks the collection of tasks
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return The result returned by one of the tasks.
+     * @throws InterruptedException if interrupted while waiting
+     * @throws NullPointerException if tasks, any of its elements, or
+     * unit are <tt>null</tt>
+     * @throws TimeoutException if the given timeout elapses before
+     * any task successfully completes
+     * @throws ExecutionException if no task successfully completes
+     * @throws RejectedExecutionException if tasks cannot be scheduled
+     * for execution
+     */
+    <T> T invokeAny(Collection<Callable<T>> tasks, 
+                    long timeout, TimeUnit unit) 
+        throws InterruptedException, ExecutionException, TimeoutException;
+
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java
------------------------------------------------------------------------------
    svn:eol-style = native