You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [6/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache/...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,2244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.*;
+import java.util.concurrent.atomic.*;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.commons.javaflow.Continuation;
+import org.apache.log4j.Logger;
+
+
+/**
+ * An {@link ExecutorService} that executes each submitted task using one of
+ * possibly several pooled threads, normally configured using {@link Executors}
+ * factory methods.
+ * 
+ * <p>
+ * Thread pools address two different problems: they usually provide improved
+ * performance when executing large numbers of asynchronous tasks, due to
+ * reduced per-task invocation overhead, and they provide a means of bounding
+ * and managing the resources, including threads, consumed when executing a
+ * collection of tasks. Each {@code ContinuationsExecutor} also maintains some
+ * basic statistics, such as the number of completed tasks.
+ * 
+ * <p>
+ * To be useful across a wide range of contexts, this class provides many
+ * adjustable parameters and extensibility hooks. However, programmers are urged
+ * to use the more convenient {@link Executors} factory methods {@link
+ * Executors#newCachedThreadPool} (unbounded thread pool, with automatic thread
+ * reclamation), {@link Executors#newFixedThreadPool} (fixed size thread pool)
+ * and {@link Executors#newSingleThreadExecutor} (single background thread),
+ * that preconfigure settings for the most common usage scenarios. Otherwise,
+ * use the following guide when manually configuring and tuning this class:
+ * 
+ * <dl>
+ * 
+ * <dt>Core and maximum pool sizes</dt>
+ * 
+ * <dd>A {@code ContinuationsExecutor} will automatically adjust the pool size
+ * (see {@link #getPoolSize}) according to the bounds set by corePoolSize (see
+ * {@link #getCorePoolSize}) and maximumPoolSize (see
+ * {@link #getMaximumPoolSize}).
+ * 
+ * When a new task is submitted in method {@link #execute}, and fewer than
+ * corePoolSize threads are running, a new thread is created to handle the
+ * request, even if other worker threads are idle. If there are more than
+ * corePoolSize but less than maximumPoolSize threads running, a new thread will
+ * be created only if the queue is full. By setting corePoolSize and
+ * maximumPoolSize the same, you create a fixed-size thread pool. By setting
+ * maximumPoolSize to an essentially unbounded value such as
+ * {@code Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
+ * number of concurrent tasks. Most typically, core and maximum pool sizes are
+ * set only upon construction, but they may also be changed dynamically using
+ * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd>
+ * 
+ * <dt>On-demand construction</dt>
+ * 
+ * <dd> By default, even core threads are initially created and started only
+ * when new tasks arrive, but this can be overridden dynamically using method
+ * {@link #prestartCoreThread} or {@link #prestartAllCoreThreads}. You probably
+ * want to prestart threads if you construct the pool with a non-empty queue.
+ * </dd>
+ * 
+ * <dt>Creating new threads</dt>
+ * 
+ * <dd>New threads are created using a {@link ThreadFactory}. If not otherwise
+ * specified, a {@link Executors#defaultThreadFactory} is used, that creates
+ * threads to all be in the same {@link ThreadGroup} and with the same
+ * {@code NORM_PRIORITY} priority and non-daemon status. By supplying a
+ * different ThreadFactory, you can alter the thread's name, thread group,
+ * priority, daemon status, etc. If a {@code ThreadFactory} fails to create a
+ * thread when asked by returning null from {@code newThread}, the executor
+ * will continue, but might not be able to execute any tasks. Threads should
+ * possess the "modifyThread" {@code RuntimePermission}. If worker threads or
+ * other threads using the pool do not possess this permission, service may be
+ * degraded: configuration changes may not take effect in a timely manner, and a
+ * shutdown pool may remain in a state in which termination is possible but not
+ * completed.</dd>
+ * 
+ * <dt>Keep-alive times</dt>
+ * 
+ * <dd>If the pool currently has more than corePoolSize threads, excess threads
+ * will be terminated if they have been idle for more than the keepAliveTime
+ * (see {@link #getKeepAliveTime}). This provides a means of reducing resource
+ * consumption when the pool is not being actively used. If the pool becomes
+ * more active later, new threads will be constructed. This parameter can also
+ * be changed dynamically using method {@link #setKeepAliveTime}. Using a value
+ * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively disables
+ * idle threads from ever terminating prior to shut down. By default, the
+ * keep-alive policy applies only when there are more than corePoolSizeThreads.
+ * But method {@link #allowCoreThreadTimeOut(boolean)} can be used to apply this
+ * time-out policy to core threads as well, so long as the keepAliveTime value
+ * is non-zero. </dd>
+ * 
+ * <dt>Queuing</dt>
+ * 
+ * <dd>Any {@link BlockingQueue} may be used to transfer and hold submitted
+ * tasks. The use of this queue interacts with pool sizing:
+ * 
+ * <ul>
+ * 
+ * <li> If fewer than corePoolSize threads are running, the Executor always
+ * prefers adding a new thread rather than queuing.</li>
+ * 
+ * <li> If corePoolSize or more threads are running, the Executor always prefers
+ * queuing a request rather than adding a new thread.</li>
+ * 
+ * <li> If a request cannot be queued, a new thread is created unless this would
+ * exceed maximumPoolSize, in which case, the task will be rejected.</li>
+ * 
+ * </ul>
+ * 
+ * There are three general strategies for queuing:
+ * <ol>
+ * 
+ * <li> <em> Direct handoffs.</em> A good default choice for a work queue is a
+ * {@link SynchronousQueue} that hands off tasks to threads without otherwise
+ * holding them. Here, an attempt to queue a task will fail if no threads are
+ * immediately available to run it, so a new thread will be constructed. This
+ * policy avoids lockups when handling sets of requests that might have internal
+ * dependencies. Direct handoffs generally require unbounded maximumPoolSizes to
+ * avoid rejection of new submitted tasks. This in turn admits the possibility
+ * of unbounded thread growth when commands continue to arrive on average faster
+ * than they can be processed. </li>
+ * 
+ * <li><em> Unbounded queues.</em> Using an unbounded queue (for example a
+ * {@link LinkedBlockingQueue} without a predefined capacity) will cause new
+ * tasks to wait in the queue when all corePoolSize threads are busy. Thus, no
+ * more than corePoolSize threads will ever be created. (And the value of the
+ * maximumPoolSize therefore doesn't have any effect.) This may be appropriate
+ * when each task is completely independent of others, so tasks cannot affect
+ * each others execution; for example, in a web page server. While this style of
+ * queuing can be useful in smoothing out transient bursts of requests, it
+ * admits the possibility of unbounded work queue growth when commands continue
+ * to arrive on average faster than they can be processed. </li>
+ * 
+ * <li><em>Bounded queues.</em> A bounded queue (for example, an
+ * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when used with
+ * finite maximumPoolSizes, but can be more difficult to tune and control. Queue
+ * sizes and maximum pool sizes may be traded off for each other: Using large
+ * queues and small pools minimizes CPU usage, OS resources, and
+ * context-switching overhead, but can lead to artificially low throughput. If
+ * tasks frequently block (for example if they are I/O bound), a system may be
+ * able to schedule time for more threads than you otherwise allow. Use of small
+ * queues generally requires larger pool sizes, which keeps CPUs busier but may
+ * encounter unacceptable scheduling overhead, which also decreases throughput.
+ * </li>
+ * 
+ * </ol>
+ * 
+ * </dd>
+ * 
+ * <dt>Rejected tasks</dt>
+ * 
+ * <dd> New tasks submitted in method {@link #execute} will be <em>rejected</em>
+ * when the Executor has been shut down, and also when the Executor uses finite
+ * bounds for both maximum threads and work queue capacity, and is saturated. In
+ * either case, the {@code execute} method invokes the {@link
+ * RejectedExecutionHandler#rejectedExecution} method of its {@link
+ * RejectedExecutionHandler}. Four predefined handler policies are provided:
+ * 
+ * <ol>
+ * 
+ * <li> In the default {@link ContinuationsExecutor.AbortPolicy}, the handler
+ * throws a runtime {@link RejectedExecutionException} upon rejection. </li>
+ * 
+ * <li> In {@link ContinuationsExecutor.CallerRunsPolicy}, the thread that invokes
+ * {@code execute} itself runs the task. This provides a simple feedback control
+ * mechanism that will slow down the rate that new tasks are submitted. </li>
+ * 
+ * <li> In {@link ContinuationsExecutor.DiscardPolicy}, a task that cannot be
+ * executed is simply dropped. </li>
+ * 
+ * <li>In {@link ContinuationsExecutor.DiscardOldestPolicy}, if the executor is
+ * not shut down, the task at the head of the work queue is dropped, and then
+ * execution is retried (which can fail again, causing this to be repeated.)
+ * </li>
+ * 
+ * </ol>
+ * 
+ * It is possible to define and use other kinds of {@link
+ * RejectedExecutionHandler} classes. Doing so requires some care especially
+ * when policies are designed to work only under particular capacity or queuing
+ * policies. </dd>
+ * 
+ * <dt>Hook methods</dt>
+ * 
+ * <dd>This class provides {@code protected} overridable {@link #beforeExecute}
+ * and {@link #afterExecute} methods that are called before and after execution
+ * of each task. These can be used to manipulate the execution environment; for
+ * example, reinitializing ThreadLocals, gathering statistics, or adding log
+ * entries. Additionally, method {@link #terminated} can be overridden to
+ * perform any special processing that needs to be done once the Executor has
+ * fully terminated.
+ * 
+ * <p>
+ * If hook or callback methods throw exceptions, internal worker threads may in
+ * turn fail and abruptly terminate.</dd>
+ * 
+ * <dt>Queue maintenance</dt>
+ * 
+ * <dd> Method {@link #getQueue} allows access to the work queue for purposes of
+ * monitoring and debugging. Use of this method for any other purpose is
+ * strongly discouraged. Two supplied methods, {@link #remove} and
+ * {@link #purge} are available to assist in storage reclamation when large
+ * numbers of queued tasks become cancelled.</dd>
+ * 
+ * <dt>Finalization</dt>
+ * 
+ * <dd> A pool that is no longer referenced in a program <em>AND</em> has no
+ * remaining threads will be {@code shutdown} automatically. If you would like
+ * to ensure that unreferenced pools are reclaimed even if users forget to call
+ * {@link #shutdown}, then you must arrange that unused threads eventually die,
+ * by setting appropriate keep-alive times, using a lower bound of zero core
+ * threads and/or setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
+ * 
+ * </dl>
+ * 
+ * <p>
+ * <b>Extension example</b>. Most extensions of this class override one or more
+ * of the protected hook methods. For example, here is a subclass that adds a
+ * simple pause/resume feature:
+ * 
+ * <pre>
+ *  {@code
+ *  class PausableThreadPoolExecutor extends ContinuationsExecutor {
+ *    private boolean isPaused;
+ *    private ReentrantLock pauseLock = new ReentrantLock();
+ *    private Condition unpaused = pauseLock.newCondition();
+ * 
+ *    public PausableThreadPoolExecutor(...) { super(...); }
+ * 
+ *    protected void beforeExecute(Thread t, Runnable r) {
+ *      super.beforeExecute(t, r);
+ *      pauseLock.lock();
+ *      try {
+ *        while (isPaused) unpaused.await();
+ *      } catch (InterruptedException ie) {
+ *        t.interrupt();
+ *      } finally {
+ *        pauseLock.unlock();
+ *      }
+ *    }
+ * 
+ *    public void pause() {
+ *      pauseLock.lock();
+ *      try {
+ *        isPaused = true;
+ *      } finally {
+ *        pauseLock.unlock();
+ *      }
+ *    }
+ * 
+ *    public void resume() {
+ *      pauseLock.lock();
+ *      try {
+ *        isPaused = false;
+ *        unpaused.signalAll();
+ *      } finally {
+ *        pauseLock.unlock();
+ *      }
+ *    }
+ *  }}
+ * </pre>
+ * 
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class ContinuationsExecutor extends AbstractExecutorService
+{
+    /**
+     * The main pool control state, ctl, is an atomic integer packing two
+     * conceptual fields workerCount, indicating the effective number of threads
+     * runState, indicating whether running, shutting down etc
+     * 
+     * In order to pack them into one int, we limit workerCount to (2^29)-1
+     * (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise
+     * representable. If this is ever an issue in the future, the variable can
+     * be changed to be an AtomicLong, and the shift/mask constants below
+     * adjusted. But until the need arises, this code is a bit faster and
+     * simpler using an int.
+     * 
+     * The workerCount is the number of workers that have been permitted to
+     * start and not permitted to stop. The value may be transiently different
+     * from the actual number of live threads, for example when a ThreadFactory
+     * fails to create a thread when asked, and when exiting threads are still
+     * performing bookkeeping before terminating. The user-visible pool size is
+     * reported as the current size of the workers set.
+     * 
+     * The runState provides the main lifecyle control, taking on values:
+     * 
+     * RUNNING: Accept new tasks and process queued tasks SHUTDOWN: Don't accept
+     * new tasks, but process queued tasks STOP: Don't accept new tasks, don't
+     * process queued tasks, and interrupt in-progress tasks TIDYING: All tasks
+     * have terminated, workerCount is zero, the thread transitioning to state
+     * TIDYING will run the terminated() hook method TERMINATED: terminated()
+     * has completed
+     * 
+     * The numerical order among these values matters, to allow ordered
+     * comparisons. The runState monotonically increases over time, but need not
+     * hit each state. The transitions are:
+     * 
+     * RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in
+     * finalize() (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
+     * SHUTDOWN -> TIDYING When both queue and pool are empty STOP -> TIDYING
+     * When pool is empty TIDYING -> TERMINATED When the terminated() hook
+     * method has completed
+     * 
+     * Threads waiting in awaitTermination() will return when the state reaches
+     * TERMINATED.
+     * 
+     * Detecting the transition from SHUTDOWN to TIDYING is less straightforward
+     * than you'd like because the queue may become empty after non-empty and
+     * vice versa during SHUTDOWN state, but we can only terminate if, after
+     * seeing that it is empty, we see that workerCount is 0 (which sometimes
+     * entails a recheck -- see below).
+     */
+    private final static Logger logger_ = Logger.getLogger( ContinuationsExecutor.class );
+    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
+    private final static ThreadLocal<IContinuable> tls_ = new ThreadLocal<IContinuable>();
+    private static final int COUNT_BITS = Integer.SIZE - 3;
+    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
+
+    // runState is stored in the high-order bits
+    private static final int RUNNING = -1 << COUNT_BITS;
+    private static final int SHUTDOWN = 0 << COUNT_BITS;
+    private static final int STOP = 1 << COUNT_BITS;
+    private static final int TIDYING = 2 << COUNT_BITS;
+    private static final int TERMINATED = 3 << COUNT_BITS;
+    
+    public static void putInTls(IContinuable run)
+    {
+        tls_.set(run);
+    }
+    
+    public static void doPostProcessing(Continuation c)
+    {
+        /* post process the call if need be */ 
+        IContinuable run = ContinuationsExecutor.tls_.get();
+        if ( run != null )
+        {              
+            run.run(c);                        
+        }
+        ContinuationsExecutor.tls_.remove();
+    }
+    
+    // Packing and unpacking ctl
+    private static int runStateOf(int c)
+    {
+        return c & ~CAPACITY;
+    }
+
+    private static int workerCountOf(int c)
+    {
+        return c & CAPACITY;
+    }
+
+    private static int ctlOf(int rs, int wc)
+    {
+        return rs | wc;
+    }
+
+    /*
+     * Bit field accessors that don't require unpacking ctl. These depend on the
+     * bit layout and on workerCount being never negative.
+     */
+
+    private static boolean runStateLessThan(int c, int s)
+    {
+        return c < s;
+    }
+
+    private static boolean runStateAtLeast(int c, int s)
+    {
+        return c >= s;
+    }
+
+    private static boolean isRunning(int c)
+    {
+        return c < SHUTDOWN;
+    }
+
+    /**
+     * Attempt to CAS-increment the workerCount field of ctl.
+     */
+    private boolean compareAndIncrementWorkerCount(int expect)
+    {
+        return ctl.compareAndSet(expect, expect + 1);
+    }
+    
+    /**
+     * Attempt to CAS-decrement the workerCount field of ctl.
+     */
+    private boolean compareAndDecrementWorkerCount(int expect)
+    {
+        return ctl.compareAndSet(expect, expect - 1);
+    }
+
+    /**
+     * Decrements the workerCount field of ctl. This is called only on abrupt
+     * termination of a thread (see processWorkerExit). Other decrements are
+     * performed within getTask.
+     */
+    private void decrementWorkerCount()
+    {
+        do
+        {
+        }
+        while (!compareAndDecrementWorkerCount(ctl.get()));
+    }
+
+    /**
+     * The queue used for holding tasks and handing off to worker threads. We do
+     * not require that workQueue.poll() returning null necessarily means that
+     * workQueue.isEmpty(), so rely solely on isEmpty to see if the queue is
+     * empty (which we must do for example when deciding whether to transition
+     * from SHUTDOWN to TIDYING). This accommodates special-purpose queues such
+     * as DelayQueues for which poll() is allowed to return null even if it may
+     * later return non-null when delays expire.
+     */
+    private final BlockingQueue<Runnable> workQueue;
+
+    /**
+     * Lock held on access to workers set and related bookkeeping. While we
+     * could use a concurrent set of some sort, it turns out to be generally
+     * preferable to use a lock. Among the reasons is that this serializes
+     * interruptIdleWorkers, which avoids unnecessary interrupt storms,
+     * especially during shutdown. Otherwise exiting threads would concurrently
+     * interrupt those that have not yet interrupted. It also simplifies some of
+     * the associated statistics bookkeeping of largestPoolSize etc. We also
+     * hold mainLock on shutdown and shutdownNow, for the sake of ensuring
+     * workers set is stable while separately checking permission to interrupt
+     * and actually interrupting.
+     */
+    private final ReentrantLock mainLock = new ReentrantLock();
+
+    /**
+     * Set containing all worker threads in pool. Accessed only when holding
+     * mainLock.
+     */
+    private final HashSet<Worker> workers = new HashSet<Worker>();
+
+    /**
+     * Wait condition to support awaitTermination
+     */
+    private final Condition termination = mainLock.newCondition();
+
+    /**
+     * Tracks largest attained pool size. Accessed only under mainLock.
+     */
+    private int largestPoolSize;
+
+    /**
+     * Counter for completed tasks. Updated only on termination of worker
+     * threads. Accessed only under mainLock.
+     */
+    private long completedTaskCount;
+
+    /*
+     * All user control parameters are declared as volatiles so that ongoing
+     * actions are based on freshest values, but without need for locking, since
+     * no internal invariants depend on them changing synchronously with respect
+     * to other actions.
+     */
+
+    /**
+     * Factory for new threads. All threads are created using this factory (via
+     * method addWorker). All callers must be prepared for addWorker to fail,
+     * which may reflect a system or user's policy limiting the number of
+     * threads. Even though it is not treated as an error, failure to create
+     * threads may result in new tasks being rejected or existing ones remaining
+     * stuck in the queue. On the other hand, no special precautions exist to
+     * handle OutOfMemoryErrors that might be thrown while trying to create
+     * threads, since there is generally no recourse from within this class.
+     */
+    private volatile ThreadFactory threadFactory;
+
+    /**
+     * Handler called when saturated or shutdown in execute.
+     */
+    private volatile RejectedExecutionHandler handler;
+
+    /**
+     * Timeout in nanoseconds for idle threads waiting for work. Threads use
+     * this timeout when there are more than corePoolSize present or if
+     * allowCoreThreadTimeOut. Otherwise they wait forever for new work.
+     */
+    private volatile long keepAliveTime;
+
+    /**
+     * If false (default), core threads stay alive even when idle. If true, core
+     * threads use keepAliveTime to time out waiting for work.
+     */
+    private volatile boolean allowCoreThreadTimeOut;
+
+    /**
+     * Core pool size is the minimum number of workers to keep alive (and not
+     * allow to time out etc) unless allowCoreThreadTimeOut is set, in which
+     * case the minimum is zero.
+     */
+    private volatile int corePoolSize;
+
+    /**
+     * Maximum pool size. Note that the actual maximum is internally bounded by
+     * CAPACITY.
+     */
+    private volatile int maximumPoolSize;
+
+    /**
+     * The default rejected execution handler
+     */
+    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
+
+    /**
+     * Permission required for callers of shutdown and shutdownNow. We
+     * additionally require (see checkShutdownAccess) that callers have
+     * permission to actually interrupt threads in the worker set (as governed
+     * by Thread.interrupt, which relies on ThreadGroup.checkAccess, which in
+     * turn relies on SecurityManager.checkAccess). Shutdowns are attempted only
+     * if these checks pass.
+     * 
+     * All actual invocations of Thread.interrupt (see interruptIdleWorkers and
+     * interruptWorkers) ignore SecurityExceptions, meaning that the attempted
+     * interrupts silently fail. In the case of shutdown, they should not fail
+     * unless the SecurityManager has inconsistent policies, sometimes allowing
+     * access to a thread and sometimes not. In such cases, failure to actually
+     * interrupt threads may disable or delay full termination. Other uses of
+     * interruptIdleWorkers are advisory, and failure to actually interrupt will
+     * merely delay response to configuration changes so is not handled
+     * exceptionally.
+     */
+    private static final RuntimePermission shutdownPerm = new RuntimePermission(
+            "modifyThread");
+
+    /**
+     * Class Worker mainly maintains interrupt control state for threads running
+     * tasks, along with other minor bookkeeping. This class opportunistically
+     * extends AbstractQueuedSynchronizer to simplify acquiring and releasing a
+     * lock surrounding each task execution. This protects against interrupts
+     * that are intended to wake up a worker thread waiting for a task from
+     * instead interrupting a task being run. We implement a simple
+     * non-reentrant mutual exclusion lock rather than use ReentrantLock because
+     * we do not want worker tasks to be able to reacquire the lock when they
+     * invoke pool control methods like setCorePoolSize.
+     */
+    private final class Worker extends AbstractQueuedSynchronizer implements Runnable
+    {
+        /**
+         * This class will never be serialized, but we provide a
+         * serialVersionUID to suppress a javac warning.
+         */
+        private static final long serialVersionUID = 6138294804551838833L;
+
+        /** Thread this worker is running in. Null if factory fails. */
+        final Thread thread;
+        /** Initial task to run. Possibly null. */
+        Runnable firstTask;
+        /** Per-thread task counter */
+        volatile long completedTasks;
+
+        /**
+         * Creates with given first task and thread from ThreadFactory.
+         * 
+         * @param firstTask
+         *            the first task (null if none)
+         */
+        Worker(Runnable firstTask)
+        {
+            this.firstTask = firstTask;
+            this.thread = getThreadFactory().newThread(this);
+        }
+
+        /** Delegates main run loop to outer runWorker */
+        public void run()
+        {
+            runWorker(this);
+        }
+
+        // Lock methods
+        //
+        // The value 0 represents the unlocked state.
+        // The value 1 represents the locked state.
+
+        protected boolean isHeldExclusively()
+        {
+            return getState() == 1;
+        }
+
+        protected boolean tryAcquire(int unused)
+        {
+            if (compareAndSetState(0, 1))
+            {
+                setExclusiveOwnerThread(Thread.currentThread());
+                return true;
+            }
+            return false;
+        }
+
+        protected boolean tryRelease(int unused)
+        {
+            setExclusiveOwnerThread(null);
+            setState(0);
+            return true;
+        }
+
+        public void lock()
+        {
+            acquire(1);
+        }
+
+        public boolean tryLock()
+        {
+            return tryAcquire(1);
+        }
+
+        public void unlock()
+        {
+            release(1);
+        }
+
+        public boolean isLocked()
+        {
+            return isHeldExclusively();
+        }
+    }
+
+    /*
+     * Methods for setting control state
+     */
+
+    /**
+     * Transitions runState to given target, or leaves it alone if already at
+     * least the given target.
+     * 
+     * @param targetState
+     *            the desired state, either SHUTDOWN or STOP (but not TIDYING or
+     *            TERMINATED -- use tryTerminate for that)
+     */
+    private void advanceRunState(int targetState)
+    {
+        for (;;)
+        {
+            int c = ctl.get();
+            if (runStateAtLeast(c, targetState)
+                    || ctl.compareAndSet(c,
+                            ctlOf(targetState, workerCountOf(c))))
+                break;
+        }
+    }
+
+    /**
+     * Transitions to TERMINATED state if either (SHUTDOWN and pool and queue
+     * empty) or (STOP and pool empty). If otherwise eligible to terminate but
+     * workerCount is nonzero, interrupts an idle worker to ensure that shutdown
+     * signals propagate. This method must be called following any action that
+     * might make termination possible -- reducing worker count or removing
+     * tasks from the queue during shutdown. The method is non-private to allow
+     * access from ScheduledThreadPoolExecutor.
+     */
+    final void tryTerminate()
+    {
+        for (;;)
+        {
+            int c = ctl.get();
+            if (isRunning(c) || runStateAtLeast(c, TIDYING)
+                    || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
+                return;
+            if (workerCountOf(c) != 0)
+            { // Eligible to terminate
+                interruptIdleWorkers(ONLY_ONE);
+                return;
+            }
+
+            final ReentrantLock mainLock = this.mainLock;
+            mainLock.lock();
+            try
+            {
+                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0)))
+                {
+                    try
+                    {
+                        terminated();
+                    }
+                    finally
+                    {
+                        ctl.set(ctlOf(TERMINATED, 0));
+                        termination.signalAll();
+                    }
+                    return;
+                }
+            }
+            finally
+            {
+                mainLock.unlock();
+            }
+            // else retry on failed CAS
+        }
+    }
+
+    /*
+     * Methods for controlling interrupts to worker threads.
+     */
+
+    /**
+     * If there is a security manager, makes sure caller has permission to shut
+     * down threads in general (see shutdownPerm). If this passes, additionally
+     * makes sure the caller is allowed to interrupt each worker thread. This
+     * might not be true even if first check passed, if the SecurityManager
+     * treats some threads specially.
+     */
+    private void checkShutdownAccess()
+    {
+        SecurityManager security = System.getSecurityManager();
+        if (security != null)
+        {
+            security.checkPermission(shutdownPerm);
+            final ReentrantLock mainLock = this.mainLock;
+            mainLock.lock();
+            try
+            {
+                for (Worker w : workers)
+                    security.checkAccess(w.thread);
+            }
+            finally
+            {
+                mainLock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Interrupts all threads, even if active. Ignores SecurityExceptions (in
+     * which case some threads may remain uninterrupted).
+     */
+    private void interruptWorkers()
+    {
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            for (Worker w : workers)
+            {
+                try
+                {
+                    w.thread.interrupt();
+                }
+                catch (SecurityException ignore)
+                {
+                }
+            }
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+    }
+
+    /**
+     * Interrupts threads that might be waiting for tasks (as indicated by not
+     * being locked) so they can check for termination or configuration changes.
+     * Ignores SecurityExceptions (in which case some threads may remain
+     * uninterrupted).
+     * 
+     * @param onlyOne
+     *            If true, interrupt at most one worker. This is called only
+     *            from tryTerminate when termination is otherwise enabled but
+     *            there are still other workers. In this case, at most one
+     *            waiting worker is interrupted to propagate shutdown signals in
+     *            case all threads are currently waiting. Interrupting any
+     *            arbitrary thread ensures that newly arriving workers since
+     *            shutdown began will also eventually exit. To guarantee
+     *            eventual termination, it suffices to always interrupt only one
+     *            idle worker, but shutdown() interrupts all idle workers so
+     *            that redundant workers exit promptly, not waiting for a
+     *            straggler task to finish.
+     */
+    private void interruptIdleWorkers(boolean onlyOne)
+    {
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            for (Worker w : workers)
+            {
+                Thread t = w.thread;
+                if (!t.isInterrupted() && w.tryLock())
+                {
+                    try
+                    {
+                        t.interrupt();
+                    }
+                    catch (SecurityException ignore)
+                    {
+                    }
+                    finally
+                    {
+                        w.unlock();
+                    }
+                }
+                if (onlyOne)
+                    break;
+            }
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+    }
+
+    /**
+     * Common form of interruptIdleWorkers, to avoid having to remember what the
+     * boolean argument means.
+     */
+    private void interruptIdleWorkers()
+    {
+        interruptIdleWorkers(false);
+    }
+
+    private static final boolean ONLY_ONE = true;
+
+    /**
+     * Ensures that unless the pool is stopping, the current thread does not
+     * have its interrupt set. This requires a double-check of state in case the
+     * interrupt was cleared concurrently with a shutdownNow -- if so, the
+     * interrupt is re-enabled.
+     */
+    private void clearInterruptsForTaskRun()
+    {
+        if (runStateLessThan(ctl.get(), STOP) && Thread.interrupted()
+                && runStateAtLeast(ctl.get(), STOP))
+            Thread.currentThread().interrupt();
+    }
+
+    /*
+     * Misc utilities, most of which are also exported to
+     * ScheduledThreadPoolExecutor
+     */
+
+    /**
+     * Invokes the rejected execution handler for the given command.
+     * Package-protected for use by ScheduledThreadPoolExecutor.
+     */
+    final void reject(Runnable command)
+    {
+        handler.rejectedExecution(command, this);
+    }
+
+    /**
+     * Performs any further cleanup following run state transition on invocation
+     * of shutdown. A no-op here, but used by ScheduledThreadPoolExecutor to
+     * cancel delayed tasks.
+     */
+    void onShutdown()
+    {
+    }
+
+    /**
+     * State check needed by ScheduledThreadPoolExecutor to enable running tasks
+     * during shutdown.
+     * 
+     * @param shutdownOK
+     *            true if should return true if SHUTDOWN
+     */
+    final boolean isRunningOrShutdown(boolean shutdownOK)
+    {
+        int rs = runStateOf(ctl.get());
+        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
+    }
+
+    /**
+     * Drains the task queue into a new list, normally using drainTo. But if the
+     * queue is a DelayQueue or any other kind of queue for which poll or
+     * drainTo may fail to remove some elements, it deletes them one by one.
+     */
+    private List<Runnable> drainQueue()
+    {
+        BlockingQueue<Runnable> q = workQueue;
+        List<Runnable> taskList = new ArrayList<Runnable>();
+        q.drainTo(taskList);
+        if (!q.isEmpty())
+        {
+            for (Runnable r : q.toArray(new Runnable[0]))
+            {
+                if (q.remove(r))
+                    taskList.add(r);
+            }
+        }
+        return taskList;
+    }
+
+    /*
+     * Methods for creating, running and cleaning up after workers
+     */
+
+    /**
+     * Checks if a new worker can be added with respect to current pool state
+     * and the given bound (either core or maximum). If so, the worker count is
+     * adjusted accordingly, and, if possible, a new worker is created and
+     * started running firstTask as its first task. This method returns false if
+     * the pool is stopped or eligible to shut down. It also returns false if
+     * the thread factory fails to create a thread when asked, which requires a
+     * backout of workerCount, and a recheck for termination, in case the
+     * existence of this worker was holding up termination.
+     * 
+     * @param firstTask
+     *            the task the new thread should run first (or null if none).
+     *            Workers are created with an initial first task (in method
+     *            execute()) to bypass queuing when there are fewer than
+     *            corePoolSize threads (in which case we always start one), or
+     *            when the queue is full (in which case we must bypass queue).
+     *            Initially idle threads are usually created via
+     *            prestartCoreThread or to replace other dying workers.
+     * 
+     * @param core
+     *            if true use corePoolSize as bound, else maximumPoolSize. (A
+     *            boolean indicator is used here rather than a value to ensure
+     *            reads of fresh values after checking other pool state).
+     * @return true if successful
+     */
+    private boolean addWorker(Runnable firstTask, boolean core)
+    {
+        retry: for (;;)
+        {
+            int c = ctl.get();
+            int rs = runStateOf(c);
+
+            // Check if queue empty only if necessary.
+            if (rs >= SHUTDOWN
+                    && !(rs == SHUTDOWN && firstTask == null && !workQueue
+                            .isEmpty()))
+                return false;
+
+            for (;;)
+            {
+                int wc = workerCountOf(c);
+                if (wc >= CAPACITY
+                        || wc >= (core ? corePoolSize : maximumPoolSize))
+                    return false;
+                if (compareAndIncrementWorkerCount(c))
+                    break retry;
+                c = ctl.get(); // Re-read ctl
+                if (runStateOf(c) != rs)
+                    continue retry;
+                // else CAS failed due to workerCount change; retry inner loop
+            }
+        }
+
+        Worker w = new Worker(firstTask);
+        Thread t = w.thread;
+
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            // Recheck while holding lock.
+            // Back out on ThreadFactory failure or if
+            // shut down before lock acquired.
+            int c = ctl.get();
+            int rs = runStateOf(c);
+
+            if (t == null
+                    || (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null)))
+            {
+                decrementWorkerCount();
+                tryTerminate();
+                return false;
+            }
+
+            workers.add(w);
+
+            int s = workers.size();
+            if (s > largestPoolSize)
+                largestPoolSize = s;
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+
+        t.start();
+        // It is possible (but unlikely) for a thread to have been
+        // added to workers, but not yet started, during transition to
+        // STOP, which could result in a rare missed interrupt,
+        // because Thread.interrupt is not guaranteed to have any effect
+        // on a non-yet-started Thread (see Thread#interrupt).
+        if (runStateOf(ctl.get()) == STOP && !t.isInterrupted())
+            t.interrupt();
+
+        return true;
+    }
+
+    /**
+     * Performs cleanup and bookkeeping for a dying worker. Called only from
+     * worker threads. Unless completedAbruptly is set, assumes that workerCount
+     * has already been adjusted to account for exit. This method removes thread
+     * from worker set, and possibly terminates the pool or replaces the worker
+     * if either it exited due to user task exception or if fewer than
+     * corePoolSize workers are running or queue is non-empty but there are no
+     * workers.
+     * 
+     * @param w
+     *            the worker
+     * @param completedAbruptly
+     *            if the worker died due to user exception
+     */
+    private void processWorkerExit(Worker w, boolean completedAbruptly)
+    {
+        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
+            decrementWorkerCount();
+
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            completedTaskCount += w.completedTasks;
+            workers.remove(w);
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+
+        tryTerminate();
+
+        int c = ctl.get();
+        if (runStateLessThan(c, STOP))
+        {
+            if (!completedAbruptly)
+            {
+                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
+                if (min == 0 && !workQueue.isEmpty())
+                    min = 1;
+                if (workerCountOf(c) >= min)
+                    return; // replacement not needed
+            }
+            addWorker(null, false);
+        }
+    }
+
+    /**
+     * Performs blocking or timed wait for a task, depending on current
+     * configuration settings, or returns null if this worker must exit because
+     * of any of: 1. There are more than maximumPoolSize workers (due to a call
+     * to setMaximumPoolSize). 2. The pool is stopped. 3. The pool is shutdown
+     * and the queue is empty. 4. This worker timed out waiting for a task, and
+     * timed-out workers are subject to termination (that is,
+     * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) both
+     * before and after the timed wait.
+     * 
+     * @return task, or null if the worker must exit, in which case workerCount
+     *         is decremented
+     */
+    private Runnable getTask()
+    {
+        boolean timedOut = false; // Did the last poll() time out?
+
+        retry: for (;;)
+        {
+            int c = ctl.get();
+            int rs = runStateOf(c);
+
+            // Check if queue empty only if necessary.
+            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))
+            {
+                decrementWorkerCount();
+                return null;
+            }
+
+            boolean timed; // Are workers subject to culling?
+
+            for (;;)
+            {
+                int wc = workerCountOf(c);
+                timed = allowCoreThreadTimeOut || wc > corePoolSize;
+
+                if (wc <= maximumPoolSize && !(timedOut && timed))
+                    break;
+                if (compareAndDecrementWorkerCount(c))
+                    return null;
+                c = ctl.get(); // Re-read ctl
+                if (runStateOf(c) != rs)
+                    continue retry;
+                // else CAS failed due to workerCount change; retry inner loop
+            }
+
+            try
+            {
+                Runnable r = timed ? workQueue.poll(keepAliveTime,
+                        TimeUnit.NANOSECONDS) : workQueue.take();
+                if (r != null)
+                    return r;
+                timedOut = true;
+            }
+            catch (InterruptedException retry)
+            {
+                timedOut = false;
+            }
+        }
+    }
+
+    /**
+     * Main worker run loop. Repeatedly gets tasks from queue and executes them,
+     * while coping with a number of issues:
+     * 
+     * 1. We may start out with an initial task, in which case we don't need to
+     * get the first one. Otherwise, as long as pool is running, we get tasks
+     * from getTask. If it returns null then the worker exits due to changed
+     * pool state or configuration parameters. Other exits result from exception
+     * throws in external code, in which case completedAbruptly holds, which
+     * usually leads processWorkerExit to replace this thread.
+     * 
+     * 2. Before running any task, the lock is acquired to prevent other pool
+     * interrupts while the task is executing, and clearInterruptsForTaskRun
+     * called to ensure that unless pool is stopping, this thread does not have
+     * its interrupt set.
+     * 
+     * 3. Each task run is preceded by a call to beforeExecute, which might
+     * throw an exception, in which case we cause thread to die (breaking loop
+     * with completedAbruptly true) without processing the task.
+     * 
+     * 4. Assuming beforeExecute completes normally, we run the task, gathering
+     * any of its thrown exceptions to send to afterExecute. We separately
+     * handle RuntimeException, Error (both of which the specs guarantee that we
+     * trap) and arbitrary Throwables. Because we cannot rethrow Throwables
+     * within Runnable.run, we wrap them within Errors on the way out (to the
+     * thread's UncaughtExceptionHandler). Any thrown exception also
+     * conservatively causes thread to die.
+     * 
+     * 5. After task.run completes, we call afterExecute, which may also throw
+     * an exception, which will also cause thread to die. According to JLS Sec
+     * 14.20, this exception is the one that will be in effect even if task.run
+     * throws.
+     * 
+     * The net effect of the exception mechanics is that afterExecute and the
+     * thread's UncaughtExceptionHandler have as accurate information as we can
+     * provide about any problems encountered by user code.
+     * 
+     * @param w
+     *            the worker
+     */
+    final void runWorker(Worker w)
+    {
+        Runnable task = w.firstTask;
+        w.firstTask = null;
+        boolean completedAbruptly = true;
+        try
+        {
+            while (task != null || (task = getTask()) != null)
+            {
+                w.lock();
+                clearInterruptsForTaskRun();
+                try
+                {
+                    beforeExecute(w.thread, task);
+                    Throwable thrown = null;
+                    try
+                    {                                 
+                        /* start in suspended mode to get a handle to the continuation */
+                        Continuation c = Continuation.startSuspendedWith(task);                                              
+                        /* resume the damn continuation */                                                    
+                        c = Continuation.continueWith(c, new ContinuationContext(c));
+                        /* post process the call if need be */                        
+                        ContinuationsExecutor.doPostProcessing(c);
+                    }
+                    catch (RuntimeException x)
+                    {
+                        thrown = x;
+                        throw x;
+                    }
+                    catch (Error x)
+                    {
+                        thrown = x;
+                        throw x;
+                    }
+                    catch (Throwable x)
+                    {
+                        thrown = x;
+                        throw new Error(x);
+                    }
+                    finally
+                    {
+                        afterExecute(task, thrown);
+                    }
+                }
+                finally
+                {
+                    task = null;
+                    w.completedTasks++;
+                    w.unlock();
+                }
+            }
+            completedAbruptly = false;
+        }
+        finally
+        {
+            processWorkerExit(w, completedAbruptly);
+        }
+    }
+
+    // Public constructors and methods
+
+    /**
+     * Creates a new {@code ContinuationsExecutor} with the given initial
+     * parameters and default thread factory and rejected execution handler. It
+     * may be more convenient to use one of the {@link Executors} factory
+     * methods instead of this general purpose constructor.
+     * 
+     * @param corePoolSize
+     *            the number of threads to keep in the pool, even if they are
+     *            idle, unless {@code allowCoreThreadTimeOut} is set
+     * @param maximumPoolSize
+     *            the maximum number of threads to allow in the pool
+     * @param keepAliveTime
+     *            when the number of threads is greater than the core, this is
+     *            the maximum time that excess idle threads will wait for new
+     *            tasks before terminating.
+     * @param unit
+     *            the time unit for the {@code keepAliveTime} argument
+     * @param workQueue
+     *            the queue to use for holding tasks before they are executed.
+     *            This queue will hold only the {@code Runnable} tasks submitted
+     *            by the {@code execute} method.
+     * @throws IllegalArgumentException
+     *             if one of the following holds:<br>
+     *             {@code corePoolSize < 0}<br>
+     *             {@code keepAliveTime < 0}<br>
+     *             {@code maximumPoolSize <= 0}<br>
+     *             {@code maximumPoolSize < corePoolSize}
+     * @throws NullPointerException
+     *             if {@code workQueue} is null
+     */
+    public ContinuationsExecutor(int corePoolSize, int maximumPoolSize,
+            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
+    {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+                Executors.defaultThreadFactory(), defaultHandler);
+    }
+
+    /**
+     * Creates a new {@code ContinuationsExecutor} with the given initial
+     * parameters and default rejected execution handler.
+     * 
+     * @param corePoolSize
+     *            the number of threads to keep in the pool, even if they are
+     *            idle, unless {@code allowCoreThreadTimeOut} is set
+     * @param maximumPoolSize
+     *            the maximum number of threads to allow in the pool
+     * @param keepAliveTime
+     *            when the number of threads is greater than the core, this is
+     *            the maximum time that excess idle threads will wait for new
+     *            tasks before terminating.
+     * @param unit
+     *            the time unit for the {@code keepAliveTime} argument
+     * @param workQueue
+     *            the queue to use for holding tasks before they are executed.
+     *            This queue will hold only the {@code Runnable} tasks submitted
+     *            by the {@code execute} method.
+     * @param threadFactory
+     *            the factory to use when the executor creates a new thread
+     * @throws IllegalArgumentException
+     *             if one of the following holds:<br>
+     *             {@code corePoolSize < 0}<br>
+     *             {@code keepAliveTime < 0}<br>
+     *             {@code maximumPoolSize <= 0}<br>
+     *             {@code maximumPoolSize < corePoolSize}
+     * @throws NullPointerException
+     *             if {@code workQueue} or {@code threadFactory} is null
+     */
+    public ContinuationsExecutor(int corePoolSize, int maximumPoolSize,
+            long keepAliveTime, TimeUnit unit,
+            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
+    {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+                threadFactory, defaultHandler);
+    }
+
+    /**
+     * Creates a new {@code ContinuationsExecutor} with the given initial
+     * parameters and default thread factory.
+     * 
+     * @param corePoolSize
+     *            the number of threads to keep in the pool, even if they are
+     *            idle, unless {@code allowCoreThreadTimeOut} is set
+     * @param maximumPoolSize
+     *            the maximum number of threads to allow in the pool
+     * @param keepAliveTime
+     *            when the number of threads is greater than the core, this is
+     *            the maximum time that excess idle threads will wait for new
+     *            tasks before terminating.
+     * @param unit
+     *            the time unit for the {@code keepAliveTime} argument
+     * @param workQueue
+     *            the queue to use for holding tasks before they are executed.
+     *            This queue will hold only the {@code Runnable} tasks submitted
+     *            by the {@code execute} method.
+     * @param handler
+     *            the handler to use when execution is blocked because the
+     *            thread bounds and queue capacities are reached
+     * @throws IllegalArgumentException
+     *             if one of the following holds:<br>
+     *             {@code corePoolSize < 0}<br>
+     *             {@code keepAliveTime < 0}<br>
+     *             {@code maximumPoolSize <= 0}<br>
+     *             {@code maximumPoolSize < corePoolSize}
+     * @throws NullPointerException
+     *             if {@code workQueue} or {@code handler} is null
+     */
+    public ContinuationsExecutor(int corePoolSize, int maximumPoolSize,
+            long keepAliveTime, TimeUnit unit,
+            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
+    {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+                Executors.defaultThreadFactory(), handler);
+    }
+
+    /**
+     * Creates a new {@code ContinuationsExecutor} with the given initial
+     * parameters.
+     * 
+     * @param corePoolSize
+     *            the number of threads to keep in the pool, even if they are
+     *            idle, unless {@code allowCoreThreadTimeOut} is set
+     * @param maximumPoolSize
+     *            the maximum number of threads to allow in the pool
+     * @param keepAliveTime
+     *            when the number of threads is greater than the core, this is
+     *            the maximum time that excess idle threads will wait for new
+     *            tasks before terminating.
+     * @param unit
+     *            the time unit for the {@code keepAliveTime} argument
+     * @param workQueue
+     *            the queue to use for holding tasks before they are executed.
+     *            This queue will hold only the {@code Runnable} tasks submitted
+     *            by the {@code execute} method.
+     * @param threadFactory
+     *            the factory to use when the executor creates a new thread
+     * @param handler
+     *            the handler to use when execution is blocked because the
+     *            thread bounds and queue capacities are reached
+     * @throws IllegalArgumentException
+     *             if one of the following holds:<br>
+     *             {@code corePoolSize < 0}<br>
+     *             {@code keepAliveTime < 0}<br>
+     *             {@code maximumPoolSize <= 0}<br>
+     *             {@code maximumPoolSize < corePoolSize}
+     * @throws NullPointerException
+     *             if {@code workQueue} or {@code threadFactory} or
+     *             {@code handler} is null
+     */
+    public ContinuationsExecutor(int corePoolSize, int maximumPoolSize,
+            long keepAliveTime, TimeUnit unit,
+            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
+            RejectedExecutionHandler handler)
+    {
+        if (corePoolSize < 0 || maximumPoolSize <= 0
+                || maximumPoolSize < corePoolSize || keepAliveTime < 0)
+            throw new IllegalArgumentException();
+        if (workQueue == null || threadFactory == null || handler == null)
+            throw new NullPointerException();
+        this.corePoolSize = corePoolSize;
+        this.maximumPoolSize = maximumPoolSize;
+        this.workQueue = workQueue;
+        this.keepAliveTime = unit.toNanos(keepAliveTime);
+        this.threadFactory = threadFactory;
+        this.handler = handler;
+    }
+
+    /**
+     * Executes the given task sometime in the future. The task may execute in a
+     * new thread or in an existing pooled thread.
+     * 
+     * If the task cannot be submitted for execution, either because this
+     * executor has been shutdown or because its capacity has been reached, the
+     * task is handled by the current {@code RejectedExecutionHandler}.
+     * 
+     * @param command
+     *            the task to execute
+     * @throws RejectedExecutionException
+     *             at discretion of {@code RejectedExecutionHandler}, if the
+     *             task cannot be accepted for execution
+     * @throws NullPointerException
+     *             if {@code command} is null
+     */
+    public void execute(Runnable command)
+    {
+        if (command == null)
+            throw new NullPointerException();
+        /*
+         * Proceed in 3 steps:
+         * 
+         * 1. If fewer than corePoolSize threads are running, try to start a new
+         * thread with the given command as its first task. The call to
+         * addWorker atomically checks runState and workerCount, and so prevents
+         * false alarms that would add threads when it shouldn't, by returning
+         * false.
+         * 
+         * 2. If a task can be successfully queued, then we still need to
+         * double-check whether we should have added a thread (because existing
+         * ones died since last checking) or that the pool shut down since entry
+         * into this method. So we recheck state and if necessary roll back the
+         * enqueuing if stopped, or start a new thread if there are none.
+         * 
+         * 3. If we cannot queue task, then we try to add a new thread. If it
+         * fails, we know we are shut down or saturated and so reject the task.
+         */
+        int c = ctl.get();
+        if (workerCountOf(c) < corePoolSize)
+        {
+            if (addWorker(command, true))
+                return;
+            c = ctl.get();
+        }
+        if (isRunning(c) && workQueue.offer(command))
+        {
+            int recheck = ctl.get();
+            if (!isRunning(recheck) && remove(command))
+                reject(command);
+            else if (workerCountOf(recheck) == 0)
+                addWorker(null, false);
+        }
+        else if (!addWorker(command, false))
+            reject(command);
+    }
+
+    /**
+     * Initiates an orderly shutdown in which previously submitted tasks are
+     * executed, but no new tasks will be accepted. Invocation has no additional
+     * effect if already shut down.
+     * 
+     * @throws SecurityException
+     *             {@inheritDoc}
+     */
+    public void shutdown()
+    { 
+        /*
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            checkShutdownAccess();
+            advanceRunState(SHUTDOWN);
+            interruptIdleWorkers();
+            onShutdown(); // hook for ScheduledThreadPoolExecutor
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+        tryTerminate(); 
+        */           
+    }
+
+    /**
+     * Attempts to stop all actively executing tasks, halts the processing of
+     * waiting tasks, and returns a list of the tasks that were awaiting
+     * execution. These tasks are drained (removed) from the task queue upon
+     * return from this method.
+     * 
+     * <p>
+     * There are no guarantees beyond best-effort attempts to stop processing
+     * actively executing tasks. This implementation cancels tasks via
+     * {@link Thread#interrupt}, so any task that fails to respond to
+     * interrupts may never terminate.
+     * 
+     * @throws SecurityException
+     *             {@inheritDoc}
+     */
+    public List<Runnable> shutdownNow()
+    {
+        List<Runnable> tasks;
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            checkShutdownAccess();
+            advanceRunState(STOP);
+            interruptWorkers();
+            tasks = drainQueue();
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+        tryTerminate();
+        return tasks;
+    }
+
+    public boolean isShutdown()
+    {
+        return !isRunning(ctl.get());
+    }
+
+    /**
+     * Returns true if this executor is in the process of terminating after
+     * {@link #shutdown} or {@link #shutdownNow} but has not completely
+     * terminated. This method may be useful for debugging. A return of
+     * {@code true} reported a sufficient period after shutdown may indicate
+     * that submitted tasks have ignored or suppressed interruption, causing
+     * this executor not to properly terminate.
+     * 
+     * @return true if terminating but not yet terminated
+     */
+    public boolean isTerminating()
+    {
+        int c = ctl.get();
+        return !isRunning(c) && runStateLessThan(c, TERMINATED);
+    }
+
+    public boolean isTerminated()
+    {
+        return runStateAtLeast(ctl.get(), TERMINATED);
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException
+    {
+        long nanos = unit.toNanos(timeout);
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            for (;;)
+            {
+                if (runStateAtLeast(ctl.get(), TERMINATED))
+                    return true;
+                if (nanos <= 0)
+                    return false;
+                nanos = termination.awaitNanos(nanos);
+            }
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+    }
+
+    /**
+     * Invokes {@code shutdown} when this executor is no longer referenced and
+     * it has no threads.
+     */
+    protected void finalize()
+    {
+        shutdown();
+    }
+
+    /**
+     * Sets the thread factory used to create new threads.
+     * 
+     * @param threadFactory
+     *            the new thread factory
+     * @throws NullPointerException
+     *             if threadFactory is null
+     * @see #getThreadFactory
+     */
+    public void setThreadFactory(ThreadFactory threadFactory)
+    {
+        if (threadFactory == null)
+            throw new NullPointerException();
+        this.threadFactory = threadFactory;
+    }
+
+    /**
+     * Returns the thread factory used to create new threads.
+     * 
+     * @return the current thread factory
+     * @see #setThreadFactory
+     */
+    public ThreadFactory getThreadFactory()
+    {
+        return threadFactory;
+    }
+
+    /**
+     * Sets a new handler for unexecutable tasks.
+     * 
+     * @param handler
+     *            the new handler
+     * @throws NullPointerException
+     *             if handler is null
+     * @see #getRejectedExecutionHandler
+     */
+    public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
+    {
+        if (handler == null)
+            throw new NullPointerException();
+        this.handler = handler;
+    }
+
+    /**
+     * Returns the current handler for unexecutable tasks.
+     * 
+     * @return the current handler
+     * @see #setRejectedExecutionHandler
+     */
+    public RejectedExecutionHandler getRejectedExecutionHandler()
+    {
+        return handler;
+    }
+
+    /**
+     * Sets the core number of threads. This overrides any value set in the
+     * constructor. If the new value is smaller than the current value, excess
+     * existing threads will be terminated when they next become idle. If
+     * larger, new threads will, if needed, be started to execute any queued
+     * tasks.
+     * 
+     * @param corePoolSize
+     *            the new core size
+     * @throws IllegalArgumentException
+     *             if {@code corePoolSize < 0}
+     * @see #getCorePoolSize
+     */
+    public void setCorePoolSize(int corePoolSize)
+    {
+        if (corePoolSize < 0)
+            throw new IllegalArgumentException();
+        int delta = corePoolSize - this.corePoolSize;
+        this.corePoolSize = corePoolSize;
+        if (workerCountOf(ctl.get()) > corePoolSize)
+            interruptIdleWorkers();
+        else if (delta > 0)
+        {
+            // We don't really know how many new threads are "needed".
+            // As a heuristic, prestart enough new workers (up to new
+            // core size) to handle the current number of tasks in
+            // queue, but stop if queue becomes empty while doing so.
+            int k = Math.min(delta, workQueue.size());
+            while (k-- > 0 && addWorker(null, true))
+            {
+                if (workQueue.isEmpty())
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Returns the core number of threads.
+     * 
+     * @return the core number of threads
+     * @see #setCorePoolSize
+     */
+    public int getCorePoolSize()
+    {
+        return corePoolSize;
+    }
+
+    /**
+     * Starts a core thread, causing it to idly wait for work. This overrides
+     * the default policy of starting core threads only when new tasks are
+     * executed. This method will return {@code false} if all core threads have
+     * already been started.
+     * 
+     * @return {@code true} if a thread was started
+     */
+    public boolean prestartCoreThread()
+    {
+        return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);
+    }
+
+    /**
+     * Starts all core threads, causing them to idly wait for work. This
+     * overrides the default policy of starting core threads only when new tasks
+     * are executed.
+     * 
+     * @return the number of threads started
+     */
+    public int prestartAllCoreThreads()
+    {
+        int n = 0;
+        while (addWorker(null, true))
+            ++n;
+        return n;
+    }
+
+    /**
+     * Returns true if this pool allows core threads to time out and terminate
+     * if no tasks arrive within the keepAlive time, being replaced if needed
+     * when new tasks arrive. When true, the same keep-alive policy applying to
+     * non-core threads applies also to core threads. When false (the default),
+     * core threads are never terminated due to lack of incoming tasks.
+     * 
+     * @return {@code true} if core threads are allowed to time out, else
+     *         {@code false}
+     * 
+     * @since 1.6
+     */
+    public boolean allowsCoreThreadTimeOut()
+    {
+        return allowCoreThreadTimeOut;
+    }
+
+    /**
+     * Sets the policy governing whether core threads may time out and terminate
+     * if no tasks arrive within the keep-alive time, being replaced if needed
+     * when new tasks arrive. When false, core threads are never terminated due
+     * to lack of incoming tasks. When true, the same keep-alive policy applying
+     * to non-core threads applies also to core threads. To avoid continual
+     * thread replacement, the keep-alive time must be greater than zero when
+     * setting {@code true}. This method should in general be called before the
+     * pool is actively used.
+     * 
+     * @param value
+     *            {@code true} if should time out, else {@code false}
+     * @throws IllegalArgumentException
+     *             if value is {@code true} and the current keep-alive time is
+     *             not greater than zero
+     * 
+     * @since 1.6
+     */
+    public void allowCoreThreadTimeOut(boolean value)
+    {
+        if (value && keepAliveTime <= 0)
+            throw new IllegalArgumentException(
+                    "Core threads must have nonzero keep alive times");
+        if (value != allowCoreThreadTimeOut)
+        {
+            allowCoreThreadTimeOut = value;
+            if (value)
+                interruptIdleWorkers();
+        }
+    }
+
+    /**
+     * Sets the maximum allowed number of threads. This overrides any value set
+     * in the constructor. If the new value is smaller than the current value,
+     * excess existing threads will be terminated when they next become idle.
+     * 
+     * @param maximumPoolSize
+     *            the new maximum
+     * @throws IllegalArgumentException
+     *             if the new maximum is less than or equal to zero, or less
+     *             than the {@linkplain #getCorePoolSize core pool size}
+     * @see #getMaximumPoolSize
+     */
+    public void setMaximumPoolSize(int maximumPoolSize)
+    {
+        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
+            throw new IllegalArgumentException();
+        this.maximumPoolSize = maximumPoolSize;
+        if (workerCountOf(ctl.get()) > maximumPoolSize)
+            interruptIdleWorkers();
+    }
+
+    /**
+     * Returns the maximum allowed number of threads.
+     * 
+     * @return the maximum allowed number of threads
+     * @see #setMaximumPoolSize
+     */
+    public int getMaximumPoolSize()
+    {
+        return maximumPoolSize;
+    }
+
+    /**
+     * Sets the time limit for which threads may remain idle before being
+     * terminated. If there are more than the core number of threads currently
+     * in the pool, after waiting this amount of time without processing a task,
+     * excess threads will be terminated. This overrides any value set in the
+     * constructor.
+     * 
+     * @param time
+     *            the time to wait. A time value of zero will cause excess
+     *            threads to terminate immediately after executing tasks.
+     * @param unit
+     *            the time unit of the {@code time} argument
+     * @throws IllegalArgumentException
+     *             if {@code time} less than zero or if {@code time} is zero and
+     *             {@code allowsCoreThreadTimeOut}
+     * @see #getKeepAliveTime
+     */
+    public void setKeepAliveTime(long time, TimeUnit unit)
+    {
+        if (time < 0)
+            throw new IllegalArgumentException();
+        if (time == 0 && allowsCoreThreadTimeOut())
+            throw new IllegalArgumentException(
+                    "Core threads must have nonzero keep alive times");
+        long keepAliveTime = unit.toNanos(time);
+        long delta = keepAliveTime - this.keepAliveTime;
+        this.keepAliveTime = keepAliveTime;
+        if (delta < 0)
+            interruptIdleWorkers();
+    }
+
+    /**
+     * Returns the thread keep-alive time, which is the amount of time that
+     * threads in excess of the core pool size may remain idle before being
+     * terminated.
+     * 
+     * @param unit
+     *            the desired time unit of the result
+     * @return the time limit
+     * @see #setKeepAliveTime
+     */
+    public long getKeepAliveTime(TimeUnit unit)
+    {
+        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
+    }
+
+    /* User-level queue utilities */
+
+    /**
+     * Returns the task queue used by this executor. Access to the task queue is
+     * intended primarily for debugging and monitoring. This queue may be in
+     * active use. Retrieving the task queue does not prevent queued tasks from
+     * executing.
+     * 
+     * @return the task queue
+     */
+    public BlockingQueue<Runnable> getQueue()
+    {
+        return workQueue;
+    }
+
+    /**
+     * Removes this task from the executor's internal queue if it is present,
+     * thus causing it not to be run if it has not already started.
+     * 
+     * <p>
+     * This method may be useful as one part of a cancellation scheme. It may
+     * fail to remove tasks that have been converted into other forms before
+     * being placed on the internal queue. For example, a task entered using
+     * {@code submit} might be converted into a form that maintains
+     * {@code Future} status. However, in such cases, method {@link #purge} may
+     * be used to remove those Futures that have been cancelled.
+     * 
+     * @param task
+     *            the task to remove
+     * @return true if the task was removed
+     */
+    public boolean remove(Runnable task)
+    {
+        boolean removed = workQueue.remove(task);
+        tryTerminate(); // In case SHUTDOWN and now empty
+        return removed;
+    }
+
+    /**
+     * Tries to remove from the work queue all {@link Future} tasks that have
+     * been cancelled. This method can be useful as a storage reclamation
+     * operation, that has no other impact on functionality. Cancelled tasks are
+     * never executed, but may accumulate in work queues until worker threads
+     * can actively remove them. Invoking this method instead tries to remove
+     * them now. However, this method may fail to remove tasks in the presence
+     * of interference by other threads.
+     */
+    public void purge()
+    {
+        final BlockingQueue<Runnable> q = workQueue;
+        try
+        {
+            Iterator<Runnable> it = q.iterator();
+            while (it.hasNext())
+            {
+                Runnable r = it.next();
+                if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
+                    it.remove();
+            }
+        }
+        catch (ConcurrentModificationException fallThrough)
+        {
+            // Take slow path if we encounter interference during traversal.
+            // Make copy for traversal and call remove for cancelled entries.
+            // The slow path is more likely to be O(N*N).
+            for (Object r : q.toArray())
+                if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
+                    q.remove(r);
+        }
+
+        tryTerminate(); // In case SHUTDOWN and now empty
+    }
+
+    /* Statistics */
+
+    /**
+     * Returns the current number of threads in the pool.
+     * 
+     * @return the number of threads
+     */
+    public int getPoolSize()
+    {
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            // Remove rare and surprising possibility of
+            // isTerminated() && getPoolSize() > 0
+            return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the approximate number of threads that are actively executing
+     * tasks.
+     * 
+     * @return the number of threads
+     */
+    public int getActiveCount()
+    {
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            int n = 0;
+            for (Worker w : workers)
+                if (w.isLocked())
+                    ++n;
+            return n;
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the largest number of threads that have ever simultaneously been
+     * in the pool.
+     * 
+     * @return the number of threads
+     */
+    public int getLargestPoolSize()
+    {
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            return largestPoolSize;
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the approximate total number of tasks that have ever been
+     * scheduled for execution. Because the states of tasks and threads may
+     * change dynamically during computation, the returned value is only an
+     * approximation.
+     * 
+     * @return the number of tasks
+     */
+    public long getTaskCount()
+    {
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            long n = completedTaskCount;
+            for (Worker w : workers)
+            {
+                n += w.completedTasks;
+                if (w.isLocked())
+                    ++n;
+            }
+            return n + workQueue.size();
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the approximate total number of tasks that have completed
+     * execution. Because the states of tasks and threads may change dynamically
+     * during computation, the returned value is only an approximation, but one
+     * that does not ever decrease across successive calls.
+     * 
+     * @return the number of tasks
+     */
+    public long getCompletedTaskCount()
+    {
+        final ReentrantLock mainLock = this.mainLock;
+        mainLock.lock();
+        try
+        {
+            long n = completedTaskCount;
+            for (Worker w : workers)
+                n += w.completedTasks;
+            return n;
+        }
+        finally
+        {
+            mainLock.unlock();
+        }
+    }
+
+    /* Extension hooks */
+
+    /**
+     * Method invoked prior to executing the given Runnable in the given thread.
+     * This method is invoked by thread {@code t} that will execute task
+     * {@code r}, and may be used to re-initialize ThreadLocals, or to perform
+     * logging.
+     * 
+     * <p>
+     * This implementation does nothing, but may be customized in subclasses.
+     * Note: To properly nest multiple overridings, subclasses should generally
+     * invoke {@code super.beforeExecute} at the end of this method.
+     * 
+     * @param t
+     *            the thread that will run task {@code r}
+     * @param r
+     *            the task that will be executed
+     */
+    protected void beforeExecute(Thread t, Runnable r)
+    {         
+    }
+
+    /**
+     * Method invoked upon completion of execution of the given Runnable. This
+     * method is invoked by the thread that executed the task. If non-null, the
+     * Throwable is the uncaught {@code RuntimeException} or {@code Error} that
+     * caused execution to terminate abruptly.
+     * 
+     * <p>
+     * This implementation does nothing, but may be customized in subclasses.
+     * Note: To properly nest multiple overridings, subclasses should generally
+     * invoke {@code super.afterExecute} at the beginning of this method.
+     * 
+     * <p>
+     * <b>Note:</b> When actions are enclosed in tasks (such as
+     * {@link FutureTask}) either explicitly or via methods such as
+     * {@code submit}, these task objects catch and maintain computational
+     * exceptions, and so they do not cause abrupt termination, and the internal
+     * exceptions are <em>not</em> passed to this method. If you would like to
+     * trap both kinds of failures in this method, you can further probe for
+     * such cases, as in this sample subclass that prints either the direct
+     * cause or the underlying exception if a task has been aborted:
+     * 
+     * <pre>
+     * {
+     *     @code
+     *     class ExtendedExecutor extends ContinuationsExecutor
+     *     {
+     *         // ...
+     *         protected void afterExecute(Runnable r, Throwable t)
+     *         {
+     *             super.afterExecute(r, t);
+     *             if (t == null &amp;&amp; r instanceof Future&lt;?&gt;)
+     *             {
+     *                 try
+     *                 {
+     *                     Object result = ((Future&lt;?&gt;) r).get();
+     *                 }
+     *                 catch (CancellationException ce)
+     *                 {
+     *                     t = ce;
+     *                 }
+     *                 catch (ExecutionException ee)
+     *                 {
+     *                     t = ee.getCause();
+     *                 }
+     *                 catch (InterruptedException ie)
+     *                 {
+     *                     Thread.currentThread().interrupt(); // ignore/reset
+     *                 }
+     *             }
+     *             if (t != null)
+     *                 System.out.println(t);
+     *         }
+     *     }
+     * }
+     * </pre>
+     * 
+     * @param r
+     *            the runnable that has completed
+     * @param t
+     *            the exception that caused termination, or null if execution
+     *            completed normally
+     */
+    protected void afterExecute(Runnable r, Throwable t)
+    {
+        if ( t != null )
+            logger_.info( LogUtil.throwableToString(t) );
+    }
+
+    /**
+     * Method invoked when the Executor has terminated. Default implementation
+     * does nothing. Note: To properly nest multiple overridings, subclasses
+     * should generally invoke {@code super.terminated} within this method.
+     */
+    protected void terminated()
+    {
+    }
+
+    /* Predefined RejectedExecutionHandlers */
+
+    /**
+     * A handler for rejected tasks that runs the rejected task directly in the
+     * calling thread of the {@code execute} method, unless the executor has
+     * been shut down, in which case the task is discarded.
+     */
+    public static class CallerRunsPolicy implements RejectedExecutionHandler
+    {
+        /**
+         * Creates a {@code CallerRunsPolicy}.
+         */
+        public CallerRunsPolicy()
+        {
+        }
+
+        /**
+         * Executes task r in the caller's thread, unless the executor has been
+         * shut down, in which case the task is discarded.
+         * 
+         * @param r
+         *            the runnable task requested to be executed
+         * @param e
+         *            the executor attempting to execute this task
+         */
+        public void rejectedExecution(Runnable r, ContinuationsExecutor e)
+        {
+            if (!e.isShutdown())
+            {
+                r.run();
+            }
+        }
+    }
+
+    /**
+     * A handler for rejected tasks that throws a
+     * {@code RejectedExecutionException}.
+     */
+    public static class AbortPolicy implements RejectedExecutionHandler
+    {
+        /**
+         * Creates an {@code AbortPolicy}.
+         */
+        public AbortPolicy()
+        {
+        }
+
+        /**
+         * Always throws RejectedExecutionException.
+         * 
+         * @param r
+         *            the runnable task requested to be executed
+         * @param e
+         *            the executor attempting to execute this task
+         * @throws RejectedExecutionException
+         *             always.
+         */
+        public void rejectedExecution(Runnable r, ContinuationsExecutor e)
+        {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    /**
+     * A handler for rejected tasks that silently discards the rejected task.
+     */
+    public static class DiscardPolicy implements RejectedExecutionHandler
+    {
+        /**
+         * Creates a {@code DiscardPolicy}.
+         */
+        public DiscardPolicy()
+        {
+        }
+
+        /**
+         * Does nothing, which has the effect of discarding task r.
+         * 
+         * @param r
+         *            the runnable task requested to be executed
+         * @param e
+         *            the executor attempting to execute this task
+         */
+        public void rejectedExecution(Runnable r, ContinuationsExecutor e)
+        {
+        }
+    }
+
+    /**
+     * A handler for rejected tasks that discards the oldest unhandled request
+     * and then retries {@code execute}, unless the executor is shut down, in
+     * which case the task is discarded.
+     */
+    public static class DiscardOldestPolicy implements RejectedExecutionHandler
+    {
+        /**
+         * Creates a {@code DiscardOldestPolicy} for the given executor.
+         */
+        public DiscardOldestPolicy()
+        {
+        }
+
+        /**
+         * Obtains and ignores the next task that the executor
+         * would otherwise execute, if one is immediately available,
+         * and then retries execution of task r, unless the executor
+         * is shut down, in which case task r is instead discarded.
+         *
+         * @param r the runnable task requested to be executed
+         * @param e the executor attempting to execute this task
+         */
+        public void rejectedExecution(Runnable r, ContinuationsExecutor e)
+        {
+            if (!e.isShutdown())
+            {
+                e.getQueue().poll();
+                e.execute(r);
+            }
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
+ * for the <i>afterExecute()</i> found in the <i>ThreadPoolExecutor</i> class to log any unexpected 
+ * Runtime Exceptions.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public final class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor
+{
+    private static Logger logger_ = Logger.getLogger(DebuggableScheduledThreadPoolExecutor.class);
+    
+    public DebuggableScheduledThreadPoolExecutor(int threads,
+            ThreadFactory threadFactory)
+    {
+        super(threads, threadFactory);        
+    }
+    
+    /**
+     *  (non-Javadoc)
+     * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
+     */
+    public void afterExecute(Runnable r, Throwable t)
+    {
+        super.afterExecute(r,t);
+        if ( t != null )
+        {  
+            Context ctx = ThreadLocalContext.get();
+            if ( ctx != null )
+            {
+                Object object = ctx.get(r.getClass().getName());
+                
+                if ( object != null )
+                {
+                    logger_.info("**** In afterExecute() " + t.getClass().getName() + " occured while working with " + object + " ****");
+                }
+                else
+                {
+                    logger_.info("**** In afterExecute() " + t.getClass().getName() + " occured ****");
+                }
+            }
+            
+            Throwable cause = t.getCause();
+            if ( cause != null )
+            {
+                logger_.info( LogUtil.throwableToString(cause) );
+            }
+            logger_.info( LogUtil.throwableToString(t) );
+        }
+    }
+}