You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@harmony.apache.org by hi...@apache.org on 2009/07/28 11:30:48 UTC
svn commit: r798469 [11/28] - in /harmony/enhanced/classlib/branches/java6:
./ depends/build/platform/ depends/files/ depends/jars/
depends/manifests/icu4j_4.0/ depends/manifests/icu4j_4.2.1/
depends/manifests/icu4j_4.2.1/META-INF/ make/ modules/access...
Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java Tue Jul 28 09:30:33 2009
@@ -10,13 +10,13 @@
/**
* An {@link ExecutorService} that can schedule commands to run after a given
- * delay, or to execute periodically.
+ * delay, or to execute periodically.
*
* <p> The <tt>schedule</tt> methods create tasks with various delays
* and return a task object that can be used to cancel or check
* execution. The <tt>scheduleAtFixedRate</tt> and
* <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
- * that run periodically until cancelled.
+ * that run periodically until cancelled.
*
* <p> Commands submitted using the {@link Executor#execute} and
* {@link ExecutorService} <tt>submit</tt> methods are scheduled with
@@ -33,27 +33,27 @@
* TimeUnit.MILLISECONDS)</tt>. Beware however that expiration of a
* relative delay need not coincide with the current <tt>Date</tt> at
* which the task is enabled due to network time synchronization
- * protocols, clock drift, or other factors.
+ * protocols, clock drift, or other factors.
*
* The {@link Executors} class provides convenient factory methods for
* the ScheduledExecutorService implementations provided in this package.
*
* <h3>Usage Example</h3>
- *
+ *
* Here is a class with a method that sets up a ScheduledExecutorService
* to beep every ten seconds for an hour:
*
* <pre>
- * import static java.util.concurrent.TimeUnit;
+ * import static java.util.concurrent.TimeUnit.*;
* class BeeperControl {
- * private final ScheduledExecutorService scheduler =
+ * private final ScheduledExecutorService scheduler =
* Executors.newScheduledThreadPool(1);
*
* public void beepForAnHour() {
* final Runnable beeper = new Runnable() {
* public void run() { System.out.println("beep"); }
* };
- * final ScheduledFuture<?> beeperHandle =
+ * final ScheduledFuture<?> beeperHandle =
* scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
* scheduler.schedule(new Runnable() {
* public void run() { beeperHandle.cancel(true); }
@@ -70,76 +70,90 @@
/**
* Creates and executes a one-shot action that becomes enabled
* after the given delay.
- * @param command the task to execute.
- * @param delay the time from now to delay execution.
- * @param unit the time unit of the delay parameter.
- * @return a Future representing pending completion of the task,
- * and whose <tt>get()</tt> method will return <tt>null</tt>
- * upon completion.
- * @throws RejectedExecutionException if task cannot be scheduled
- * for execution.
+ *
+ * @param command the task to execute
+ * @param delay the time from now to delay execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture representing pending completion of
+ * the task and whose <tt>get()</tt> method will return
+ * <tt>null</tt> upon completion
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
* @throws NullPointerException if command is null
*/
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
+ public ScheduledFuture<?> schedule(Runnable command,
+ long delay, TimeUnit unit);
/**
* Creates and executes a ScheduledFuture that becomes enabled after the
* given delay.
- * @param callable the function to execute.
- * @param delay the time from now to delay execution.
- * @param unit the time unit of the delay parameter.
- * @return a ScheduledFuture that can be used to extract result or cancel.
- * @throws RejectedExecutionException if task cannot be scheduled
- * for execution.
+ *
+ * @param callable the function to execute
+ * @param delay the time from now to delay execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture that can be used to extract result or cancel
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
* @throws NullPointerException if callable is null
*/
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+ long delay, TimeUnit unit);
/**
* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the given
* period; that is executions will commence after
* <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
- * <tt>initialDelay + 2 * period</tt>, and so on.
+ * <tt>initialDelay + 2 * period</tt>, and so on.
* If any execution of the task
* encounters an exception, subsequent executions are suppressed.
* Otherwise, the task will only terminate via cancellation or
- * termination of the executor.
- * @param command the task to execute.
- * @param initialDelay the time to delay first execution.
- * @param period the period between successive executions.
+ * termination of the executor. If any execution of this task
+ * takes longer than its period, then subsequent executions
+ * may start late, but will not concurrently execute.
+ *
+ * @param command the task to execute
+ * @param initialDelay the time to delay first execution
+ * @param period the period between successive executions
* @param unit the time unit of the initialDelay and period parameters
- * @return a Future representing pending completion of the task,
- * and whose <tt>get()</tt> method will throw an exception upon
- * cancellation.
- * @throws RejectedExecutionException if task cannot be scheduled
- * for execution.
+ * @return a ScheduledFuture representing pending completion of
+ * the task, and whose <tt>get()</tt> method will throw an
+ * exception upon cancellation
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
* @throws NullPointerException if command is null
- * @throws IllegalArgumentException if period less than or equal to zero.
+ * @throws IllegalArgumentException if period less than or equal to zero
*/
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
-
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay,
+ long period,
+ TimeUnit unit);
+
/**
* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the
* given delay between the termination of one execution and the
- * commencement of the next. If any execution of the task
+ * commencement of the next. If any execution of the task
* encounters an exception, subsequent executions are suppressed.
* Otherwise, the task will only terminate via cancellation or
* termination of the executor.
- * @param command the task to execute.
- * @param initialDelay the time to delay first execution.
+ *
+ * @param command the task to execute
+ * @param initialDelay the time to delay first execution
* @param delay the delay between the termination of one
- * execution and the commencement of the next.
+ * execution and the commencement of the next
* @param unit the time unit of the initialDelay and delay parameters
- * @return a Future representing pending completion of the task,
- * and whose <tt>get()</tt> method will throw an exception upon
- * cancellation.
- * @throws RejectedExecutionException if task cannot be scheduled
- * for execution.
+ * @return a ScheduledFuture representing pending completion of
+ * the task, and whose <tt>get()</tt> method will throw an
+ * exception upon cancellation
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
* @throws NullPointerException if command is null
- * @throws IllegalArgumentException if delay less than or equal to zero.
+ * @throws IllegalArgumentException if delay less than or equal to zero
*/
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+ long initialDelay,
+ long delay,
+ TimeUnit unit);
}
Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java Tue Jul 28 09:30:33 2009
@@ -10,6 +10,7 @@
package java.util.concurrent;
import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
import java.util.*;
/**
@@ -20,25 +21,60 @@
* flexibility or capabilities of {@link ThreadPoolExecutor} (which
* this class extends) are required.
*
- * <p> Delayed tasks execute no sooner than they are enabled, but
+ * <p>Delayed tasks execute no sooner than they are enabled, but
* without any real-time guarantees about when, after they are
* enabled, they will commence. Tasks scheduled for exactly the same
* execution time are enabled in first-in-first-out (FIFO) order of
* submission.
*
+ * <p>Successive executions of a task scheduled via
+ * <code>scheduleAtFixedRate</code> or
+ * <code>scheduleWithFixedDelay</code> do not overlap. While different
+ * executions may be performed by different threads, the effects of
+ * prior executions <a
+ * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
+ * those of subsequent ones.
+ *
* <p>While this class inherits from {@link ThreadPoolExecutor}, a few
* of the inherited tuning methods are not useful for it. In
- * particular, because it acts as a fixed-sized pool using
- * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
- * to <tt>maximumPoolSize</tt> have no useful effect.
+ * particular, because it acts as a fixed-sized pool using {@code
+ * corePoolSize} threads and an unbounded queue, adjustments to {@code
+ * maximumPoolSize} have no useful effect.
*
* @since 1.5
* @author Doug Lea
*/
-public class ScheduledThreadPoolExecutor
- extends ThreadPoolExecutor
+public class ScheduledThreadPoolExecutor
+ extends ThreadPoolExecutor
implements ScheduledExecutorService {
+ /*
+ * This class specializes ThreadPoolExecutor implementation by
+ *
+ * 1. Using a custom task type, ScheduledFutureTask for
+ * tasks, even those that don't require scheduling (i.e.,
+ * those submitted using ExecutorService execute, not
+ * ScheduledExecutorService methods) which are treated as
+ * delayed tasks with a delay of zero.
+ *
+ * 2. Using a custom queue (DelayedWorkQueue), a variant of
+ * unbounded DelayQueue. The lack of capacity constraint and
+ * the fact that corePoolSize and maximumPoolSize are
+ * effectively identical simplifies some execution mechanics
+ * (see delayedExecute) compared to ThreadPoolExecutor.
+ *
+ * 3. Supporting optional run-after-shutdown parameters, which
+ * leads to overrides of shutdown methods to remove and cancel
+ * tasks that should NOT be run after shutdown, as well as
+ * different recheck logic when task (re)submission overlaps
+ * with a shutdown.
+ *
+ * 4. Task decoration methods to allow interception and
+ * instrumentation, which are needed because subclasses cannot
+ * otherwise override submit methods to get this effect. These
+ * don't have any impact on pool control logic though.
+ */
+
/**
* False if should cancel/suppress periodic tasks on shutdown.
*/
@@ -50,28 +86,40 @@
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/**
+ * True if ScheduledFutureTask.cancel should remove from queue
+ */
+ private volatile boolean removeOnCancel = false;
+
+ /**
* Sequence number to break scheduling ties, and in turn to
* guarantee FIFO order among tied entries.
*/
private static final AtomicLong sequencer = new AtomicLong(0);
- /** Base of nanosecond timings, to avoid wrapping */
- private static final long NANO_ORIGIN = System.nanoTime();
/**
- * Returns nanosecond time offset by origin
+ * Value of System.nanoTime upon static initialization. This is
+ * used as an offset by now() to avoid wraparound of time values
+ * that would make them appear negative.
*/
- final long now() {
- return System.nanoTime() - NANO_ORIGIN;
+ static final long initialNanoTime = System.nanoTime();
+
+ /**
+ * Returns current nanosecond time, offset by initialNanoTime.
+ */
+ static long now() {
+ return System.nanoTime() - initialNanoTime;
}
- private class ScheduledFutureTask<V>
+ private class ScheduledFutureTask<V>
extends FutureTask<V> implements ScheduledFuture<V> {
-
+
/** Sequence number to break ties FIFO */
private final long sequenceNumber;
+
/** The time the task is enabled to execute in nanoTime units */
private long time;
+
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
@@ -80,8 +128,16 @@
*/
private final long period;
+ /** The actual task to be re-enqueued by reExecutePeriodic */
+ ScheduledFutureTask<V> outerTask = this;
+
/**
- * Creates a one-shot action with given nanoTime-based trigger time
+ * Index into delay queue, to support faster cancellation.
+ */
+ int heapIndex;
+
+ /**
+ * Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
@@ -91,9 +147,9 @@
}
/**
- * Creates a periodic action with given nano time and period
+ * Creates a periodic action with given nano time and period.
*/
- ScheduledFutureTask(Runnable r, V result, long ns, long period) {
+ ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
@@ -101,7 +157,7 @@
}
/**
- * Creates a one-shot action with given nanoTime-based trigger
+ * Creates a one-shot action with given nanoTime-based trigger.
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
@@ -111,122 +167,162 @@
}
public long getDelay(TimeUnit unit) {
- long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
- return d;
+ long d = time - now();
+ return d<=0? 0 : unit.convert(d, TimeUnit.NANOSECONDS);
}
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
- ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
- long diff = time - x.time;
- if (diff < 0)
- return -1;
- else if (diff > 0)
- return 1;
- else if (sequenceNumber < x.sequenceNumber)
- return -1;
- else
- return 1;
+ if (other instanceof ScheduledFutureTask) {
+ ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
+ long diff = time - x.time;
+ if (diff < 0)
+ return -1;
+ else if (diff > 0)
+ return 1;
+ else if (sequenceNumber < x.sequenceNumber)
+ return -1;
+ else
+ return 1;
+ }
+ long d = (getDelay(TimeUnit.NANOSECONDS) -
+ other.getDelay(TimeUnit.NANOSECONDS));
+ return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
/**
* Returns true if this is a periodic (not a one-shot) action.
+ *
* @return true if periodic
*/
- boolean isPeriodic() {
+ public boolean isPeriodic() {
return period != 0;
}
/**
- * Run a periodic task
+ * Sets the next time to run for a periodic task.
*/
- private void runPeriodic() {
- boolean ok = ScheduledFutureTask.super.runAndReset();
- boolean down = isShutdown();
- // Reschedule if not cancelled and not shutdown or policy allows
- if (ok && (!down ||
- (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
- !isTerminating()))) {
- long p = period;
- if (p > 0)
- time += p;
- else
- time = now() - p;
- ScheduledThreadPoolExecutor.super.getQueue().add(this);
- }
- // This might have been the final executed delayed
- // task. Wake up threads to check.
- else if (down)
- interruptIdleWorkers();
+ private void setNextRunTime() {
+ long p = period;
+ if (p > 0)
+ time += p;
+ else
+ time = now() - p;
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean cancelled = super.cancel(mayInterruptIfRunning);
+ if (cancelled && removeOnCancel && heapIndex >= 0)
+ remove(this);
+ return cancelled;
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
- */
+ */
public void run() {
- if (isPeriodic())
- runPeriodic();
- else
+ boolean periodic = isPeriodic();
+ if (!canRunInCurrentRunState(periodic))
+ cancel(false);
+ else if (!periodic)
ScheduledFutureTask.super.run();
+ else if (ScheduledFutureTask.super.runAndReset()) {
+ setNextRunTime();
+ reExecutePeriodic(outerTask);
+ }
+ }
+ }
+
+ /**
+ * Returns true if can run a task given current run state
+ * and run-after-shutdown parameters.
+ *
+ * @param periodic true if this task is periodic, false if delayed
+ */
+ boolean canRunInCurrentRunState(boolean periodic) {
+ return isRunningOrShutdown(periodic ?
+ continueExistingPeriodicTasksAfterShutdown :
+ executeExistingDelayedTasksAfterShutdown);
+ }
+
+ /**
+ * Main execution method for delayed or periodic tasks. If pool
+ * is shut down, rejects the task. Otherwise adds task to queue
+ * and starts a thread, if necessary, to run it. (We cannot
+ * prestart the thread to run the task because the task (probably)
+ * shouldn't be run yet.) If the pool is shut down while the task
+ * is being added, cancel and remove it if required by state and
+ * run-after-shutdown parameters.
+ *
+ * @param task the task
+ */
+ private void delayedExecute(ScheduledFutureTask<?> task) {
+ if (isShutdown())
+ reject(task);
+ else {
+ super.getQueue().add(task);
+ if (isShutdown() &&
+ !canRunInCurrentRunState(task.isPeriodic()) &&
+ remove(task))
+ task.cancel(false);
+ else
+ prestartCoreThread();
}
}
/**
- * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
+ * Requeues a periodic task unless current run state precludes it.
+ * Same idea as delayedExecute except drops task rather than rejecting.
+ *
+ * @param task the task
*/
- private void delayedExecute(Runnable command) {
- if (isShutdown()) {
- reject(command);
- return;
- }
- // Prestart a thread if necessary. We cannot prestart it
- // running the task because the task (probably) shouldn't be
- // run yet, so thread will just idle until delay elapses.
- if (getPoolSize() < getCorePoolSize())
- prestartCoreThread();
-
- super.getQueue().add(command);
+ void reExecutePeriodic(ScheduledFutureTask<?> task) {
+ if (canRunInCurrentRunState(true)) {
+ super.getQueue().add(task);
+ if (!canRunInCurrentRunState(true) && remove(task))
+ task.cancel(false);
+ else
+ prestartCoreThread();
+ }
}
/**
- * Cancel and clear the queue of all tasks that should not be run
- * due to shutdown policy.
- */
- private void cancelUnwantedTasks() {
- boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
- boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
- if (!keepDelayed && !keepPeriodic)
- super.getQueue().clear();
- else if (keepDelayed || keepPeriodic) {
- Object[] entries = super.getQueue().toArray();
- for (int i = 0; i < entries.length; ++i) {
- Object e = entries[i];
+ * Cancels and clears the queue of all tasks that should not be run
+ * due to shutdown policy. Invoked within super.shutdown.
+ */
+ @Override void onShutdown() {
+ BlockingQueue<Runnable> q = super.getQueue();
+ boolean keepDelayed =
+ getExecuteExistingDelayedTasksAfterShutdownPolicy();
+ boolean keepPeriodic =
+ getContinueExistingPeriodicTasksAfterShutdownPolicy();
+ if (!keepDelayed && !keepPeriodic)
+ q.clear();
+ else {
+ // Traverse snapshot to avoid iterator exceptions
+ for (Object e : q.toArray()) {
if (e instanceof ScheduledFutureTask) {
- ScheduledFutureTask<?> t = (ScheduledFutureTask<?>)e;
- if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
- t.cancel(false);
+ ScheduledFutureTask<?> t =
+ (ScheduledFutureTask<?>)e;
+ if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
+ t.isCancelled()) { // also remove if already cancelled
+ if (q.remove(t))
+ t.cancel(false);
+ }
}
}
- entries = null;
- purge();
}
- }
-
- public boolean remove(Runnable task) {
- if (!(task instanceof ScheduledFutureTask))
- return false;
- return getQueue().remove(task);
+ tryTerminate();
}
/**
- * Creates a new ScheduledThreadPoolExecutor with the given core
- * pool size.
- *
- * @param corePoolSize the number of threads to keep in the pool,
- * even if they are idle.
- * @throws IllegalArgumentException if corePoolSize less than or
- * equal to zero
+ * Creates a new {@code ScheduledThreadPoolExecutor} with the
+ * given core pool size.
+ *
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle
+ * @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
@@ -234,14 +330,15 @@
}
/**
- * Creates a new ScheduledThreadPoolExecutor with the given
- * initial parameters.
- *
- * @param corePoolSize the number of threads to keep in the pool,
- * even if they are idle.
+ * Creates a new {@code ScheduledThreadPoolExecutor} with the
+ * given initial parameters.
+ *
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle
* @param threadFactory the factory to use when the executor
- * creates a new thread.
- * @throws NullPointerException if threadFactory is null
+ * creates a new thread
+ * @throws IllegalArgumentException if {@code corePoolSize < 0}
+ * @throws NullPointerException if {@code threadFactory} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
@@ -252,12 +349,13 @@
/**
* Creates a new ScheduledThreadPoolExecutor with the given
* initial parameters.
- *
- * @param corePoolSize the number of threads to keep in the pool,
- * even if they are idle.
+ *
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle
* @param handler the handler to use when execution is blocked
- * because the thread bounds and queue capacities are reached.
- * @throws NullPointerException if handler is null
+ * because the thread bounds and queue capacities are reached
+ * @throws IllegalArgumentException if {@code corePoolSize < 0}
+ * @throws NullPointerException if {@code handler} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
@@ -268,14 +366,16 @@
/**
* Creates a new ScheduledThreadPoolExecutor with the given
* initial parameters.
- *
- * @param corePoolSize the number of threads to keep in the pool,
- * even if they are idle.
+ *
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle
* @param threadFactory the factory to use when the executor
- * creates a new thread.
+ * creates a new thread
* @param handler the handler to use when execution is blocked
- * because the thread bounds and queue capacities are reached.
- * @throws NullPointerException if threadFactory or handler is null
+ * because the thread bounds and queue capacities are reached
+ * @throws IllegalArgumentException if {@code corePoolSize < 0}
+ * @throws NullPointerException if {@code threadFactory} or
+ * {@code handler} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
@@ -284,128 +384,178 @@
new DelayedWorkQueue(), threadFactory, handler);
}
- public ScheduledFuture<?> schedule(Runnable command,
- long delay,
+ /**
+ * Returns the trigger time of a delayed action
+ */
+ private static long nextTriggerTime(long delay, TimeUnit unit) {
+ long triggerTime;
+ long now = now();
+ if (delay <= 0)
+ return now; // avoid negative trigger times
+ else if ((triggerTime = now + unit.toNanos(delay)) < 0)
+ return Long.MAX_VALUE; // avoid numerical overflow
+ else
+ return triggerTime;
+ }
+
+ /**
+ * @throws RejectedExecutionException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public ScheduledFuture<?> schedule(Runnable command,
+ long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
- long triggerTime = now() + unit.toNanos(delay);
- ScheduledFutureTask<?> t =
- new ScheduledFutureTask<Boolean>(command, null, triggerTime);
+ long triggerTime = nextTriggerTime(delay, unit);
+ ScheduledFutureTask<?> t
+ = new ScheduledFutureTask<Void>(command, null, triggerTime);
delayedExecute(t);
return t;
}
-
- public <V> ScheduledFuture<V> schedule(Callable<V> callable,
- long delay,
+
+ /**
+ * @throws RejectedExecutionException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+ long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
- if (delay < 0) delay = 0;
- long triggerTime = now() + unit.toNanos(delay);
- ScheduledFutureTask<V> t =
- new ScheduledFutureTask<V>(callable, triggerTime);
+ long triggerTime = nextTriggerTime(delay, unit);
+ ScheduledFutureTask<V> t
+ = new ScheduledFutureTask<V>(callable, triggerTime);
delayedExecute(t);
return t;
}
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
- long initialDelay,
- long period,
+ /**
+ * @throws RejectedExecutionException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay,
+ long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
if (initialDelay < 0) initialDelay = 0;
- long triggerTime = now() + unit.toNanos(initialDelay);
- ScheduledFutureTask<?> t =
- new ScheduledFutureTask<Object>(command,
- null,
- triggerTime,
- unit.toNanos(period));
- delayedExecute(t);
- return t;
+ long triggerTime = nextTriggerTime(initialDelay, unit);
+ ScheduledFutureTask<Void> sft =
+ new ScheduledFutureTask<Void>(command,
+ null,
+ triggerTime,
+ unit.toNanos(period));
+ sft.outerTask = sft;
+ delayedExecute(sft);
+ return sft;
}
-
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
- long initialDelay,
- long delay,
+
+ /**
+ * @throws RejectedExecutionException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+ long initialDelay,
+ long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
- if (initialDelay < 0) initialDelay = 0;
- long triggerTime = now() + unit.toNanos(initialDelay);
- ScheduledFutureTask<?> t =
- new ScheduledFutureTask<Boolean>(command,
- null,
- triggerTime,
- unit.toNanos(-delay));
- delayedExecute(t);
- return t;
+ long triggerTime = nextTriggerTime(initialDelay, unit);
+ ScheduledFutureTask<Void> sft =
+ new ScheduledFutureTask<Void>(command,
+ null,
+ triggerTime,
+ unit.toNanos(-delay));
+ sft.outerTask = sft;
+ delayedExecute(sft);
+ return sft;
}
-
/**
- * Execute command with zero required delay. This has effect
- * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
- * that inspections of the queue and of the list returned by
- * <tt>shutdownNow</tt> will access the zero-delayed
- * {@link ScheduledFuture}, not the <tt>command</tt> itself.
+ * Executes {@code command} with zero required delay.
+ * This has effect equivalent to
+ * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
+ * Note that inspections of the queue and of the list returned by
+ * {@code shutdownNow} will access the zero-delayed
+ * {@link ScheduledFuture}, not the {@code command} itself.
+ *
+ * <p>A consequence of the use of {@code ScheduledFuture} objects is
+ * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
+ * called with a null second {@code Throwable} argument, even if the
+ * {@code command} terminated abruptly. Instead, the {@code Throwable}
+ * thrown by such a task can be obtained via {@link Future#get}.
*
- * @param command the task to execute
* @throws RejectedExecutionException at discretion of
- * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
- * for execution because the executor has been shut down.
- * @throws NullPointerException if command is null
+ * {@code RejectedExecutionHandler}, if the task
+ * cannot be accepted for execution because the
+ * executor has been shut down
+ * @throws NullPointerException {@inheritDoc}
*/
public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
schedule(command, 0, TimeUnit.NANOSECONDS);
}
// Override AbstractExecutorService methods
+ /**
+ * @throws RejectedExecutionException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
public Future<?> submit(Runnable task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
}
+ /**
+ * @throws RejectedExecutionException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
public <T> Future<T> submit(Runnable task, T result) {
- return schedule(Executors.callable(task, result),
+ return schedule(Executors.callable(task, result),
0, TimeUnit.NANOSECONDS);
}
+ /**
+ * @throws RejectedExecutionException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
}
/**
- * Set policy on whether to continue executing existing periodic
- * tasks even when this executor has been <tt>shutdown</tt>. In
- * this case, these tasks will only terminate upon
- * <tt>shutdownNow</tt>, or after setting the policy to
- * <tt>false</tt> when already shutdown. This value is by default
- * false.
- * @param value if true, continue after shutdown, else don't.
- * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
+ * Sets the policy on whether to continue executing existing
+ * periodic tasks even when this executor has been {@code shutdown}.
+ * In this case, these tasks will only terminate upon
+ * {@code shutdownNow} or after setting the policy to
+ * {@code false} when already shutdown.
+ * This value is by default {@code false}.
+ *
+ * @param value if {@code true}, continue after shutdown, else don't.
+ * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
*/
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
continueExistingPeriodicTasksAfterShutdown = value;
if (!value && isShutdown())
- cancelUnwantedTasks();
+ onShutdown();
}
/**
- * Get the policy on whether to continue executing existing
- * periodic tasks even when this executor has been
- * <tt>shutdown</tt>. In this case, these tasks will only
- * terminate upon <tt>shutdownNow</tt> or after setting the policy
- * to <tt>false</tt> when already shutdown. This value is by
- * default false.
- * @return true if will continue after shutdown.
+ * Gets the policy on whether to continue executing existing
+ * periodic tasks even when this executor has been {@code shutdown}.
+ * In this case, these tasks will only terminate upon
+ * {@code shutdownNow} or after setting the policy to
+ * {@code false} when already shutdown.
+ * This value is by default {@code false}.
+ *
+ * @return {@code true} if will continue after shutdown
* @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
*/
public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
@@ -413,66 +563,79 @@
}
/**
- * Set policy on whether to execute existing delayed
- * tasks even when this executor has been <tt>shutdown</tt>. In
- * this case, these tasks will only terminate upon
- * <tt>shutdownNow</tt>, or after setting the policy to
- * <tt>false</tt> when already shutdown. This value is by default
- * true.
- * @param value if true, execute after shutdown, else don't.
+ * Sets the policy on whether to execute existing delayed
+ * tasks even when this executor has been {@code shutdown}.
+ * In this case, these tasks will only terminate upon
+ * {@code shutdownNow}, or after setting the policy to
+ * {@code false} when already shutdown.
+ * This value is by default {@code true}.
+ *
+ * @param value if {@code true}, execute after shutdown, else don't.
* @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
*/
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
executeExistingDelayedTasksAfterShutdown = value;
if (!value && isShutdown())
- cancelUnwantedTasks();
+ onShutdown();
}
/**
- * Get policy on whether to execute existing delayed
- * tasks even when this executor has been <tt>shutdown</tt>. In
- * this case, these tasks will only terminate upon
- * <tt>shutdownNow</tt>, or after setting the policy to
- * <tt>false</tt> when already shutdown. This value is by default
- * true.
- * @return true if will execute after shutdown.
+ * Gets the policy on whether to execute existing delayed
+ * tasks even when this executor has been {@code shutdown}.
+ * In this case, these tasks will only terminate upon
+ * {@code shutdownNow}, or after setting the policy to
+ * {@code false} when already shutdown.
+ * This value is by default {@code true}.
+ *
+ * @return {@code true} if existing delayed tasks will execute after shutdown
* @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
*/
public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
return executeExistingDelayedTasksAfterShutdown;
}
-
/**
* Initiates an orderly shutdown in which previously submitted
- * tasks are executed, but no new tasks will be accepted. If the
- * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
- * been set <tt>false</tt>, existing delayed tasks whose delays
- * have not yet elapsed are cancelled. And unless the
- * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
- * been set <tt>true</tt>, future executions of existing periodic
- * tasks will be cancelled.
+ * tasks are executed, but no new tasks will be accepted.
+ * Invocation has no additional effect if already shut down.
+ *
+ * <p>This method does not wait for previously submitted tasks to
+ * complete execution. Use {@link #awaitTermination awaitTermination}
+ * to do that.
+ *
+ * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
+ * has been set {@code false}, existing delayed tasks whose delays
+ * have not yet elapsed are cancelled. And unless the {@code
+ * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
+ * {@code true}, future executions of existing periodic tasks will
+ * be cancelled.
+ *
+ * @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
- cancelUnwantedTasks();
super.shutdown();
}
/**
* Attempts to stop all actively executing tasks, halts the
- * processing of waiting tasks, and returns a list of the tasks that were
- * awaiting execution.
- *
+ * processing of waiting tasks, and returns a list of the tasks
+ * that were awaiting execution.
+ *
+ * <p>This method does not wait for actively executing tasks to
+ * terminate. Use {@link #awaitTermination awaitTermination} to
+ * do that.
+ *
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
- * cancels tasks via {@link Thread#interrupt}, so if any tasks mask or
- * fail to respond to interrupts, they may never terminate.
+ * cancels tasks via {@link Thread#interrupt}, so any task that
+ * fails to respond to interrupts may never terminate.
*
- * @return list of tasks that never commenced execution. Each
- * element of this list is a {@link ScheduledFuture},
- * including those tasks submitted using <tt>execute</tt>, which
- * are for scheduling purposes used as the basis of a zero-delay
- * <tt>ScheduledFuture</tt>.
+ * @return list of tasks that never commenced execution.
+ * Each element of this list is a {@link ScheduledFuture},
+ * including those tasks submitted using {@code execute},
+ * which are for scheduling purposes used as the basis of a
+ * zero-delay {@code ScheduledFuture}.
+ * @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
return super.shutdownNow();
@@ -481,9 +644,9 @@
/**
* Returns the task queue used by this executor. Each element of
* this queue is a {@link ScheduledFuture}, including those
- * tasks submitted using <tt>execute</tt> which are for scheduling
+ * tasks submitted using {@code execute} which are for scheduling
* purposes used as the basis of a zero-delay
- * <tt>ScheduledFuture</tt>. Iteration over this queue is
+ * {@code ScheduledFuture}. Iteration over this queue is
* <em>not</em> guaranteed to traverse tasks in the order in
* which they will execute.
*
@@ -494,52 +657,478 @@
}
/**
- * An annoying wrapper class to convince generics compiler to
- * use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable>
- */
- private static class DelayedWorkQueue
- extends AbstractCollection<Runnable>
+ * Specialized delay queue. To mesh with ThreadPoolExecutor declarations,
+ * this class must be declared as a {@code BlockingQueue<Runnable>} even
+ * though it can only hold RunnableScheduledFutures.
+ */
+ static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
-
- private final DelayQueue<ScheduledFutureTask> dq = new DelayQueue<ScheduledFutureTask>();
- public Runnable poll() { return dq.poll(); }
- public Runnable peek() { return dq.peek(); }
- public Runnable take() throws InterruptedException { return dq.take(); }
- public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
- return dq.poll(timeout, unit);
- }
-
- public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); }
- public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); }
- public void put(Runnable x) {
- dq.put((ScheduledFutureTask)x);
- }
- public boolean offer(Runnable x, long timeout, TimeUnit unit) {
- return dq.offer((ScheduledFutureTask)x, timeout, unit);
- }
-
- public Runnable remove() { return dq.remove(); }
- public Runnable element() { return dq.element(); }
- public void clear() { dq.clear(); }
- public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
- public int drainTo(Collection<? super Runnable> c, int maxElements) {
- return dq.drainTo(c, maxElements);
- }
-
- public int remainingCapacity() { return dq.remainingCapacity(); }
- public boolean remove(Object x) { return dq.remove(x); }
- public boolean contains(Object x) { return dq.contains(x); }
- public int size() { return dq.size(); }
- public boolean isEmpty() { return dq.isEmpty(); }
- public Object[] toArray() { return dq.toArray(); }
- public <T> T[] toArray(T[] array) { return dq.toArray(array); }
- public Iterator<Runnable> iterator() {
- return new Iterator<Runnable>() {
- private Iterator<ScheduledFutureTask> it = dq.iterator();
- public boolean hasNext() { return it.hasNext(); }
- public Runnable next() { return it.next(); }
- public void remove() { it.remove(); }
- };
+
+ /*
+ * A DelayedWorkQueue is based on a heap-based data structure
+ * like those in DelayQueue and PriorityQueue, except that
+ * every ScheduledFutureTask also records its index into the
+ * heap array. This eliminates the need to find a task upon
+ * cancellation, greatly speeding up removal (down from O(n)
+ * to O(log n)), and reducing garbage retention that would
+ * otherwise occur by waiting for the element to rise to top
+ * before clearing. But because the queue may also hold
+ * RunnableScheduledFutures that are not ScheduledFutureTasks,
+ * we are not guaranteed to have such indices available, in
+ * which case we fall back to linear search. (We expect that
+ * most tasks will not be decorated, and that the faster cases
+ * will be much more common.)
+ *
+ * All heap operations must record index changes -- mainly
+ * within siftUp and siftDown. Upon removal, a task's
+ * heapIndex is set to -1. Note that ScheduledFutureTasks can
+ * appear at most once in the queue (this need not be true for
+ * other kinds of tasks or work queues), so are uniquely
+ * identified by heapIndex.
+ */
+
+ private static final int INITIAL_CAPACITY = 16;
+ private ScheduledFutureTask[] queue =
+ new ScheduledFutureTask[INITIAL_CAPACITY];
+ private final ReentrantLock lock = new ReentrantLock();
+ private int size = 0;
+
+ /**
+ * Thread designated to wait for the task at the head of the
+ * queue. This variant of the Leader-Follower pattern
+ * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
+ * minimize unnecessary timed waiting. When a thread becomes
+ * the leader, it waits only for the next delay to elapse, but
+ * other threads await indefinitely. The leader thread must
+ * signal some other thread before returning from take() or
+ * poll(...), unless some other thread becomes leader in the
+ * interim. Whenever the head of the queue is replaced with a
+ * task with an earlier expiration time, the leader field is
+ * invalidated by being reset to null, and some waiting
+ * thread, but not necessarily the current leader, is
+ * signalled. So waiting threads must be prepared to acquire
+ * and lose leadership while waiting.
+ */
+ private Thread leader = null;
+
+ /**
+ * Condition signalled when a newer task becomes available at the
+ * head of the queue or a new thread may need to become leader.
+ */
+ private final Condition available = lock.newCondition();
+
+ /**
+ * Sets f's heapIndex to idx; the instanceof test only skips a null f.
+ */
+ private void setIndex(ScheduledFutureTask f, int idx) {
+ if (f instanceof ScheduledFutureTask)
+ ((ScheduledFutureTask)f).heapIndex = idx;
+ }
+
+ /**
+ * Sift element added at bottom up to its heap-ordered spot.
+ * Call only when holding lock.
+ */
+ private void siftUp(int k, ScheduledFutureTask key) {
+ while (k > 0) {
+ int parent = (k - 1) >>> 1;
+ ScheduledFutureTask e = queue[parent];
+ if (key.compareTo(e) >= 0)
+ break;
+ queue[k] = e;
+ setIndex(e, k);
+ k = parent;
+ }
+ queue[k] = key;
+ setIndex(key, k);
+ }
+
+ /**
+ * Sift element added at top down to its heap-ordered spot.
+ * Call only when holding lock.
+ */
+ private void siftDown(int k, ScheduledFutureTask key) {
+ int half = size >>> 1;
+ while (k < half) {
+ int child = (k << 1) + 1;
+ ScheduledFutureTask c = queue[child];
+ int right = child + 1;
+ if (right < size && c.compareTo(queue[right]) > 0)
+ c = queue[child = right];
+ if (key.compareTo(c) <= 0)
+ break;
+ queue[k] = c;
+ setIndex(c, k);
+ k = child;
+ }
+ queue[k] = key;
+ setIndex(key, k);
+ }
+
+ /**
+ * Resize the heap array. Call only when holding lock.
+ */
+ private void grow() {
+ int oldCapacity = queue.length;
+ int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
+ if (newCapacity < 0) // overflow
+ newCapacity = Integer.MAX_VALUE;
+ queue = Java6Arrays.copyOf(queue, newCapacity);
+ }
+
+ /**
+ * Finds the index of the given object, or returns -1 if it is absent.
+ */
+ private int indexOf(Object x) {
+ if (x != null) {
+ if (x instanceof ScheduledFutureTask) {
+ int i = ((ScheduledFutureTask) x).heapIndex;
+ // Sanity check; x could conceivably be a
+ // ScheduledFutureTask from some other pool.
+ if (i >= 0 && i < size && queue[i] == x)
+ return i;
+ } else {
+ for (int i = 0; i < size; i++)
+ if (x.equals(queue[i]))
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ public boolean contains(Object x) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return indexOf(x) != -1;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean remove(Object x) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ int i = indexOf(x);
+ if (i < 0)
+ return false;
+
+ setIndex(queue[i], -1);
+ int s = --size;
+ ScheduledFutureTask replacement = queue[s];
+ queue[s] = null;
+ if (s != i) {
+ siftDown(i, replacement);
+ if (queue[i] == replacement)
+ siftUp(i, replacement);
+ }
+ return true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int size() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return size;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ public int remainingCapacity() {
+ return Integer.MAX_VALUE;
+ }
+
+ public ScheduledFutureTask peek() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return queue[0];
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean offer(Runnable x) {
+ if (x == null)
+ throw new NullPointerException();
+ ScheduledFutureTask e = (ScheduledFutureTask)x;
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ int i = size;
+ if (i >= queue.length)
+ grow();
+ size = i + 1;
+ if (i == 0) {
+ queue[0] = e;
+ setIndex(e, 0);
+ } else {
+ siftUp(i, e);
+ }
+ if (queue[0] == e) {
+ leader = null;
+ available.signal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ return true;
+ }
+
+ public void put(Runnable e) {
+ offer(e);
+ }
+
+ public boolean add(Runnable e) {
+ return offer(e);
+ }
+
+ public boolean offer(Runnable e, long timeout, TimeUnit unit) {
+ return offer(e);
+ }
+
+ /**
+ * Performs common bookkeeping for poll and take: Replaces
+ * first element with last and sifts it down. Call only when
+ * holding lock.
+ * @param f the task to remove and return
+ */
+ private ScheduledFutureTask finishPoll(ScheduledFutureTask f) {
+ int s = --size;
+ ScheduledFutureTask x = queue[s];
+ queue[s] = null;
+ if (s != 0)
+ siftDown(0, x);
+ setIndex(f, -1);
+ return f;
+ }
+
+ public ScheduledFutureTask poll() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ ScheduledFutureTask first = queue[0];
+ if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+ return null;
+ else
+ return finishPoll(first);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public ScheduledFutureTask take() throws InterruptedException {
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ for (;;) {
+ ScheduledFutureTask first = queue[0];
+ if (first == null)
+ available.await();
+ else {
+ long delay = first.getDelay(TimeUnit.NANOSECONDS);
+ if (delay <= 0)
+ return finishPoll(first);
+ else if (leader != null)
+ available.await();
+ else {
+ Thread thisThread = Thread.currentThread();
+ leader = thisThread;
+ try {
+ available.awaitNanos(delay);
+ } finally {
+ if (leader == thisThread)
+ leader = null;
+ }
+ }
+ }
+ }
+ } finally {
+ if (leader == null && queue[0] != null)
+ available.signal();
+ lock.unlock();
+ }
+ }
+
+ public ScheduledFutureTask poll(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ for (;;) {
+ ScheduledFutureTask first = queue[0];
+ if (first == null) {
+ if (nanos <= 0)
+ return null;
+ else
+ nanos = available.awaitNanos(nanos);
+ } else {
+ long delay = first.getDelay(TimeUnit.NANOSECONDS);
+ if (delay <= 0)
+ return finishPoll(first);
+ if (nanos <= 0)
+ return null;
+ if (nanos < delay || leader != null)
+ nanos = available.awaitNanos(nanos);
+ else {
+ Thread thisThread = Thread.currentThread();
+ leader = thisThread;
+ try {
+ long timeLeft = available.awaitNanos(delay);
+ nanos -= delay - timeLeft;
+ } finally {
+ if (leader == thisThread)
+ leader = null;
+ }
+ }
+ }
+ }
+ } finally {
+ if (leader == null && queue[0] != null)
+ available.signal();
+ lock.unlock();
+ }
+ }
+
+ public void clear() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ for (int i = 0; i < size; i++) {
+ ScheduledFutureTask t = queue[i];
+ if (t != null) {
+ queue[i] = null;
+ setIndex(t, -1);
+ }
+ }
+ size = 0;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Return and remove first element only if it is expired.
+ * Used only by drainTo. Call only when holding lock.
+ */
+ private ScheduledFutureTask pollExpired() {
+ ScheduledFutureTask first = queue[0];
+ if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+ return null;
+ return finishPoll(first);
+ }
+
+ public int drainTo(Collection<? super Runnable> c) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ ScheduledFutureTask first;
+ int n = 0;
+ while ((first = pollExpired()) != null) {
+ c.add(first);
+ ++n;
+ }
+ return n;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int drainTo(Collection<? super Runnable> c, int maxElements) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ if (maxElements <= 0)
+ return 0;
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ ScheduledFutureTask first;
+ int n = 0;
+ while (n < maxElements && (first = pollExpired()) != null) {
+ c.add(first);
+ ++n;
+ }
+ return n;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public Object[] toArray() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return Java6Arrays.copyOf(queue, size, Object[].class);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T[] toArray(T[] a) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ if (a.length < size)
+ return (T[]) Java6Arrays.copyOf(queue, size, a.getClass());
+ System.arraycopy(queue, 0, a, 0, size);
+ if (a.length > size)
+ a[size] = null;
+ return a;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public Iterator<Runnable> iterator() {
+ return new Itr(Java6Arrays.copyOf(queue, size));
+ }
+
+ /**
+ * Snapshot iterator that works off a copy of the underlying queue array.
+ */
+ private class Itr implements Iterator<Runnable> {
+ final ScheduledFutureTask[] array;
+ int cursor = 0; // index of next element to return
+ int lastRet = -1; // index of last element, or -1 if no such
+
+ Itr(ScheduledFutureTask[] array) {
+ this.array = array;
+ }
+
+ public boolean hasNext() {
+ return cursor < array.length;
+ }
+
+ public Runnable next() {
+ if (cursor >= array.length)
+ throw new NoSuchElementException();
+ lastRet = cursor;
+ return array[cursor++];
+ }
+
+ public void remove() {
+ if (lastRet < 0)
+ throw new IllegalStateException();
+ DelayedWorkQueue.this.remove(array[lastRet]);
+ lastRet = -1;
+ }
}
}
}