You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@harmony.apache.org by hi...@apache.org on 2009/07/28 11:30:48 UTC

svn commit: r798469 [11/28] - in /harmony/enhanced/classlib/branches/java6: ./ depends/build/platform/ depends/files/ depends/jars/ depends/manifests/icu4j_4.0/ depends/manifests/icu4j_4.2.1/ depends/manifests/icu4j_4.2.1/META-INF/ make/ modules/access...

Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java Tue Jul 28 09:30:33 2009
@@ -10,13 +10,13 @@
 
 /**
  * An {@link ExecutorService} that can schedule commands to run after a given
- * delay, or to execute periodically. 
+ * delay, or to execute periodically.
  *
  * <p> The <tt>schedule</tt> methods create tasks with various delays
  * and return a task object that can be used to cancel or check
  * execution. The <tt>scheduleAtFixedRate</tt> and
  * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
- * that run periodically until cancelled.  
+ * that run periodically until cancelled.
  *
  * <p> Commands submitted using the {@link Executor#execute} and
  * {@link ExecutorService} <tt>submit</tt> methods are scheduled with
@@ -33,27 +33,27 @@
  * TimeUnit.MILLISECONDS)</tt>. Beware however that expiration of a
  * relative delay need not coincide with the current <tt>Date</tt> at
  * which the task is enabled due to network time synchronization
- * protocols, clock drift, or other factors. 
+ * protocols, clock drift, or other factors.
  *
  * The {@link Executors} class provides convenient factory methods for
  * the ScheduledExecutorService implementations provided in this package.
  *
  * <h3>Usage Example</h3>
- * 
+ *
  * Here is a class with a method that sets up a ScheduledExecutorService
  * to beep every ten seconds for an hour:
  *
  * <pre>
- * import static java.util.concurrent.TimeUnit;
+ * import static java.util.concurrent.TimeUnit.*;
  * class BeeperControl {
- *    private final ScheduledExecutorService scheduler = 
+ *    private final ScheduledExecutorService scheduler =
  *       Executors.newScheduledThreadPool(1);
  *
  *    public void beepForAnHour() {
  *        final Runnable beeper = new Runnable() {
  *                public void run() { System.out.println("beep"); }
  *            };
- *        final ScheduledFuture&lt;?&gt; beeperHandle = 
+ *        final ScheduledFuture&lt;?&gt; beeperHandle =
  *            scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
  *        scheduler.schedule(new Runnable() {
  *                public void run() { beeperHandle.cancel(true); }
@@ -70,76 +70,90 @@
     /**
      * Creates and executes a one-shot action that becomes enabled
      * after the given delay.
-     * @param command the task to execute.
-     * @param delay the time from now to delay execution.
-     * @param unit the time unit of the delay parameter.
-     * @return a Future representing pending completion of the task,
-     * and whose <tt>get()</tt> method will return <tt>null</tt>
-     * upon completion.
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution.
+     *
+     * @param command the task to execute
+     * @param delay the time from now to delay execution
+     * @param unit the time unit of the delay parameter
+     * @return a ScheduledFuture representing pending completion of
+     *         the task and whose <tt>get()</tt> method will return
+     *         <tt>null</tt> upon completion
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
      * @throws NullPointerException if command is null
      */
-    public ScheduledFuture<?> schedule(Runnable command, long delay,  TimeUnit unit);
+    public ScheduledFuture<?> schedule(Runnable command,
+                                       long delay, TimeUnit unit);
 
     /**
      * Creates and executes a ScheduledFuture that becomes enabled after the
      * given delay.
-     * @param callable the function to execute.
-     * @param delay the time from now to delay execution.
-     * @param unit the time unit of the delay parameter.
-     * @return a ScheduledFuture that can be used to extract result or cancel.
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution.
+     *
+     * @param callable the function to execute
+     * @param delay the time from now to delay execution
+     * @param unit the time unit of the delay parameter
+     * @return a ScheduledFuture that can be used to extract result or cancel
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
      * @throws NullPointerException if callable is null
      */
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+                                           long delay, TimeUnit unit);
 
     /**
      * Creates and executes a periodic action that becomes enabled first
      * after the given initial delay, and subsequently with the given
      * period; that is executions will commence after
      * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
-     * <tt>initialDelay + 2 * period</tt>, and so on.  
+     * <tt>initialDelay + 2 * period</tt>, and so on.
      * If any execution of the task
      * encounters an exception, subsequent executions are suppressed.
      * Otherwise, the task will only terminate via cancellation or
-     * termination of the executor.
-     * @param command the task to execute.
-     * @param initialDelay the time to delay first execution.
-     * @param period the period between successive executions.
+     * termination of the executor.  If any execution of this task
+     * takes longer than its period, then subsequent executions
+     * may start late, but will not concurrently execute.
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay first execution
+     * @param period the period between successive executions
      * @param unit the time unit of the initialDelay and period parameters
-     * @return a Future representing pending completion of the task,
-     * and whose <tt>get()</tt> method will throw an exception upon
-     * cancellation.
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution.
+     * @return a ScheduledFuture representing pending completion of
+     *         the task, and whose <tt>get()</tt> method will throw an
+     *         exception upon cancellation
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
      * @throws NullPointerException if command is null
-     * @throws IllegalArgumentException if period less than or equal to zero.
+     * @throws IllegalArgumentException if period is less than or equal to zero
      */
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,  long period, TimeUnit unit);
-    
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay,
+                                                  long period,
+                                                  TimeUnit unit);
+
     /**
      * Creates and executes a periodic action that becomes enabled first
      * after the given initial delay, and subsequently with the
      * given delay between the termination of one execution and the
-     * commencement of the next. If any execution of the task
+     * commencement of the next.  If any execution of the task
      * encounters an exception, subsequent executions are suppressed.
      * Otherwise, the task will only terminate via cancellation or
      * termination of the executor.
-     * @param command the task to execute.
-     * @param initialDelay the time to delay first execution.
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay first execution
      * @param delay the delay between the termination of one
-     * execution and the commencement of the next.
+     * execution and the commencement of the next
      * @param unit the time unit of the initialDelay and delay parameters
-     * @return a Future representing pending completion of the task,
-     * and whose <tt>get()</tt> method will throw an exception upon
-     * cancellation.
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution.
+     * @return a ScheduledFuture representing pending completion of
+     *         the task, and whose <tt>get()</tt> method will throw an
+     *         exception upon cancellation
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
      * @throws NullPointerException if command is null
-     * @throws IllegalArgumentException if delay less than or equal to zero.
+     * @throws IllegalArgumentException if delay is less than or equal to zero
      */
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,  long delay, TimeUnit unit);
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay,
+                                                     long delay,
+                                                     TimeUnit unit);
 
 }

Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java Tue Jul 28 09:30:33 2009
@@ -10,6 +10,7 @@
 
 package java.util.concurrent;
 import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
 import java.util.*;
 
 /**
@@ -20,25 +21,60 @@
  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
  * this class extends) are required.
  *
- * <p> Delayed tasks execute no sooner than they are enabled, but
+ * <p>Delayed tasks execute no sooner than they are enabled, but
  * without any real-time guarantees about when, after they are
  * enabled, they will commence. Tasks scheduled for exactly the same
  * execution time are enabled in first-in-first-out (FIFO) order of
  * submission.
  *
+ * <p>Successive executions of a task scheduled via
+ * <code>scheduleAtFixedRate</code> or
+ * <code>scheduleWithFixedDelay</code> do not overlap. While different
+ * executions may be performed by different threads, the effects of
+ * prior executions <a
+ * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
+ * those of subsequent ones.
+ *
  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
  * of the inherited tuning methods are not useful for it. In
- * particular, because it acts as a fixed-sized pool using
- * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
- * to <tt>maximumPoolSize</tt> have no useful effect.
+ * particular, because it acts as a fixed-sized pool using {@code
+ * corePoolSize} threads and an unbounded queue, adjustments to {@code
+ * maximumPoolSize} have no useful effect.
  *
  * @since 1.5
  * @author Doug Lea
  */
-public class ScheduledThreadPoolExecutor 
-        extends ThreadPoolExecutor 
+public class ScheduledThreadPoolExecutor
+        extends ThreadPoolExecutor
         implements ScheduledExecutorService {
 
+    /*
+     * This class specializes ThreadPoolExecutor implementation by
+     *
+     * 1. Using a custom task type, ScheduledFutureTask for
+     *    tasks, even those that don't require scheduling (i.e.,
+     *    those submitted using ExecutorService execute, not
+     *    ScheduledExecutorService methods) which are treated as
+     *    delayed tasks with a delay of zero.
+     *
+     * 2. Using a custom queue (DelayedWorkQueue), a variant of
+     *    unbounded DelayQueue. The lack of capacity constraint and
+     *    the fact that corePoolSize and maximumPoolSize are
+     *    effectively identical simplifies some execution mechanics
+     *    (see delayedExecute) compared to ThreadPoolExecutor.
+     *
+     * 3. Supporting optional run-after-shutdown parameters, which
+     *    leads to overrides of shutdown methods to remove and cancel
+     *    tasks that should NOT be run after shutdown, as well as
+     *    different recheck logic when task (re)submission overlaps
+     *    with a shutdown.
+     *
+     * 4. Task decoration methods to allow interception and
+     *    instrumentation, which are needed because subclasses cannot
+     *    otherwise override submit methods to get this effect. These
+     *    don't have any impact on pool control logic though.
+     */
+
     /**
      * False if should cancel/suppress periodic tasks on shutdown.
      */
@@ -50,28 +86,40 @@
     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
 
     /**
+     * True if ScheduledFutureTask.cancel should remove from queue
+     */
+    private volatile boolean removeOnCancel = false;
+
+    /**
      * Sequence number to break scheduling ties, and in turn to
      * guarantee FIFO order among tied entries.
      */
     private static final AtomicLong sequencer = new AtomicLong(0);
 
-    /** Base of nanosecond timings, to avoid wrapping */
-    private static final long NANO_ORIGIN = System.nanoTime();
 
     /**
-     * Returns nanosecond time offset by origin
+     * Value of System.nanoTime upon static initialization.  This is
+     * used as an offset by now() to avoid wraparound of time values
+     * that would make them appear negative.
      */
-    final long now() {
-	return System.nanoTime() - NANO_ORIGIN;
+    static final long initialNanoTime = System.nanoTime();
+
+    /**
+     * Returns current nanosecond time.
+     */
+    static long now() {
+        return System.nanoTime() - initialNanoTime;
     }
 
-    private class ScheduledFutureTask<V> 
+    private class ScheduledFutureTask<V>
             extends FutureTask<V> implements ScheduledFuture<V> {
-        
+
         /** Sequence number to break ties FIFO */
         private final long sequenceNumber;
+
         /** The time the task is enabled to execute in nanoTime units */
         private long time;
+
         /**
          * Period in nanoseconds for repeating tasks.  A positive
          * value indicates fixed-rate execution.  A negative value
@@ -80,8 +128,16 @@
          */
         private final long period;
 
+        /** The actual task to be re-enqueued by reExecutePeriodic */
+        ScheduledFutureTask<V> outerTask = this;
+
         /**
-         * Creates a one-shot action with given nanoTime-based trigger time
+         * Index into delay queue, to support faster cancellation.
+         */
+        int heapIndex;
+
+        /**
+         * Creates a one-shot action with given nanoTime-based trigger time.
          */
         ScheduledFutureTask(Runnable r, V result, long ns) {
             super(r, result);
@@ -91,9 +147,9 @@
         }
 
         /**
-         * Creates a periodic action with given nano time and period
+         * Creates a periodic action with given nano time and period.
          */
-        ScheduledFutureTask(Runnable r, V result, long ns,  long period) {
+        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
             super(r, result);
             this.time = ns;
             this.period = period;
@@ -101,7 +157,7 @@
         }
 
         /**
-         * Creates a one-shot action with given nanoTime-based trigger
+         * Creates a one-shot action with given nanoTime-based trigger.
          */
         ScheduledFutureTask(Callable<V> callable, long ns) {
             super(callable);
@@ -111,122 +167,162 @@
         }
 
         public long getDelay(TimeUnit unit) {
-            long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
-            return d;
+            long d = time - now();
+            return d<=0? 0 : unit.convert(d, TimeUnit.NANOSECONDS);
         }
 
         public int compareTo(Delayed other) {
             if (other == this) // compare zero ONLY if same object
                 return 0;
-            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
-            long diff = time - x.time;
-            if (diff < 0)
-                return -1;
-            else if (diff > 0)
-                return 1;
-            else if (sequenceNumber < x.sequenceNumber)
-                return -1;
-            else
-                return 1;
+            if (other instanceof ScheduledFutureTask) {
+                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
+                long diff = time - x.time;
+                if (diff < 0)
+                    return -1;
+                else if (diff > 0)
+                    return 1;
+                else if (sequenceNumber < x.sequenceNumber)
+                    return -1;
+                else
+                    return 1;
+            }
+            long d = (getDelay(TimeUnit.NANOSECONDS) -
+                      other.getDelay(TimeUnit.NANOSECONDS));
+            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
         }
 
         /**
          * Returns true if this is a periodic (not a one-shot) action.
+         *
          * @return true if periodic
          */
-        boolean isPeriodic() {
+        public boolean isPeriodic() {
             return period != 0;
         }
 
         /**
-         * Run a periodic task
+         * Sets the next time to run for a periodic task.
          */
-        private void runPeriodic() {
-            boolean ok = ScheduledFutureTask.super.runAndReset();
-            boolean down = isShutdown();
-            // Reschedule if not cancelled and not shutdown or policy allows
-            if (ok && (!down ||
-                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() && 
-                        !isTerminating()))) {
-                long p = period;
-                if (p > 0)
-                    time += p;
-                else
-                    time = now() - p;
-                ScheduledThreadPoolExecutor.super.getQueue().add(this);
-            }
-            // This might have been the final executed delayed
-            // task.  Wake up threads to check.
-            else if (down) 
-                interruptIdleWorkers();
+        private void setNextRunTime() {
+            long p = period;
+            if (p > 0)
+                time += p;
+            else
+                time = now() - p;
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            boolean cancelled = super.cancel(mayInterruptIfRunning);
+            if (cancelled && removeOnCancel && heapIndex >= 0)
+                remove(this);
+            return cancelled;
         }
 
         /**
          * Overrides FutureTask version so as to reset/requeue if periodic.
-         */ 
+         */
         public void run() {
-            if (isPeriodic())
-                runPeriodic();
-            else 
+            boolean periodic = isPeriodic();
+            if (!canRunInCurrentRunState(periodic))
+                cancel(false);
+            else if (!periodic)
                 ScheduledFutureTask.super.run();
+            else if (ScheduledFutureTask.super.runAndReset()) {
+                setNextRunTime();
+                reExecutePeriodic(outerTask);
+            }
+        }
+    }
+
+    /**
+     * Returns true if can run a task given current run state
+     * and run-after-shutdown parameters.
+     *
+     * @param periodic true if this task is periodic, false if delayed
+     */
+    boolean canRunInCurrentRunState(boolean periodic) {
+        return isRunningOrShutdown(periodic ?
+                                   continueExistingPeriodicTasksAfterShutdown :
+                                   executeExistingDelayedTasksAfterShutdown);
+    }
+
+    /**
+     * Main execution method for delayed or periodic tasks.  If pool
+     * is shut down, rejects the task. Otherwise adds task to queue
+     * and starts a thread, if necessary, to run it.  (We cannot
+     * prestart the thread to run the task because the task (probably)
+     * shouldn't be run yet.) If the pool is shut down while the task
+     * is being added, cancel and remove it if required by state and
+     * run-after-shutdown parameters.
+     *
+     * @param task the task
+     */
+    private void delayedExecute(ScheduledFutureTask<?> task) {
+        if (isShutdown())
+            reject(task);
+        else {
+            super.getQueue().add(task);
+            if (isShutdown() &&
+                !canRunInCurrentRunState(task.isPeriodic()) &&
+                remove(task))
+                task.cancel(false);
+            else
+                prestartCoreThread();
         }
     }
 
     /**
-     * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
+     * Requeues a periodic task unless current run state precludes it.
+     * Same idea as delayedExecute except drops task rather than rejecting.
+     *
+     * @param task the task
      */
-    private void delayedExecute(Runnable command) {
-        if (isShutdown()) {
-            reject(command);
-            return;
-        }
-        // Prestart a thread if necessary. We cannot prestart it
-        // running the task because the task (probably) shouldn't be
-        // run yet, so thread will just idle until delay elapses.
-        if (getPoolSize() < getCorePoolSize())
-            prestartCoreThread();
-            
-        super.getQueue().add(command);
+    void reExecutePeriodic(ScheduledFutureTask<?> task) {
+        if (canRunInCurrentRunState(true)) {
+            super.getQueue().add(task);
+            if (!canRunInCurrentRunState(true) && remove(task))
+                task.cancel(false);
+            else
+                prestartCoreThread();
+        }
     }
 
     /**
-     * Cancel and clear the queue of all tasks that should not be run
-     * due to shutdown policy.
-     */
-    private void cancelUnwantedTasks() {
-        boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
-        boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
-        if (!keepDelayed && !keepPeriodic) 
-            super.getQueue().clear();
-        else if (keepDelayed || keepPeriodic) {
-            Object[] entries = super.getQueue().toArray();
-            for (int i = 0; i < entries.length; ++i) {
-                Object e = entries[i];
+     * Cancels and clears the queue of all tasks that should not be run
+     * due to shutdown policy.  Invoked within super.shutdown.
+     */
+    @Override void onShutdown() {
+        BlockingQueue<Runnable> q = super.getQueue();
+        boolean keepDelayed =
+            getExecuteExistingDelayedTasksAfterShutdownPolicy();
+        boolean keepPeriodic =
+            getContinueExistingPeriodicTasksAfterShutdownPolicy();
+        if (!keepDelayed && !keepPeriodic)
+            q.clear();
+        else {
+            // Traverse snapshot to avoid iterator exceptions
+            for (Object e : q.toArray()) {
                 if (e instanceof ScheduledFutureTask) {
-                    ScheduledFutureTask<?> t = (ScheduledFutureTask<?>)e;
-                    if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
-                        t.cancel(false);
+                    ScheduledFutureTask<?> t =
+                        (ScheduledFutureTask<?>)e;
+                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
+                        t.isCancelled()) { // also remove if already cancelled
+                        if (q.remove(t))
+                            t.cancel(false);
+                    }
                 }
             }
-            entries = null;
-            purge();
         }
-    }
-
-    public boolean remove(Runnable task) {
-        if (!(task instanceof ScheduledFutureTask))
-            return false;
-        return getQueue().remove(task);
+        tryTerminate();
     }
 
     /**
-     * Creates a new ScheduledThreadPoolExecutor with the given core
-     * pool size.
-     * 
-     * @param corePoolSize the number of threads to keep in the pool,
-     * even if they are idle.
-     * @throws IllegalArgumentException if corePoolSize less than or
-     * equal to zero
+     * Creates a new {@code ScheduledThreadPoolExecutor} with the
+     * given core pool size.
+     *
+     * @param corePoolSize the number of threads to keep in the pool, even
+     *        if they are idle
+     * @throws IllegalArgumentException if {@code corePoolSize < 0}
      */
     public ScheduledThreadPoolExecutor(int corePoolSize) {
         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
@@ -234,14 +330,15 @@
     }
 
     /**
-     * Creates a new ScheduledThreadPoolExecutor with the given
-     * initial parameters.
-     * 
-     * @param corePoolSize the number of threads to keep in the pool,
-     * even if they are idle.
+     * Creates a new {@code ScheduledThreadPoolExecutor} with the
+     * given initial parameters.
+     *
+     * @param corePoolSize the number of threads to keep in the pool, even
+     *        if they are idle
      * @param threadFactory the factory to use when the executor
-     * creates a new thread. 
-     * @throws NullPointerException if threadFactory is null
+     *        creates a new thread
+     * @throws IllegalArgumentException if {@code corePoolSize < 0}
+     * @throws NullPointerException if {@code threadFactory} is null
      */
     public ScheduledThreadPoolExecutor(int corePoolSize,
                              ThreadFactory threadFactory) {
@@ -252,12 +349,13 @@
     /**
      * Creates a new ScheduledThreadPoolExecutor with the given
      * initial parameters.
-     * 
-     * @param corePoolSize the number of threads to keep in the pool,
-     * even if they are idle.
+     *
+     * @param corePoolSize the number of threads to keep in the pool, even
+     *        if they are idle
      * @param handler the handler to use when execution is blocked
-     * because the thread bounds and queue capacities are reached.
-     * @throws NullPointerException if handler is null
+     *        because the thread bounds and queue capacities are reached
+     * @throws IllegalArgumentException if {@code corePoolSize < 0}
+     * @throws NullPointerException if {@code handler} is null
      */
     public ScheduledThreadPoolExecutor(int corePoolSize,
                               RejectedExecutionHandler handler) {
@@ -268,14 +366,16 @@
     /**
      * Creates a new ScheduledThreadPoolExecutor with the given
      * initial parameters.
-     * 
-     * @param corePoolSize the number of threads to keep in the pool,
-     * even if they are idle.
+     *
+     * @param corePoolSize the number of threads to keep in the pool, even
+     *        if they are idle
      * @param threadFactory the factory to use when the executor
-     * creates a new thread. 
+     *        creates a new thread
      * @param handler the handler to use when execution is blocked
-     * because the thread bounds and queue capacities are reached.
-     * @throws NullPointerException if threadFactory or handler is null
+     *        because the thread bounds and queue capacities are reached
+     * @throws IllegalArgumentException if {@code corePoolSize < 0}
+     * @throws NullPointerException if {@code threadFactory} or
+     *         {@code handler} is null
      */
     public ScheduledThreadPoolExecutor(int corePoolSize,
                               ThreadFactory threadFactory,
@@ -284,128 +384,178 @@
               new DelayedWorkQueue(), threadFactory, handler);
     }
 
-    public ScheduledFuture<?> schedule(Runnable command, 
-                                       long delay, 
+    /**
+     * Returns the trigger time of a delayed action.
+     */
+    private static long nextTriggerTime(long delay, TimeUnit unit) {
+        long triggerTime;
+        long now = now();
+        if (delay <= 0) 
+            return now;            // avoid negative trigger times
+        else if ((triggerTime = now + unit.toNanos(delay)) < 0)
+            return Long.MAX_VALUE; // avoid numerical overflow
+        else
+            return triggerTime;
+    }
+
+    /**
+     * @throws RejectedExecutionException {@inheritDoc}
+     * @throws NullPointerException       {@inheritDoc}
+     */
+    public ScheduledFuture<?> schedule(Runnable command,
+                                       long delay,
                                        TimeUnit unit) {
         if (command == null || unit == null)
             throw new NullPointerException();
-        long triggerTime = now() + unit.toNanos(delay);
-        ScheduledFutureTask<?> t = 
-            new ScheduledFutureTask<Boolean>(command, null, triggerTime);
+        long triggerTime = nextTriggerTime(delay, unit);
+        ScheduledFutureTask<?> t
+                = new ScheduledFutureTask<Void>(command, null, triggerTime);
         delayedExecute(t);
         return t;
     }
-
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, 
-                                           long delay, 
+    
+    /**
+     * @throws RejectedExecutionException {@inheritDoc}
+     * @throws NullPointerException       {@inheritDoc}
+     */
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+                                           long delay,
                                            TimeUnit unit) {
         if (callable == null || unit == null)
             throw new NullPointerException();
-        if (delay < 0) delay = 0;
-        long triggerTime = now() + unit.toNanos(delay);
-        ScheduledFutureTask<V> t = 
-            new ScheduledFutureTask<V>(callable, triggerTime);
+        long triggerTime = nextTriggerTime(delay, unit);
+        ScheduledFutureTask<V> t
+                = new ScheduledFutureTask<V>(callable, triggerTime);
         delayedExecute(t);
         return t;
     }
 
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 
-                                                  long initialDelay,  
-                                                  long period, 
+    /**
+     * @throws RejectedExecutionException {@inheritDoc}
+     * @throws NullPointerException       {@inheritDoc}
+     * @throws IllegalArgumentException   {@inheritDoc}
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay,
+                                                  long period,
                                                   TimeUnit unit) {
         if (command == null || unit == null)
             throw new NullPointerException();
         if (period <= 0)
             throw new IllegalArgumentException();
         if (initialDelay < 0) initialDelay = 0;
-        long triggerTime = now() + unit.toNanos(initialDelay);
-        ScheduledFutureTask<?> t = 
-            new ScheduledFutureTask<Object>(command, 
-                                            null,
-                                            triggerTime,
-                                            unit.toNanos(period));
-        delayedExecute(t);
-        return t;
+        long triggerTime = nextTriggerTime(initialDelay, unit);
+        ScheduledFutureTask<Void> sft =
+            new ScheduledFutureTask<Void>(command,
+                                          null,
+                                          triggerTime,
+                                          unit.toNanos(period));
+        sft.outerTask = sft;
+        delayedExecute(sft);
+        return sft;
     }
-    
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
-                                                     long initialDelay,  
-                                                     long delay, 
+
+    /**
+     * @throws RejectedExecutionException {@inheritDoc}
+     * @throws NullPointerException       {@inheritDoc}
+     * @throws IllegalArgumentException   {@inheritDoc}
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay,
+                                                     long delay,
                                                      TimeUnit unit) {
         if (command == null || unit == null)
             throw new NullPointerException();
         if (delay <= 0)
             throw new IllegalArgumentException();
-        if (initialDelay < 0) initialDelay = 0;
-        long triggerTime = now() + unit.toNanos(initialDelay);
-        ScheduledFutureTask<?> t = 
-            new ScheduledFutureTask<Boolean>(command, 
-                                             null,
-                                             triggerTime,
-                                             unit.toNanos(-delay));
-        delayedExecute(t);
-        return t;
+        long triggerTime = nextTriggerTime(initialDelay, unit);
+        ScheduledFutureTask<Void> sft =
+            new ScheduledFutureTask<Void>(command,
+                                          null,
+                                          triggerTime,
+                                          unit.toNanos(-delay));
+        sft.outerTask = sft;
+        delayedExecute(sft);
+        return sft;
     }
-    
 
     /**
-     * Execute command with zero required delay. This has effect
-     * equivalent to <tt>schedule(command, 0, anyUnit)</tt>.  Note
-     * that inspections of the queue and of the list returned by
-     * <tt>shutdownNow</tt> will access the zero-delayed
-     * {@link ScheduledFuture}, not the <tt>command</tt> itself.
+     * Executes {@code command} with zero required delay.
+     * This has effect equivalent to
+     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
+     * Note that inspections of the queue and of the list returned by
+     * {@code shutdownNow} will access the zero-delayed
+     * {@link ScheduledFuture}, not the {@code command} itself.
+     *
+     * <p>A consequence of the use of {@code ScheduledFuture} objects is
+     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
+     * called with a null second {@code Throwable} argument, even if the
+     * {@code command} terminated abruptly.  Instead, the {@code Throwable}
+     * thrown by such a task can be obtained via {@link Future#get}.
      *
-     * @param command the task to execute
      * @throws RejectedExecutionException at discretion of
-     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
-     * for execution because the executor has been shut down.
-     * @throws NullPointerException if command is null
+     *         {@code RejectedExecutionHandler}, if the task
+     *         cannot be accepted for execution because the
+     *         executor has been shut down
+     * @throws NullPointerException {@inheritDoc}
      */
     public void execute(Runnable command) {
-        if (command == null)
-            throw new NullPointerException();
         schedule(command, 0, TimeUnit.NANOSECONDS);
     }
 
     // Override AbstractExecutorService methods
 
+    /**
+     * @throws RejectedExecutionException {@inheritDoc}
+     * @throws NullPointerException       {@inheritDoc}
+     */
     public Future<?> submit(Runnable task) {
         return schedule(task, 0, TimeUnit.NANOSECONDS);
     }
 
+    /**
+     * @throws RejectedExecutionException {@inheritDoc}
+     * @throws NullPointerException       {@inheritDoc}
+     */
     public <T> Future<T> submit(Runnable task, T result) {
-        return schedule(Executors.callable(task, result), 
+        return schedule(Executors.callable(task, result),
                         0, TimeUnit.NANOSECONDS);
     }
 
+    /**
+     * @throws RejectedExecutionException {@inheritDoc}
+     * @throws NullPointerException       {@inheritDoc}
+     */
     public <T> Future<T> submit(Callable<T> task) {
         return schedule(task, 0, TimeUnit.NANOSECONDS);
     }
 
     /**
-     * Set policy on whether to continue executing existing periodic
-     * tasks even when this executor has been <tt>shutdown</tt>. In
-     * this case, these tasks will only terminate upon
-     * <tt>shutdownNow</tt>, or after setting the policy to
-     * <tt>false</tt> when already shutdown. This value is by default
-     * false.
-     * @param value if true, continue after shutdown, else don't.
-     * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
+     * Sets the policy on whether to continue executing existing
+     * periodic tasks even when this executor has been {@code shutdown}.
+     * In this case, these tasks will only terminate upon
+     * {@code shutdownNow} or after setting the policy to
+     * {@code false} when already shutdown.
+     * This value is by default {@code false}.
+     *
+     * @param value if {@code true}, continue after shutdown, else don't.
+     * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
      */
     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
         continueExistingPeriodicTasksAfterShutdown = value;
         if (!value && isShutdown())
-            cancelUnwantedTasks();
+            onShutdown();
     }
 
     /**
-     * Get the policy on whether to continue executing existing
-     * periodic tasks even when this executor has been
-     * <tt>shutdown</tt>. In this case, these tasks will only
-     * terminate upon <tt>shutdownNow</tt> or after setting the policy
-     * to <tt>false</tt> when already shutdown. This value is by
-     * default false.
-     * @return true if will continue after shutdown.
+     * Gets the policy on whether to continue executing existing
+     * periodic tasks even when this executor has been {@code shutdown}.
+     * In this case, these tasks will only terminate upon
+     * {@code shutdownNow} or after setting the policy to
+     * {@code false} when already shutdown.
+     * This value is by default {@code false}.
+     *
+     * @return {@code true} if periodic tasks will continue after shutdown
      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
      */
     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
@@ -413,66 +563,79 @@
     }
 
     /**
-     * Set policy on whether to execute existing delayed
-     * tasks even when this executor has been <tt>shutdown</tt>. In
-     * this case, these tasks will only terminate upon
-     * <tt>shutdownNow</tt>, or after setting the policy to
-     * <tt>false</tt> when already shutdown. This value is by default
-     * true.
-     * @param value if true, execute after shutdown, else don't.
+     * Sets the policy on whether to execute existing delayed
+     * tasks even when this executor has been {@code shutdown}.
+     * In this case, these tasks will only terminate upon
+     * {@code shutdownNow}, or after setting the policy to
+     * {@code false} when already shutdown.
+     * This value is by default {@code true}.
+     *
+     * @param value if {@code true}, execute after shutdown, else don't.
      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
      */
     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
         executeExistingDelayedTasksAfterShutdown = value;
         if (!value && isShutdown())
-            cancelUnwantedTasks();
+            onShutdown();
     }
 
     /**
-     * Get policy on whether to execute existing delayed
-     * tasks even when this executor has been <tt>shutdown</tt>. In
-     * this case, these tasks will only terminate upon
-     * <tt>shutdownNow</tt>, or after setting the policy to
-     * <tt>false</tt> when already shutdown. This value is by default
-     * true.
-     * @return true if will execute after shutdown.
+     * Gets the policy on whether to execute existing delayed
+     * tasks even when this executor has been {@code shutdown}.
+     * In this case, these tasks will only terminate upon
+     * {@code shutdownNow}, or after setting the policy to
+     * {@code false} when already shutdown.
+     * This value is by default {@code true}.
+     *
+     * @return {@code true} if delayed tasks will execute after shutdown
      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
      */
     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
         return executeExistingDelayedTasksAfterShutdown;
     }
 
-
     /**
      * Initiates an orderly shutdown in which previously submitted
-     * tasks are executed, but no new tasks will be accepted. If the
-     * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
-     * been set <tt>false</tt>, existing delayed tasks whose delays
-     * have not yet elapsed are cancelled. And unless the
-     * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
-     * been set <tt>true</tt>, future executions of existing periodic
-     * tasks will be cancelled.
+     * tasks are executed, but no new tasks will be accepted.
+     * Invocation has no additional effect if already shut down.
+     *
+     * <p>This method does not wait for previously submitted tasks to
+     * complete execution.  Use {@link #awaitTermination awaitTermination}
+     * to do that.
+     *
+     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
+     * has been set {@code false}, existing delayed tasks whose delays
+     * have not yet elapsed are cancelled.  And unless the {@code
+     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
+     * {@code true}, future executions of existing periodic tasks will
+     * be cancelled.
+     *
+     * @throws SecurityException {@inheritDoc}
      */
     public void shutdown() {
-        cancelUnwantedTasks();
         super.shutdown();
     }
 
     /**
      * Attempts to stop all actively executing tasks, halts the
-     * processing of waiting tasks, and returns a list of the tasks that were
-     * awaiting execution. 
-     *  
+     * processing of waiting tasks, and returns a list of the tasks
+     * that were awaiting execution.
+     *
+     * <p>This method does not wait for actively executing tasks to
+     * terminate.  Use {@link #awaitTermination awaitTermination} to
+     * do that.
+     *
      * <p>There are no guarantees beyond best-effort attempts to stop
      * processing actively executing tasks.  This implementation
-     * cancels tasks via {@link Thread#interrupt}, so if any tasks mask or
-     * fail to respond to interrupts, they may never terminate.
+     * cancels tasks via {@link Thread#interrupt}, so any task that
+     * fails to respond to interrupts may never terminate.
      *
-     * @return list of tasks that never commenced execution.  Each
-     * element of this list is a {@link ScheduledFuture},
-     * including those tasks submitted using <tt>execute</tt>, which
-     * are for scheduling purposes used as the basis of a zero-delay
-     * <tt>ScheduledFuture</tt>.
+     * @return list of tasks that never commenced execution.
+     *         Each element of this list is a {@link ScheduledFuture},
+     *         including those tasks submitted using {@code execute},
+     *         which are for scheduling purposes used as the basis of a
+     *         zero-delay {@code ScheduledFuture}.
+     * @throws SecurityException {@inheritDoc}
      */
     public List<Runnable> shutdownNow() {
         return super.shutdownNow();
@@ -481,9 +644,9 @@
     /**
      * Returns the task queue used by this executor.  Each element of
      * this queue is a {@link ScheduledFuture}, including those
-     * tasks submitted using <tt>execute</tt> which are for scheduling
+     * tasks submitted using {@code execute} which are for scheduling
      * purposes used as the basis of a zero-delay
-     * <tt>ScheduledFuture</tt>. Iteration over this queue is
+     * {@code ScheduledFuture}.  Iteration over this queue is
      * <em>not</em> guaranteed to traverse tasks in the order in
      * which they will execute.
      *
@@ -494,52 +657,478 @@
     }
 
     /**
-     * An annoying wrapper class to convince generics compiler to
-     * use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable>
-     */ 
-    private static class DelayedWorkQueue 
-        extends AbstractCollection<Runnable> 
+     * Specialized delay queue. To mesh with ThreadPoolExecutor declarations,
+     * this class must be declared as a {@code BlockingQueue<Runnable>} even
+     * though it can only hold RunnableScheduledFutures.
+     */
+    static class DelayedWorkQueue extends AbstractQueue<Runnable>
         implements BlockingQueue<Runnable> {
-        
-        private final DelayQueue<ScheduledFutureTask> dq = new DelayQueue<ScheduledFutureTask>();
-        public Runnable poll() { return dq.poll(); }
-        public Runnable peek() { return dq.peek(); }
-        public Runnable take() throws InterruptedException { return dq.take(); }
-        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
-            return dq.poll(timeout, unit);
-        }
-
-        public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); }
-        public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); }
-        public void put(Runnable x)  {
-            dq.put((ScheduledFutureTask)x); 
-        }
-        public boolean offer(Runnable x, long timeout, TimeUnit unit) {
-            return dq.offer((ScheduledFutureTask)x, timeout, unit);
-        }
-
-        public Runnable remove() { return dq.remove(); }
-        public Runnable element() { return dq.element(); }
-        public void clear() { dq.clear(); }
-        public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
-        public int drainTo(Collection<? super Runnable> c, int maxElements) { 
-            return dq.drainTo(c, maxElements); 
-        }
-
-        public int remainingCapacity() { return dq.remainingCapacity(); }
-        public boolean remove(Object x) { return dq.remove(x); }
-        public boolean contains(Object x) { return dq.contains(x); }
-        public int size() { return dq.size(); }
-        public boolean isEmpty() { return dq.isEmpty(); }
-        public Object[] toArray() { return dq.toArray(); }
-        public <T> T[] toArray(T[] array) { return dq.toArray(array); }
-        public Iterator<Runnable> iterator() { 
-            return new Iterator<Runnable>() {
-                private Iterator<ScheduledFutureTask> it = dq.iterator();
-                public boolean hasNext() { return it.hasNext(); }
-                public Runnable next() { return it.next(); }
-                public void remove() {  it.remove(); }
-            };
+
+        /*
+         * A DelayedWorkQueue is based on a heap-based data structure
+         * like those in DelayQueue and PriorityQueue, except that
+         * every ScheduledFutureTask also records its index into the
+         * heap array. This eliminates the need to find a task upon
+         * cancellation, greatly speeding up removal (down from O(n)
+         * to O(log n)), and reducing garbage retention that would
+         * otherwise occur by waiting for the element to rise to top
+         * before clearing. But because the queue may also hold
+         * RunnableScheduledFutures that are not ScheduledFutureTasks,
+         * we are not guaranteed to have such indices available, in
+         * which case we fall back to linear search. (We expect that
+         * most tasks will not be decorated, and that the faster cases
+         * will be much more common.)
+         *
+         * All heap operations must record index changes -- mainly
+         * within siftUp and siftDown. Upon removal, a task's
+         * heapIndex is set to -1. Note that ScheduledFutureTasks can
+         * appear at most once in the queue (this need not be true for
+         * other kinds of tasks or work queues), so are uniquely
+         * identified by heapIndex.
+         */
+
+        private static final int INITIAL_CAPACITY = 16;
+        private ScheduledFutureTask[] queue =
+            new ScheduledFutureTask[INITIAL_CAPACITY];
+        private final ReentrantLock lock = new ReentrantLock();
+        private int size = 0;
+
+        /**
+         * Thread designated to wait for the task at the head of the
+         * queue.  This variant of the Leader-Follower pattern
+         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
+         * minimize unnecessary timed waiting.  When a thread becomes
+         * the leader, it waits only for the next delay to elapse, but
+         * other threads await indefinitely.  The leader thread must
+         * signal some other thread before returning from take() or
+         * poll(...), unless some other thread becomes leader in the
+         * interim.  Whenever the head of the queue is replaced with a
+         * task with an earlier expiration time, the leader field is
+         * invalidated by being reset to null, and some waiting
+         * thread, but not necessarily the current leader, is
+         * signalled.  So waiting threads must be prepared to acquire
+         * and lose leadership while waiting.
+         */
+        private Thread leader = null;
+
+        /**
+         * Condition signalled when a newer task becomes available at the
+         * head of the queue or a new thread may need to become leader.
+         */
+        private final Condition available = lock.newCondition();
+
+        /**
+         * Set f's heapIndex if it is a ScheduledFutureTask.
+         */
+        private void setIndex(ScheduledFutureTask f, int idx) {
+            if (f instanceof ScheduledFutureTask)
+                ((ScheduledFutureTask)f).heapIndex = idx;
+        }
+
+        /**
+         * Sift element added at bottom up to its heap-ordered spot.
+         * Call only when holding lock.
+         */
+        private void siftUp(int k, ScheduledFutureTask key) {
+            while (k > 0) {
+                int parent = (k - 1) >>> 1;
+                ScheduledFutureTask e = queue[parent];
+                if (key.compareTo(e) >= 0)
+                    break;
+                queue[k] = e;
+                setIndex(e, k);
+                k = parent;
+            }
+            queue[k] = key;
+            setIndex(key, k);
+        }
+
+        /**
+         * Sift element added at top down to its heap-ordered spot.
+         * Call only when holding lock.
+         */
+        private void siftDown(int k, ScheduledFutureTask key) {
+            int half = size >>> 1;
+            while (k < half) {
+                int child = (k << 1) + 1;
+                ScheduledFutureTask c = queue[child];
+                int right = child + 1;
+                if (right < size && c.compareTo(queue[right]) > 0)
+                    c = queue[child = right];
+                if (key.compareTo(c) <= 0)
+                    break;
+                queue[k] = c;
+                setIndex(c, k);
+                k = child;
+            }
+            queue[k] = key;
+            setIndex(key, k);
+        }
+
+        /**
+         * Resize the heap array.  Call only when holding lock.
+         */
+        private void grow() {
+            int oldCapacity = queue.length;
+            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
+            if (newCapacity < 0) // overflow
+                newCapacity = Integer.MAX_VALUE;
+            queue = Java6Arrays.copyOf(queue, newCapacity);
+        }
+
+        /**
+         * Finds the index of the given object, or -1 if absent.
+         */
+        private int indexOf(Object x) {
+            if (x != null) {
+                if (x instanceof ScheduledFutureTask) {
+                    int i = ((ScheduledFutureTask) x).heapIndex;
+                    // Sanity check; x could conceivably be a
+                    // ScheduledFutureTask from some other pool.
+                    if (i >= 0 && i < size && queue[i] == x)
+                        return i;
+                } else {
+                    for (int i = 0; i < size; i++)
+                        if (x.equals(queue[i]))
+                            return i;
+                }
+            }
+            return -1;
+        }
+
+        public boolean contains(Object x) {
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                return indexOf(x) != -1;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public boolean remove(Object x) {
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                int i = indexOf(x);
+                if (i < 0)
+                    return false;
+
+                setIndex(queue[i], -1);
+                int s = --size;
+                ScheduledFutureTask replacement = queue[s];
+                queue[s] = null;
+                if (s != i) {
+                    siftDown(i, replacement);
+                    if (queue[i] == replacement)
+                        siftUp(i, replacement);
+                }
+                return true;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public int size() {
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                return size;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public boolean isEmpty() {
+            return size() == 0;
+        }
+
+        public int remainingCapacity() {
+            return Integer.MAX_VALUE;
+        }
+
+        public ScheduledFutureTask peek() {
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                return queue[0];
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public boolean offer(Runnable x) {
+            if (x == null)
+                throw new NullPointerException();
+            ScheduledFutureTask e = (ScheduledFutureTask)x;
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                int i = size;
+                if (i >= queue.length)
+                    grow();
+                size = i + 1;
+                if (i == 0) {
+                    queue[0] = e;
+                    setIndex(e, 0);
+                } else {
+                    siftUp(i, e);
+                }
+                if (queue[0] == e) {
+                    leader = null;
+                    available.signal();
+                }
+            } finally {
+                lock.unlock();
+            }
+            return true;
+        }
+
+        public void put(Runnable e) {
+            offer(e);
+        }
+
+        public boolean add(Runnable e) {
+            return offer(e);
+        }
+
+        public boolean offer(Runnable e, long timeout, TimeUnit unit) {
+            return offer(e);
+        }
+
+        /**
+         * Performs common bookkeeping for poll and take: Replaces
+         * first element with last and sifts it down.  Call only when
+         * holding lock.
+         * @param f the task to remove and return
+         */
+        private ScheduledFutureTask finishPoll(ScheduledFutureTask f) {
+            int s = --size;
+            ScheduledFutureTask x = queue[s];
+            queue[s] = null;
+            if (s != 0)
+                siftDown(0, x);
+            setIndex(f, -1);
+            return f;
+        }
+
+        public ScheduledFutureTask poll() {
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                ScheduledFutureTask first = queue[0];
+                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+                    return null;
+                else
+                    return finishPoll(first);
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public ScheduledFutureTask take() throws InterruptedException {
+            final ReentrantLock lock = this.lock;
+            lock.lockInterruptibly();
+            try {
+                for (;;) {
+                    ScheduledFutureTask first = queue[0];
+                    if (first == null)
+                        available.await();
+                    else {
+                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
+                        if (delay <= 0)
+                            return finishPoll(first);
+                        else if (leader != null)
+                            available.await();
+                        else {
+                            Thread thisThread = Thread.currentThread();
+                            leader = thisThread;
+                            try {
+                                available.awaitNanos(delay);
+                            } finally {
+                                if (leader == thisThread)
+                                    leader = null;
+                            }
+                        }
+                    }
+                }
+            } finally {
+                if (leader == null && queue[0] != null)
+                    available.signal();
+                lock.unlock();
+            }
+        }
+
+        public ScheduledFutureTask poll(long timeout, TimeUnit unit)
+            throws InterruptedException {
+            long nanos = unit.toNanos(timeout);
+            final ReentrantLock lock = this.lock;
+            lock.lockInterruptibly();
+            try {
+                for (;;) {
+                    ScheduledFutureTask first = queue[0];
+                    if (first == null) {
+                        if (nanos <= 0)
+                            return null;
+                        else
+                            nanos = available.awaitNanos(nanos);
+                    } else {
+                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
+                        if (delay <= 0)
+                            return finishPoll(first);
+                        if (nanos <= 0)
+                            return null;
+                        if (nanos < delay || leader != null)
+                            nanos = available.awaitNanos(nanos);
+                        else {
+                            Thread thisThread = Thread.currentThread();
+                            leader = thisThread;
+                            try {
+                                long timeLeft = available.awaitNanos(delay);
+                                nanos -= delay - timeLeft;
+                            } finally {
+                                if (leader == thisThread)
+                                    leader = null;
+                            }
+                        }
+                    }
+                }
+            } finally {
+                if (leader == null && queue[0] != null)
+                    available.signal();
+                lock.unlock();
+            }
+        }
+
+        public void clear() {
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                for (int i = 0; i < size; i++) {
+                    ScheduledFutureTask t = queue[i];
+                    if (t != null) {
+                        queue[i] = null;
+                        setIndex(t, -1);
+                    }
+                }
+                size = 0;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * Hands the head task to <tt>finishPoll</tt> and returns the
+         * result, but only when a head exists and its remaining delay is
+         * not positive; otherwise returns <tt>null</tt> and leaves the
+         * queue alone.  Used only by drainTo.  Call only when holding lock.
+         */
+        private ScheduledFutureTask pollExpired() {
+            ScheduledFutureTask head = queue[0];
+            boolean due =
+                head != null && head.getDelay(TimeUnit.NANOSECONDS) <= 0;
+            return due ? finishPoll(head) : null;
+        }
+
+        /**
+         * Moves every task at the head of the queue whose delay is zero or
+         * negative into <tt>c</tt>, stopping at the first head task that
+         * still has a positive delay (or when the queue is empty).
+         *
+         * <p>Each task is added to <tt>c</tt> before it is removed from
+         * this queue, so a task is not lost if <tt>c.add</tt> throws.
+         *
+         * @param c the collection to transfer tasks into
+         * @return the number of tasks transferred
+         * @throws NullPointerException if <tt>c</tt> is null
+         * @throws IllegalArgumentException if <tt>c</tt> is this queue
+         */
+        public int drainTo(Collection<? super Runnable> c) {
+            if (c == null)
+                throw new NullPointerException();
+            if (c == this)
+                throw new IllegalArgumentException();
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                int n = 0;
+                for (;;) {
+                    ScheduledFutureTask first = queue[0];
+                    if (first == null
+                        || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+                        break;
+                    // In this order, in case add() throws: the task then
+                    // stays in the queue instead of vanishing.
+                    c.add(first);
+                    finishPoll(first);
+                    ++n;
+                }
+                return n;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * Moves at most <tt>maxElements</tt> tasks from the head of the
+         * queue into <tt>c</tt>, taking only tasks whose delay is zero or
+         * negative and stopping at the first head task that still has a
+         * positive delay (or when the queue is empty).
+         *
+         * <p>Each task is added to <tt>c</tt> before it is removed from
+         * this queue, so a task is not lost if <tt>c.add</tt> throws.
+         *
+         * @param c the collection to transfer tasks into
+         * @param maxElements the maximum number of tasks to transfer;
+         *        values of zero or less transfer nothing
+         * @return the number of tasks transferred
+         * @throws NullPointerException if <tt>c</tt> is null
+         * @throws IllegalArgumentException if <tt>c</tt> is this queue
+         */
+        public int drainTo(Collection<? super Runnable> c, int maxElements) {
+            if (c == null)
+                throw new NullPointerException();
+            if (c == this)
+                throw new IllegalArgumentException();
+            if (maxElements <= 0)
+                return 0;
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                int n = 0;
+                while (n < maxElements) {
+                    ScheduledFutureTask first = queue[0];
+                    if (first == null
+                        || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+                        break;
+                    // In this order, in case add() throws: the task then
+                    // stays in the queue instead of vanishing.
+                    c.add(first);
+                    finishPoll(first);
+                    ++n;
+                }
+                return n;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public Object[] toArray() {
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                return Java6Arrays.copyOf(queue, size, Object[].class);
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * Copies the first <tt>size</tt> entries of the backing array into
+         * <tt>a</tt> while holding the lock.  If <tt>a</tt> is too short a
+         * new array of the same runtime type is returned instead; if it is
+         * longer, the slot just past the last copied entry is set to
+         * <tt>null</tt>.
+         */
+        @SuppressWarnings("unchecked")
+        public <T> T[] toArray(T[] a) {
+            final ReentrantLock guard = this.lock;
+            guard.lock();
+            try {
+                final int count = size;
+                if (a.length < count) {
+                    return (T[]) Java6Arrays.copyOf(queue, count, a.getClass());
+                }
+                System.arraycopy(queue, 0, a, 0, count);
+                if (a.length > count) {
+                    a[count] = null;
+                }
+                return a;
+            } finally {
+                guard.unlock();
+            }
+        }
+
+        /**
+         * Returns an iterator over a copy of the first <tt>size</tt>
+         * entries of the backing array.  The copy is taken while holding
+         * the lock, as the other readers of <tt>queue</tt> and
+         * <tt>size</tt> in this class do, so the snapshot cannot observe a
+         * concurrent modification half-way through.
+         *
+         * @return an iterator over the snapshot
+         */
+        public Iterator<Runnable> iterator() {
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                return new Itr(Java6Arrays.copyOf(queue, size));
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * Iterator over a fixed array of tasks handed in at construction.
+         * Traversal never touches the live queue; only {@link #remove}
+         * reaches back into the enclosing queue.
+         */
+        private class Itr implements Iterator<Runnable> {
+            final ScheduledFutureTask[] array;
+            int cursor = 0;     // position of the element next() will hand out
+            int lastRet = -1;   // position last handed out, -1 when none
+
+            Itr(ScheduledFutureTask[] array) {
+                this.array = array;
+            }
+
+            /** True while there are entries left in the array. */
+            public boolean hasNext() {
+                return array.length > cursor;
+            }
+
+            /** Returns the entry at the cursor and advances past it. */
+            public Runnable next() {
+                final int at = cursor;
+                if (at >= array.length) {
+                    throw new NoSuchElementException();
+                }
+                lastRet = at;
+                cursor = at + 1;
+                return array[at];
+            }
+
+            /**
+             * Asks the enclosing queue to remove the entry most recently
+             * returned by next(); fails if there is no such entry.
+             */
+            public void remove() {
+                final int last = lastRet;
+                if (last < 0) {
+                    throw new IllegalStateException();
+                }
+                DelayedWorkQueue.this.remove(array[last]);
+                lastRet = -1;
+            }
         }
     }
 }