You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by ga...@apache.org on 2008/03/07 20:56:28 UTC

svn commit: r634792 [3/9] - in /geronimo/sandbox/concurrent: ./ concurrent-deployer/ concurrent-deployer/src/ concurrent-deployer/src/main/ concurrent-deployer/src/main/plan/ concurrent/ concurrent/src/ concurrent/src/main/ concurrent/src/main/plan/ ge...

Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/AbstractManagedScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/AbstractManagedScheduledExecutorService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/AbstractManagedScheduledExecutorService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/AbstractManagedScheduledExecutorService.java Fri Mar  7 11:56:14 2008
@@ -0,0 +1,695 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/*
+ * This class is based on and borrows code from java.util.concurrent.ScheduledThreadPoolExecutor
+ * class in Apache Harmony.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.util.concurrent.ManagedScheduledExecutorService;
+import javax.util.concurrent.ManagedTaskListener;
+import javax.util.concurrent.Trigger;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedTaskListenerSupport;
+
+public abstract class AbstractManagedScheduledExecutorService 
+    extends AbstractManagedExecutorService 
+    implements ManagedScheduledExecutorService {
+
+    /**
+     * False if periodic tasks should be cancelled/suppressed on shutdown.
+     */
+    private volatile boolean continueExistingPeriodicTasksAfterShutdown;
+
+    /**
+     * False if non-periodic tasks should be cancelled on shutdown.
+     */
+    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
+
+    /**
+     * Sequence number to break scheduling ties, and in turn to
+     * guarantee FIFO order among tied entries.
+     */
+    private static final AtomicLong sequencer = new AtomicLong(0);
+
+    private class ScheduledFutureTask<V> 
+            extends ManagedFutureTask<V> implements ScheduledFuture<V> {
+                        
+        /** Sequence number to break ties FIFO */
+        private final long sequenceNumber;
+        
+        private final Trigger trigger;
+        
+        private Date lastActualRunTime;
+        private Date lastScheduledRunTime;
+        private Date lastCompleteTime;
+
+        /**
+         * Creates a one-shot task around a {@code Runnable}. No trigger is
+         * attached, so {@link #isPeriodic()} reports {@code false}.
+         *
+         * @param r the work to run
+         * @param result value handed to the superclass constructor as the task result
+         * @param managedContext context handed to the superclass constructor
+         * @param listener listener support handed to the superclass constructor
+         * @param triggerTime the time at which the task becomes due
+         */
+        ScheduledFutureTask(Runnable r,
+                            V result,
+                            ManagedContext managedContext,
+                            ManagedTaskListenerSupport listener,
+                            Date triggerTime) {
+            super(r, result, managedContext, listener);
+            // one-shot: no trigger, a single fixed due time
+            this.trigger = null;
+            this.sequenceNumber = sequencer.getAndIncrement();
+            this.lastScheduledRunTime = triggerTime;
+        }
+        
+        /**
+         * Creates a trigger-driven task around a {@code Runnable}. The first due
+         * time is requested from the trigger with the current time as base time
+         * and no previous run information.
+         *
+         * @param r the work to run
+         * @param result value handed to the superclass constructor as the task result
+         * @param managedContext context handed to the superclass constructor
+         * @param listener listener support handed to the superclass constructor
+         * @param trigger decides when (and whether) each run happens
+         */
+        ScheduledFutureTask(Runnable r,
+                            V result,
+                            ManagedContext managedContext,
+                            ManagedTaskListenerSupport listener,
+                            Trigger trigger) {
+            super(r, result, managedContext, listener);
+            this.trigger = trigger;
+            Date now = new Date();
+            Date firstRun = trigger.getNextRunTime(this, now, null, null, null);
+            this.lastScheduledRunTime = firstRun;
+            this.sequenceNumber = sequencer.getAndIncrement();
+        }
+
+        /**
+         * Creates a one-shot task around a {@code Callable}. No trigger is
+         * attached, so {@link #isPeriodic()} reports {@code false}.
+         *
+         * @param callable the work to run
+         * @param managedContext context handed to the superclass constructor
+         * @param listener listener support handed to the superclass constructor
+         * @param triggerTime the time at which the task becomes due
+         */
+        ScheduledFutureTask(Callable<V> callable,
+                            ManagedContext managedContext,
+                            ManagedTaskListenerSupport listener,
+                            Date triggerTime) {
+            super(callable, managedContext, listener);
+            // one-shot: no trigger, a single fixed due time
+            this.trigger = null;
+            this.sequenceNumber = sequencer.getAndIncrement();
+            this.lastScheduledRunTime = triggerTime;
+        }
+        
+        /**
+         * Creates a trigger-driven task around a {@code Callable}. The first due
+         * time is requested from the trigger with the current time as base time
+         * and no previous run information.
+         *
+         * @param callable the work to run
+         * @param managedContext context handed to the superclass constructor
+         * @param listener listener support handed to the superclass constructor
+         * @param trigger decides when (and whether) each run happens
+         */
+        ScheduledFutureTask(Callable<V> callable,
+                            ManagedContext managedContext,
+                            ManagedTaskListenerSupport listener,
+                            Trigger trigger) {
+            super(callable, managedContext, listener);
+            this.trigger = trigger;
+            Date now = new Date();
+            Date firstRun = trigger.getNextRunTime(this, now, null, null, null);
+            this.lastScheduledRunTime = firstRun;
+            this.sequenceNumber = sequencer.getAndIncrement();
+        }
+        
+        /**
+         * Reports how long remains until the scheduled run time, measured
+         * against the current wall-clock time. The value is negative once the
+         * scheduled time has passed.
+         *
+         * @param unit the unit the remaining time is converted to
+         * @return the remaining time expressed in {@code unit}
+         */
+        public long getDelay(TimeUnit unit) {
+            long remainingMillis =
+                this.lastScheduledRunTime.getTime() - System.currentTimeMillis();
+            return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
+        }
+
+        public int compareTo(Delayed other) {
+            if (other == this) // compare zero ONLY if same object
+                return 0;
+            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
+            long diff = this.lastScheduledRunTime.getTime() - x.lastScheduledRunTime.getTime();
+            if (diff < 0)
+                return -1;
+            else if (diff > 0)
+                return 1;
+            else if (sequenceNumber < x.sequenceNumber)
+                return -1;
+            else
+                return 1;
+        }
+
+        /**
+         * Tells whether this task is driven by a trigger (repeating) rather than
+         * being a one-shot action.
+         * @return true if a trigger is attached
+         */
+        boolean isPeriodic() {
+            boolean hasTrigger = (this.trigger != null);
+            return hasTrigger;
+        }
+
+        /**
+         * Performs one iteration of a trigger-driven task and decides whether
+         * the task goes back onto the queue.
+         *
+         * An already-cancelled task is not run; its listener (if any) receives
+         * taskDone. Otherwise the trigger is asked whether this run should be
+         * skipped: a skipped run is marked via setSkipped() and reported through
+         * taskDone, a normal run goes through runAndReset() with the actual
+         * start and completion times recorded around it.
+         *
+         * The task is re-queued only when the iteration reported success, the
+         * executor is either not shut down or allowed to continue periodic
+         * tasks (and not terminating), and the trigger returns a non-null next
+         * run time. When the task is not eligible for rescheduling and the
+         * executor is shut down, idle workers are interrupted.
+         */
+        private void runPeriodicSub() {
+            boolean ok = false;
+            if (isCancelled()) {
+                ok = false;
+                if (this.listenerSupport != null) {
+                    this.listenerSupport.taskDone(this, null);
+                }
+            } else {
+                if (this.trigger.skipRun(this, this.lastScheduledRunTime)) {
+                    // Run skipped at the trigger's request; the result of setSkipped()
+                    // decides whether the task may still be rescheduled
+                    // (presumably false once cancelled - confirm in ManagedFutureTask)
+                    ok = super.setSkipped();
+                    if (this.listenerSupport != null) {
+                        this.listenerSupport.taskDone(this, null);
+                    }
+                } else {
+                    // Run now, recording the start and completion times for the trigger
+                    this.lastActualRunTime = new Date();
+                    ok = super.runAndReset();
+                    this.lastCompleteTime = new Date();
+                }
+            }
+            boolean down = isShutdown();
+            // Reschedule if the iteration succeeded and either the executor is still
+            // running or the shutdown policy lets periodic tasks continue
+            if (ok && (!down ||
+                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() && 
+                        !isTerminating()))) {
+                this.lastScheduledRunTime = 
+                    this.trigger.getNextRunTime(this, 
+                                                new Date(), 
+                                                this.lastActualRunTime, 
+                                                this.lastScheduledRunTime, 
+                                                this.lastCompleteTime); 
+                if (this.lastScheduledRunTime != null) { // null next run time: task is not re-queued
+                    // Announce the new submission, then put the task back on the queue
+                    if (this.listenerSupport != null) {
+                        this.listenerSupport.taskSubmitted(this);
+                    }
+                    AbstractManagedScheduledExecutorService.super.getQueue().add(this);
+                }
+            } else if (down) {
+                // This might have been the final executed delayed
+                // task.  Wake up threads to check.
+                interruptIdleWorkers();
+            }
+        }
+        
+        /**
+         * Runs one periodic iteration, applying the managed context to the
+         * current thread first when the task is configured to do so. If the
+         * context cannot be applied the task is cancelled instead of run. A
+         * context that was applied is always removed again afterwards.
+         */
+        private void runPeriodic() {
+            // No context handling requested: run the iteration directly
+            if (!this.setContextOnRun || this.managedContext == null) {
+                runPeriodicSub();
+                return;
+            }
+
+            Map<String, Object> savedContext;
+            try {
+                savedContext = this.managedContext.set();
+            } catch (RuntimeException e) {
+                LOG.warn("Failed to apply context to thread for task " + this + ": " + 
+                         e.getMessage() + ". Cancelling task");
+                cancelFutureTask(false);
+                return;
+            }
+
+            try {
+                runPeriodicSub();
+            } finally {
+                // undo the context change even if the iteration throws
+                this.managedContext.unset(savedContext);
+            }
+        }
+
+        /**
+         * Entry point used by worker threads. One-shot tasks use the inherited
+         * run logic; trigger-driven tasks take the periodic path so that they
+         * can be reset and re-queued.
+         */ 
+        public void run() {
+            if (!isPeriodic()) {
+                super.run();
+                return;
+            }
+            runPeriodic();
+        }
+    }
+
+    /**
+     * Creates the executor on top of a fresh {@code DelayedWorkQueue}. The
+     * superclass is given {@code Integer.MAX_VALUE} as maximum pool size and a
+     * keep-alive time of zero.
+     *
+     * @param corePoolSize the core pool size passed to the superclass
+     * @param threadFactory the factory passed to the superclass
+     */
+    public AbstractManagedScheduledExecutorService(int corePoolSize,
+                                                   ThreadFactory threadFactory) {
+        super(corePoolSize,
+              Integer.MAX_VALUE,
+              0,
+              TimeUnit.NANOSECONDS,
+              new DelayedWorkQueue(),
+              threadFactory);
+    }
+        
+    /**
+     * Specialized variant of ThreadPoolExecutor.execute for delayed tasks:
+     * runs the pre-submission steps and then places the task on the delay
+     * queue.
+     *
+     * If the executor was already shut down when this method was entered,
+     * {@code preExecute} has handed the task to the rejection logic. When that
+     * logic returns normally instead of throwing, the task must not be
+     * enqueued, so the method returns without touching the queue.
+     *
+     * @param task the scheduled task to enqueue
+     * @throws RejectedExecutionException if the rejection logic throws it
+     */
+    protected void delayedExecute(ScheduledFutureTask<?> task) {
+        // Sample the state first: shutdown never reverts, so if this is true
+        // preExecute is guaranteed to take its rejection path.
+        boolean alreadyShutdown = isShutdown();
+        preExecute(task);
+        if (alreadyShutdown) {
+            // rejected without an exception; do not queue a rejected task
+            return;
+        }
+        super.getQueue().add(task);
+    }
+    
+    /**
+     * Submits a plain {@code ManagedFutureTask} for immediate execution. The
+     * pre-submission steps run against the original task; the task is then
+     * wrapped in a {@code ScheduledFutureTask} that is due now (with no
+     * result, context or listener of its own) and the wrapper is enqueued.
+     *
+     * If the executor was already shut down when this method was entered,
+     * {@code preExecute} has handed the task to the rejection logic. When that
+     * logic returns normally instead of throwing, the task must not be
+     * enqueued, so the method returns without touching the queue.
+     *
+     * @param task the task to wrap and enqueue
+     * @throws RejectedExecutionException if the rejection logic throws it
+     */
+    protected void delayedExecute(ManagedFutureTask<?> task) {
+        // Sample the state first: shutdown never reverts, so if this is true
+        // preExecute is guaranteed to take its rejection path.
+        boolean alreadyShutdown = isShutdown();
+        preExecute(task);
+        if (alreadyShutdown) {
+            // rejected without an exception; do not queue a rejected task
+            return;
+        }
+        // wrap ManagedFutureTask into ScheduledFutureTask
+        Date triggerTime = new Date();
+        ScheduledFutureTask<?> t = 
+            new ScheduledFutureTask<Object>(task, null, null, null, triggerTime);
+        super.getQueue().add(t);
+    }
+    
+    /**
+     * Steps performed before a task is placed on the queue: the task's
+     * listener (if any) is told the task was submitted; on a shut-down
+     * executor the task is passed to {@code reject}, and a resulting
+     * {@code RejectedExecutionException} is reported to the listener through
+     * taskDone before being rethrown; otherwise a core thread is prestarted
+     * when the pool is below its core size.
+     *
+     * @param task the task about to be queued
+     * @throws RejectedExecutionException if {@code reject} throws it
+     */
+    protected void preExecute(ManagedFutureTask<?> task) {
+        ManagedTaskListenerSupport support = task.getManagedTaskListenerSupport();
+        if (support != null) {
+            support.taskSubmitted(task);
+        }
+
+        if (!isShutdown()) {
+            // Prestart a thread if necessary. The thread cannot be started
+            // running the task because the task (probably) is not due yet,
+            // so it just idles until the delay elapses.
+            if (getPoolSize() < getCorePoolSize()) {
+                prestartCoreThread();
+            }
+            return;
+        }
+
+        try {
+            reject(task);
+        } catch (RejectedExecutionException rejected) {
+            if (support != null) {
+                support.taskDone(task, rejected);
+            }
+            throw rejected;
+        }
+    }    
+
+    /**
+     * Removes from the queue every task that the current shutdown policies do
+     * not allow to run. When neither delayed nor periodic tasks are kept the
+     * queue is simply cleared; otherwise each queued task of an unwanted kind
+     * is cancelled and the queue is purged afterwards.
+     */
+    private void cancelUnwantedTasks() {
+        boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
+        boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
+
+        if (!keepDelayed && !keepPeriodic) {
+            super.getQueue().clear();
+            return;
+        }
+
+        // Work on a snapshot of the queue while cancelling
+        for (Object entry : super.getQueue().toArray()) {
+            if (!(entry instanceof ScheduledFutureTask)) {
+                continue;
+            }
+            ScheduledFutureTask<?> queued = (ScheduledFutureTask<?>) entry;
+            boolean keep = queued.isPeriodic() ? keepPeriodic : keepDelayed;
+            if (!keep) {
+                queued.cancel(false);
+            }
+        }
+        purge();
+    }
+
+    /**
+     * Removes the given task from the queue. Only {@code ScheduledFutureTask}
+     * instances are looked up; anything else yields {@code false}.
+     *
+     * @param task the task to remove
+     * @return true if the queue reported that the task was removed
+     */
+    public boolean remove(Runnable task) {
+        return (task instanceof ScheduledFutureTask) && getQueue().remove(task);
+    }
+  
+    // *** schedule commands ***
+    
+    /**
+     * Schedules a one-shot run of {@code command} after the given delay, with
+     * no result value and no task listener.
+     */
+    public ScheduledFuture<?> schedule(Runnable command,
+                                       long delay,
+                                       TimeUnit unit) {
+        final ManagedTaskListener noListener = null;
+        return schedule(command, null, delay, unit, noListener);
+    }
+
+    /**
+     * Schedules a one-shot run of {@code command} after the given delay, with
+     * no result value, reporting to the given listener.
+     */
+    public ScheduledFuture<?> schedule(Runnable command,
+                                       long delay,
+                                       TimeUnit unit,
+                                       ManagedTaskListener listener) {
+        ScheduledFuture<?> scheduled = schedule(command, null, delay, unit, listener);
+        return scheduled;
+    }
+    
+    /**
+     * Schedules a one-shot run of {@code command} after the given delay. A
+     * negative delay is treated as zero, matching the {@code Callable}
+     * overload of this method.
+     *
+     * @param command the work to run
+     * @param result value passed to the task as its result; may be null
+     * @param delay time to wait before the task becomes due
+     * @param unit unit of {@code delay}
+     * @param listener listener to notify about the task; may be null
+     * @return the scheduled task
+     * @throws NullPointerException if {@code command} or {@code unit} is null
+     */
+    protected <T> ScheduledFuture<T> schedule(Runnable command,
+                                              T result, 
+                                              long delay,
+                                              TimeUnit unit,
+                                              ManagedTaskListener listener) {
+        if (command == null || unit == null) {
+            throw new NullPointerException();
+        }
+        // Clamp like the Callable overload so a negative delay cannot produce
+        // a trigger time in the past.
+        if (delay < 0) delay = 0;
+        Date triggerTime = new Date(System.currentTimeMillis() + unit.toMillis(delay));
+        ManagedTaskListenerSupport listenerSupport = getManagedTaskListenerSupport(listener);
+        ManagedContext managedContext = getManagedContext();
+        ScheduledFutureTask<T> t = new ScheduledFutureTask<T>(command, 
+                                                              result, 
+                                                              managedContext, 
+                                                              listenerSupport, 
+                                                              triggerTime);
+        delayedExecute(t);
+        return t;
+    }
+    
+    /**
+     * Schedules {@code command} to run at the times chosen by {@code trigger}.
+     *
+     * @param command the work to run
+     * @param trigger decides the run times
+     * @param listener listener to notify about the task; may be null
+     * @return the scheduled task
+     * @throws NullPointerException if {@code command} or {@code trigger} is null
+     */
+    public ScheduledFuture<?> schedule(Runnable command,
+                                       Trigger trigger,
+                                       ManagedTaskListener listener) {
+        if (command == null) {
+            throw new NullPointerException();
+        }
+        if (trigger == null) {
+            throw new NullPointerException();
+        }
+        ManagedTaskListenerSupport support = getManagedTaskListenerSupport(listener);
+        ManagedContext context = getManagedContext();
+        ScheduledFutureTask<?> scheduled =
+            new ScheduledFutureTask<Boolean>(command, null, context, support, trigger);
+        delayedExecute(scheduled);
+        return scheduled;
+    }
+    
+    /**
+     * Schedules a one-shot call of {@code callable} after the given delay,
+     * without a task listener.
+     */
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+                                           long delay,
+                                           TimeUnit unit) {
+        final ManagedTaskListener noListener = null;
+        return schedule(callable, delay, unit, noListener);
+    }
+    
+    /**
+     * Schedules a one-shot call of {@code callable} after the given delay. A
+     * negative delay is treated as zero.
+     *
+     * @param callable the work to run
+     * @param delay time to wait before the task becomes due
+     * @param unit unit of {@code delay}
+     * @param listener listener to notify about the task; may be null
+     * @return the scheduled task
+     * @throws NullPointerException if {@code callable} or {@code unit} is null
+     */
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+                                           long delay,
+                                           TimeUnit unit,
+                                           ManagedTaskListener listener) {
+        if (callable == null || unit == null) {
+            throw new NullPointerException();
+        }
+        long effectiveDelay = Math.max(delay, 0);
+        long dueMillis = System.currentTimeMillis() + unit.toMillis(effectiveDelay);
+        Date triggerTime = new Date(dueMillis);
+        ManagedTaskListenerSupport support = getManagedTaskListenerSupport(listener);
+        ManagedContext context = getManagedContext();
+        ScheduledFutureTask<V> scheduled =
+            new ScheduledFutureTask<V>(callable, context, support, triggerTime);
+        delayedExecute(scheduled);
+        return scheduled;
+    }
+    
+    /**
+     * Schedules {@code callable} to run at the times chosen by {@code trigger}.
+     *
+     * @param callable the work to run
+     * @param trigger decides the run times
+     * @param listener listener to notify about the task; may be null
+     * @return the scheduled task
+     * @throws NullPointerException if {@code callable} or {@code trigger} is null
+     */
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+                                           Trigger trigger,
+                                           ManagedTaskListener listener) {
+        if (callable == null) {
+            throw new NullPointerException();
+        }
+        if (trigger == null) {
+            throw new NullPointerException();
+        }
+        ManagedTaskListenerSupport support = getManagedTaskListenerSupport(listener);
+        ManagedContext context = getManagedContext();
+        ScheduledFutureTask<V> scheduled =
+            new ScheduledFutureTask<V>(callable, context, support, trigger);
+        delayedExecute(scheduled);
+        return scheduled;
+    }
+    
+    // *** scheduleAt* commands *****
+         
+    /**
+     * Trigger backing the fixed-rate and fixed-delay scheduling methods. All
+     * durations are in milliseconds.
+     */
+    private static class PeriodicTrigger implements Trigger {
+
+        /** Offset of the first run from the base time. */
+        private final long initialDelay;
+        /** Distance between runs. */
+        private final long period;
+        /** True for fixed-rate, false for fixed-delay. */
+        private final boolean rate;
+
+        public PeriodicTrigger(long initialDelay, long period, boolean rate) {
+            this.rate = rate;
+            this.period = period;
+            this.initialDelay = initialDelay;
+        }
+
+        /**
+         * Computes the next run time. With no previous scheduled time the
+         * result is the base time plus the initial delay. Afterwards a
+         * fixed-rate trigger adds the period to the previous scheduled time,
+         * while a fixed-delay trigger adds it to the base time.
+         */
+        public Date getNextRunTime(Future<?> future,
+                                   Date baseTime,
+                                   Date lastActualRunTime,
+                                   Date lastScheduledRunTime,
+                                   Date lastCompleteTime) {
+            boolean firstRun = (lastScheduledRunTime == null);
+            long anchor = (!firstRun && this.rate)
+                ? lastScheduledRunTime.getTime()
+                : baseTime.getTime();
+            long offset = firstRun ? this.initialDelay : this.period;
+            return new Date(anchor + offset);
+        }
+
+        /** Never asks for a run to be skipped. */
+        public boolean skipRun(Future<?> arg0, Date arg1) {
+            return false;
+        }
+
+    }
+    
+    /**
+     * Schedules {@code command} at a fixed rate without a task listener.
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay,
+                                                  long period,
+                                                  TimeUnit unit) {
+        final ManagedTaskListener noListener = null;
+        return scheduleAtFixedRate(command, initialDelay, period, unit, noListener);
+    }
+    
+    /**
+     * Schedules {@code command} to run repeatedly at a fixed rate: first after
+     * {@code initialDelay}, then with each run scheduled one period after the
+     * previously scheduled run time.
+     *
+     * Scheduling works in whole milliseconds. A positive period shorter than
+     * one millisecond is rounded up to one millisecond, because a period that
+     * truncates to zero would never advance the schedule and the task would
+     * be due again immediately after every run.
+     *
+     * @param command the work to run
+     * @param initialDelay time before the first run; negative values are treated as zero
+     * @param period time between scheduled runs; must be positive
+     * @param unit unit of {@code initialDelay} and {@code period}
+     * @param listener listener to notify about the task; may be null
+     * @return the scheduled task
+     * @throws NullPointerException if {@code command} or {@code unit} is null
+     * @throws IllegalArgumentException if {@code period} is not positive
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay,
+                                                  long period,
+                                                  TimeUnit unit,
+                                                  ManagedTaskListener listener) {
+        if (command == null || unit == null) {
+            throw new NullPointerException();
+        }
+        if (period <= 0) {
+            throw new IllegalArgumentException();
+        }
+        if (initialDelay < 0) initialDelay = 0;
+        // unit.toMillis truncates, so e.g. 500 microseconds would become 0 ms
+        long periodMillis = Math.max(1L, unit.toMillis(period));
+        PeriodicTrigger trigger = 
+            new PeriodicTrigger(unit.toMillis(initialDelay), periodMillis, true);
+        return schedule(command, trigger, listener);
+    }
+    
+    /**
+     * Schedules {@code command} with a fixed delay between runs, without a
+     * task listener.
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay,
+                                                     long delay,
+                                                     TimeUnit unit) {
+        final ManagedTaskListener noListener = null;
+        return scheduleWithFixedDelay(command, initialDelay, delay, unit, noListener);
+    }
+    
+    /**
+     * Schedules {@code command} to run repeatedly with a fixed delay: first
+     * after {@code initialDelay}, then with each run scheduled {@code delay}
+     * after the time at which the next run time is computed.
+     *
+     * Scheduling works in whole milliseconds. A positive delay shorter than
+     * one millisecond is rounded up to one millisecond, because a delay that
+     * truncates to zero would make the task due again immediately after every
+     * run.
+     *
+     * @param command the work to run
+     * @param initialDelay time before the first run; negative values are treated as zero
+     * @param delay time between runs; must be positive
+     * @param unit unit of {@code initialDelay} and {@code delay}
+     * @param listener listener to notify about the task; may be null
+     * @return the scheduled task
+     * @throws NullPointerException if {@code command} or {@code unit} is null
+     * @throws IllegalArgumentException if {@code delay} is not positive
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay,
+                                                     long delay,
+                                                     TimeUnit unit,
+                                                     ManagedTaskListener listener) {
+        if (command == null || unit == null) {
+            throw new NullPointerException();
+        }
+        if (delay <= 0) {
+            throw new IllegalArgumentException();
+        }
+        if (initialDelay < 0) initialDelay = 0;
+        // unit.toMillis truncates, so e.g. 500 microseconds would become 0 ms
+        long delayMillis = Math.max(1L, unit.toMillis(delay));
+        PeriodicTrigger trigger = 
+            new PeriodicTrigger(unit.toMillis(initialDelay), delayMillis, false);
+        return schedule(command, trigger, listener);
+    }
+
+
+    // Override ExecutorService methods
+
+    /**
+     * Runs {@code command} as a zero-delay scheduled task with no result and
+     * no listener.
+     */
+    public void execute(Runnable command) {
+        final long noDelay = 0;
+        schedule(command, null, noDelay, TimeUnit.NANOSECONDS, null);
+    }
+   
+    /**
+     * Submits {@code task} as a zero-delay scheduled task with no result and
+     * no listener.
+     */
+    public Future<?> submit(Runnable task) {
+        final long noDelay = 0;
+        return schedule(task, null, noDelay, TimeUnit.NANOSECONDS, null);
+    }
+
+    /**
+     * Submits {@code task} as a zero-delay scheduled task carrying
+     * {@code result}, with no listener.
+     */
+    public <T> Future<T> submit(Runnable task, T result) {
+        final long noDelay = 0;
+        return schedule(task, result, noDelay, TimeUnit.NANOSECONDS, null);
+    }
+
+    /**
+     * Submits {@code task} as a zero-delay scheduled task with no listener.
+     */
+    public <T> Future<T> submit(Callable<T> task) {
+        final long noDelay = 0;
+        return schedule(task, noDelay, TimeUnit.NANOSECONDS, null);
+    }
+      
+    /**
+     * Submits {@code task} as a zero-delay scheduled task with no result,
+     * reporting to {@code listener}.
+     */
+    public Future<?> submit(Runnable task, ManagedTaskListener listener) {
+        final long noDelay = 0;
+        return schedule(task, null, noDelay, TimeUnit.NANOSECONDS, listener);
+    }
+
+    /**
+     * Submits {@code task} as a zero-delay scheduled task carrying
+     * {@code result}, reporting to {@code listener}.
+     */
+    public <T> Future<T> submit(Runnable task, T result, ManagedTaskListener listener) {
+        final long noDelay = 0;
+        return schedule(task, result, noDelay, TimeUnit.NANOSECONDS, listener);
+    }
+        
+    /**
+     * Submits {@code task} as a zero-delay scheduled task, reporting to
+     * {@code listener}.
+     */
+    public <T> Future<T> submit(Callable<T> task, ManagedTaskListener listener) {
+        final long noDelay = 0;
+        return schedule(task, noDelay, TimeUnit.NANOSECONDS, listener);
+    }
+    
+    /*
+     * Hook used by the invokeAll/invokeAny functions. The task is handed to
+     * delayedExecute, which wraps it in a ScheduledFutureTask before queueing.
+     */
+    protected void executeTask(ManagedFutureTask<?> task) {
+        this.delayedExecute(task);
+    }
+    
+    // Policy methods
+    
+    /**
+     * Sets whether existing periodic tasks keep running after this executor
+     * has been <tt>shutdown</tt>. When they do, they stop only upon
+     * <tt>shutdownNow</tt> or when this policy is switched back to
+     * <tt>false</tt> on an already shut-down executor, in which case the
+     * unwanted tasks are cancelled right away. The default is false.
+     * @param value if true, continue after shutdown, else don't.
+     * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
+     */
+    public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
+        continueExistingPeriodicTasksAfterShutdown = value;
+        if (value) {
+            return;
+        }
+        if (isShutdown()) {
+            cancelUnwantedTasks();
+        }
+    }
+
+    /**
+     * Get the policy on whether to continue executing existing
+     * periodic tasks even when this executor has been
+     * <tt>shutdown</tt>. In this case, these tasks will only
+     * terminate upon <tt>shutdownNow</tt> or after setting the policy
+     * to <tt>false</tt> when already shutdown. This value is by
+     * default false.
+     * @return true if will continue after shutdown.
+     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
+     */
+    public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
+        return continueExistingPeriodicTasksAfterShutdown;
+    }
+
+    /**
+     * Sets whether existing delayed tasks are still executed after this
+     * executor has been <tt>shutdown</tt>. When they are, they stop only upon
+     * <tt>shutdownNow</tt> or when this policy is switched to <tt>false</tt>
+     * on an already shut-down executor, in which case the unwanted tasks are
+     * cancelled right away. The default is true.
+     * @param value if true, execute after shutdown, else don't.
+     * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
+     */
+    public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
+        executeExistingDelayedTasksAfterShutdown = value;
+        if (value) {
+            return;
+        }
+        if (isShutdown()) {
+            cancelUnwantedTasks();
+        }
+    }
+
+    /**
+     * Get policy on whether to execute existing delayed
+     * tasks even when this executor has been <tt>shutdown</tt>. In
+     * this case, these tasks will only terminate upon
+     * <tt>shutdownNow</tt>, or after setting the policy to
+     * <tt>false</tt> when already shutdown. This value is by default
+     * true.
+     * @return true if will execute after shutdown.
+     * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
+     */
+    public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
+        return executeExistingDelayedTasksAfterShutdown;
+    }
+
+
+    /**
+     * Starts an orderly shutdown. Before delegating to the superclass, the
+     * queue is cleaned according to the two shutdown policies: existing
+     * delayed tasks are cancelled if the
+     * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> is
+     * <tt>false</tt>, and existing periodic tasks are cancelled unless the
+     * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> is
+     * <tt>true</tt>.
+     */
+    public void shutdown() {
+        // drop what the policies do not allow, then let the superclass shut down
+        this.cancelUnwantedTasks();
+        super.shutdown();
+    }
+
+    /**
+     * Delegates to the superclass, which attempts to stop all actively
+     * executing tasks, halts the processing of waiting tasks and returns the
+     * tasks that were awaiting execution.
+     *
+     * <p>Stopping running tasks is best-effort only. This implementation
+     * cancels tasks via {@link Thread#interrupt}, so tasks that mask or
+     * ignore interrupts may never terminate.
+     *
+     * @return list of tasks that never commenced execution.  Each
+     * element is a {@link ScheduledFuture}, including tasks submitted
+     * through <tt>execute</tt>, which are represented as zero-delay
+     * <tt>ScheduledFuture</tt>s for scheduling purposes.
+     */
+    public List<Runnable> shutdownNow() {
+        List<Runnable> neverStarted = super.shutdownNow();
+        return neverStarted;
+    }
+
+    /**
+     * Gives access to the task queue of this executor.  Each element is a
+     * {@link ScheduledFuture}, including tasks submitted through
+     * <tt>execute</tt>, which are represented as zero-delay
+     * <tt>ScheduledFuture</tt>s for scheduling purposes. Iterating over
+     * the queue is <em>not</em> guaranteed to visit tasks in the order in
+     * which they will execute.
+     *
+     * @return the task queue
+     */
+    public BlockingQueue<Runnable> getQueue() {
+        BlockingQueue<Runnable> workQueue = super.getQueue();
+        return workQueue;
+    }
+
+    /**
+     * An annoying wrapper class to convince generics compiler to
+     * use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable>
+     */ 
+    private static class DelayedWorkQueue 
+        extends AbstractCollection<Runnable> 
+        implements BlockingQueue<Runnable> {
+        
+        private final DelayQueue<ScheduledFutureTask> dq = new DelayQueue<ScheduledFutureTask>();
+        public Runnable poll() { return dq.poll(); }
+        public Runnable peek() { return dq.peek(); }
+        public Runnable take() throws InterruptedException { return dq.take(); }
+        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
+            return dq.poll(timeout, unit);
+        }
+
+        public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); }
+        public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); }
+        public void put(Runnable x)  {
+            dq.put((ScheduledFutureTask)x); 
+        }
+        public boolean offer(Runnable x, long timeout, TimeUnit unit) {
+            return dq.offer((ScheduledFutureTask)x, timeout, unit);
+        }
+
+        public Runnable remove() { return dq.remove(); }
+        public Runnable element() { return dq.element(); }
+        public void clear() { dq.clear(); }
+        public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
+        public int drainTo(Collection<? super Runnable> c, int maxElements) { 
+            return dq.drainTo(c, maxElements); 
+        }
+
+        public int remainingCapacity() { return dq.remainingCapacity(); }
+        public boolean remove(Object x) { return dq.remove(x); }
+        public boolean contains(Object x) { return dq.contains(x); }
+        public int size() { return dq.size(); }
+        public boolean isEmpty() { return dq.isEmpty(); }
+        public Object[] toArray() { return dq.toArray(); }
+        public <T> T[] toArray(T[] array) { return dq.toArray(array); }
+        public Iterator<Runnable> iterator() { 
+            return new Iterator<Runnable>() {
+                private Iterator<ScheduledFutureTask> it = dq.iterator();
+                public boolean hasNext() { return it.hasNext(); }
+                public Runnable next() { return it.next(); }
+                public void remove() {  it.remove(); }
+            };
+        }
+    }
+   
+}

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/AbstractManagedScheduledExecutorService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java Fri Mar  7 11:56:14 2008
@@ -0,0 +1,172 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.util.concurrent.ManagedTaskListener;
+import javax.util.concurrent.ManagedThreadFactory;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedContextHandler;
+import org.apache.geronimo.concurrent.thread.ManagedThreadFactoryUtils;
+
+/**
+ * Component-managed implementation of {@link ManagedExecutorService}. 
+ * In component-managed implementation {@link ManagedTaskListener} callbacks
+ * must execute within the context of the thread that created the executor. 
+ * <BR>
+ * Some key methods are overridden to set the right context on the thread submitting
+ * the tasks. 
+ */
+public class ComponentManagedExecutorService 
+    extends AbstractManagedExecutorService {
+
+    private ManagedContext managedContext;
+
+    public ComponentManagedExecutorService(int corePoolSize,
+                                           int maximumPoolSize,
+                                           long keepAliveTime,
+                                           TimeUnit unit,
+                                           BlockingQueue<Runnable> workQueue,
+                                           ManagedThreadFactory threadFactory,
+                                           ManagedContextHandler contextHandler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
+              workQueue, threadFactory);
+        
+        // save context now
+        this.managedContext = ManagedContext.captureContext(contextHandler);
+        
+        setThreadFactory(ManagedThreadFactoryUtils.createEmbeddedThreadFactory(threadFactory, 
+                                                                               this.managedContext));        
+    }
+                
+    protected ManagedContext getManagedContext() {
+        return this.managedContext;
+    }
+            
+    protected void executeTask(ManagedFutureTask<?> task) { 
+        task.setSetContextOnRun(false);
+        super.executeTask(task);
+    }
+    
+    /*
+     * TODO: some of these functions could be optimized a bit. When listener == null
+     *       the context does not have to be set on the current thread since 
+     *       there is no listener methods to call.
+     */
+    
+    public <T> T invokeAny(Collection<Callable<T>> tasks, 
+                           ManagedTaskListener listener)
+        throws InterruptedException, 
+               ExecutionException {
+        if (tasks == null) {
+            throw new NullPointerException();
+        }
+        
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.invokeAny(tasks, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+    
+    public <T> T invokeAny(Collection<Callable<T>> tasks,
+                           long timeout,
+                           TimeUnit unit,
+                           ManagedTaskListener listener) 
+        throws InterruptedException,
+               ExecutionException, 
+               TimeoutException {
+        if (tasks == null) {
+            throw new NullPointerException();
+        }
+        
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.invokeAny(tasks, timeout, unit, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks, 
+                                         ManagedTaskListener listener)
+            throws InterruptedException {
+        if (tasks == null) {
+            throw new NullPointerException();
+        }
+        
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.invokeAll(tasks, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+
+    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+                                         long timeout,
+                                         TimeUnit unit,
+                                         ManagedTaskListener listener) throws InterruptedException {
+        if (tasks == null || unit == null) {
+            throw new NullPointerException();
+        }
+        
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.invokeAll(tasks, timeout, unit, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+    
+    public <T> Future<T> submit(Runnable task, T result, ManagedTaskListener listener) {
+        if (task == null) {
+            throw new NullPointerException();
+        }
+        
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.submit(task, result, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+        
+    public <T> Future<T> submit(Callable<T> task, ManagedTaskListener listener) {
+        if (task == null) {
+            throw new NullPointerException();
+        }
+
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.submit(task, listener);
+        } finally {
+            this.managedContext.unset(threadContext); 
+        }
+    }
+        
+}

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java
------------------------------------------------------------------------------
    svn:executable = 

Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java Fri Mar  7 11:56:14 2008
@@ -0,0 +1,245 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.util.concurrent.ManagedScheduledExecutorService;
+import javax.util.concurrent.ManagedTaskListener;
+import javax.util.concurrent.ManagedThreadFactory;
+import javax.util.concurrent.Trigger;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedContextHandler;
+import org.apache.geronimo.concurrent.thread.ManagedThreadFactoryUtils;
+
+/**
+ * Component-managed implementation of {@link ManagedScheduledExecutorService}.
+ * In a component-managed implementation {@link ManagedTaskListener} callbacks
+ * must execute within the context of the thread that created the executor.
+ * <p>
+ * The context is captured in the constructor. The submission and scheduling
+ * methods below apply it to the calling thread for the duration of the call
+ * and unset it again in a finally block.
+ */
+public class ComponentManagedScheduledExecutorService 
+    extends AbstractManagedScheduledExecutorService {
+
+    /** Context captured when this executor was constructed. */
+    private ManagedContext managedContext;
+
+    /**
+     * Creates the executor, captures the context through the given handler and
+     * replaces the thread factory with an embedded one bound to that context.
+     *
+     * @param corePoolSize passed to the superclass constructor
+     * @param threadFactory factory wrapped by the embedded thread factory
+     * @param contextHandler handler used to capture the context
+     */
+    public ComponentManagedScheduledExecutorService(int corePoolSize,
+                                                    ManagedThreadFactory threadFactory,
+                                                    ManagedContextHandler contextHandler) {
+        super(corePoolSize, threadFactory);
+        
+        // save context now
+        this.managedContext = ManagedContext.captureContext(contextHandler);
+        
+        setThreadFactory(ManagedThreadFactoryUtils.createEmbeddedThreadFactory(threadFactory, 
+                                                                               this.managedContext));        
+    }
+  
+    /**
+     * @return the context captured in the constructor
+     */
+    protected ManagedContext getManagedContext() {
+        return this.managedContext;
+    }
+    
+    /**
+     * Clears the task's set-context-on-run flag and then delegates to the
+     * superclass hook.
+     */
+    protected void preExecute(ManagedFutureTask<?> task) {
+        task.setSetContextOnRun(false);
+        super.preExecute(task);
+    }
+    
+    /*
+     * TODO: some of these functions could be optimized a bit. When listener == null
+     *       the context does not have to be set on the current thread since 
+     *       there are no listener methods to call.
+     */
+    
+    /**
+     * Delegates to the superclass <tt>invokeAny</tt> with the captured context
+     * applied to the calling thread.
+     *
+     * @throws NullPointerException if <tt>tasks</tt> is null
+     */
+    public <T> T invokeAny(Collection<Callable<T>> tasks, 
+                           ManagedTaskListener listener)
+        throws InterruptedException, 
+               ExecutionException {
+        if (tasks == null) {
+            throw new NullPointerException();
+        }
+        
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.invokeAny(tasks, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+    
+    /**
+     * Timed variant of <tt>invokeAny</tt>; delegates to the superclass with the
+     * captured context applied to the calling thread.
+     *
+     * @throws NullPointerException if <tt>tasks</tt> is null
+     */
+    public <T> T invokeAny(Collection<Callable<T>> tasks,
+                           long timeout,
+                           TimeUnit unit,
+                           ManagedTaskListener listener) 
+        throws InterruptedException,
+               ExecutionException, 
+               TimeoutException {
+        if (tasks == null) {
+            throw new NullPointerException();
+        }
+        
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.invokeAny(tasks, timeout, unit, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+
+    /**
+     * Delegates to the superclass <tt>invokeAll</tt> with the captured context
+     * applied to the calling thread.
+     *
+     * @throws NullPointerException if <tt>tasks</tt> is null
+     */
+    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks, 
+                                         ManagedTaskListener listener)
+            throws InterruptedException {
+        if (tasks == null) {
+            throw new NullPointerException();
+        }
+        
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.invokeAll(tasks, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+
+    /**
+     * Timed variant of <tt>invokeAll</tt>; delegates to the superclass with the
+     * captured context applied to the calling thread.
+     *
+     * @throws NullPointerException if <tt>tasks</tt> or <tt>unit</tt> is null
+     */
+    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+                                         long timeout,
+                                         TimeUnit unit,
+                                         ManagedTaskListener listener) throws InterruptedException {
+        if (tasks == null || unit == null) {
+            throw new NullPointerException();
+        }
+        
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.invokeAll(tasks, timeout, unit, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+    
+    /**
+     * Schedules a callable after a delay by delegating to the superclass with
+     * the captured context applied to the calling thread.
+     *
+     * @throws NullPointerException if <tt>callable</tt> or <tt>unit</tt> is null
+     */
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+                                           long delay,
+                                           TimeUnit unit,
+                                           ManagedTaskListener listener) {
+        if (callable == null || unit == null) {
+            throw new NullPointerException();
+        }
+
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.schedule(callable, delay, unit, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+
+    /**
+     * Schedules a callable driven by a {@link Trigger} by delegating to the
+     * superclass with the captured context applied to the calling thread.
+     *
+     * @throws NullPointerException if <tt>callable</tt> or <tt>trigger</tt> is null
+     */
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+                                           Trigger trigger,
+                                           ManagedTaskListener listener) {
+        if (callable == null || trigger == null) {
+            throw new NullPointerException();
+        }
+
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.schedule(callable, trigger, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+          
+    /**
+     * Schedules a runnable with a fixed result after a delay by delegating to
+     * the superclass with the captured context applied to the calling thread.
+     *
+     * @throws NullPointerException if <tt>command</tt> or <tt>unit</tt> is null
+     */
+    protected <T> ScheduledFuture<T> schedule(Runnable command,
+                                              T result,
+                                              long delay,
+                                              TimeUnit unit,
+                                              ManagedTaskListener listener) {
+        if (command == null || unit == null) {
+            throw new NullPointerException();
+        }
+
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.schedule(command, result, delay, unit, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+
+    /**
+     * Schedules a runnable driven by a {@link Trigger} by delegating to the
+     * superclass with the captured context applied to the calling thread.
+     *
+     * @throws NullPointerException if <tt>command</tt> or <tt>trigger</tt> is null
+     */
+    public ScheduledFuture<?> schedule(Runnable command,
+                                       Trigger trigger,
+                                       ManagedTaskListener listener) {
+        if (command == null || trigger == null) {
+            throw new NullPointerException();
+        }
+
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.schedule(command, trigger, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+
+    /**
+     * Schedules a fixed-rate periodic runnable by delegating to the superclass
+     * with the captured context applied to the calling thread.
+     *
+     * @throws NullPointerException if <tt>command</tt> or <tt>unit</tt> is null
+     * @throws IllegalArgumentException if <tt>period</tt> is not positive
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay,
+                                                  long period,
+                                                  TimeUnit unit,
+                                                  ManagedTaskListener listener) {
+        if (command == null || unit == null) {
+            throw new NullPointerException();
+        }
+        if (period <= 0) {
+            throw new IllegalArgumentException();
+        }
+
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            return super.scheduleAtFixedRate(command, initialDelay, period, unit, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+
+    /**
+     * Schedules a fixed-delay periodic runnable by delegating to the superclass
+     * with the captured context applied to the calling thread.
+     *
+     * @throws NullPointerException if <tt>command</tt> or <tt>unit</tt> is null
+     * @throws IllegalArgumentException if <tt>delay</tt> is not positive
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay,
+                                                     long delay,
+                                                     TimeUnit unit,
+                                                     ManagedTaskListener listener) {
+        if (command == null || unit == null) {
+            throw new NullPointerException();
+        }
+        if (delay <= 0) {
+            throw new IllegalArgumentException();
+        }
+
+        Map<String, Object> threadContext = this.managedContext.set();
+        try {
+            // Must use the fixed-delay variant; this previously delegated to
+            // scheduleAtFixedRate, which silently gave fixed-rate semantics.
+            return super.scheduleWithFixedDelay(command, initialDelay, delay, unit, listener);
+        } finally {
+            this.managedContext.unset(threadContext);
+        }
+    }
+
+}

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java
------------------------------------------------------------------------------
    svn:executable = 

Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java Fri Mar  7 11:56:14 2008
@@ -0,0 +1,96 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/*
+ * This class is based on and borrows code from java.util.concurrent.ExecutorCompletionService 
+ * class in Apache Harmony.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedTaskListenerSupport;
+
+/**
+ * Completion service that runs callables on an
+ * {@link AbstractManagedExecutorService} and places each task's future on a
+ * completion queue when the task is done.
+ */
+public class ManagedExecutorCompletionService<V> {
+    /** Executor that runs the submitted tasks. */
+    private final AbstractManagedExecutorService executor;
+    /** Queue that receives futures from {@link QueueingManagedFuture#done()}. */
+    private final BlockingQueue<Future<V>> completionQueue;
+
+    /** Context passed to every task created by {@link #submit(Callable)}. */
+    private ManagedContext managedContext;
+    /** Listener support passed to every task created by {@link #submit(Callable)}. */
+    private ManagedTaskListenerSupport listenerSupport;
+
+    /**
+     * Managed future that adds itself to the completion queue from its
+     * <tt>done()</tt> hook.
+     */
+    private class QueueingManagedFuture extends ManagedFutureTask<V> {
+        QueueingManagedFuture(Callable<V> callable,
+                              ManagedContext context,
+                              ManagedTaskListenerSupport listeners) {
+            super(callable, context, listeners);
+        }
+
+        protected void done() {
+            completionQueue.add(this);
+        }
+    }
+
+    /**
+     * Creates a completion service backed by a new {@link LinkedBlockingQueue}.
+     *
+     * @throws NullPointerException if <tt>executor</tt> is null
+     */
+    public ManagedExecutorCompletionService(AbstractManagedExecutorService executor,
+                                            ManagedContext managedContext,
+                                            ManagedTaskListenerSupport listenerSupport) {
+        this(executor, new LinkedBlockingQueue<Future<V>>(), managedContext, listenerSupport);
+    }
+
+    /**
+     * Creates a completion service that uses the supplied completion queue.
+     *
+     * @throws NullPointerException if <tt>executor</tt> or
+     *         <tt>completionQueue</tt> is null
+     */
+    public ManagedExecutorCompletionService(AbstractManagedExecutorService executor,
+                                            BlockingQueue<Future<V>> completionQueue,
+                                            ManagedContext managedContext,
+                                            ManagedTaskListenerSupport listenerSupport) {
+        if (executor == null) {
+            throw new NullPointerException();
+        }
+        if (completionQueue == null) {
+            throw new NullPointerException();
+        }
+        this.managedContext = managedContext;
+        this.listenerSupport = listenerSupport;
+        this.executor = executor;
+        this.completionQueue = completionQueue;
+    }
+
+    /**
+     * Wraps the callable in a queueing managed future and passes it to the
+     * executor's <tt>executeTask</tt>.
+     *
+     * @return the future created for the task
+     * @throws NullPointerException if <tt>task</tt> is null
+     */
+    public Future<V> submit(Callable<V> task) {
+        if (task == null) {
+            throw new NullPointerException();
+        }
+
+        QueueingManagedFuture future =
+            new QueueingManagedFuture(task, managedContext, listenerSupport);
+        executor.executeTask(future);
+        return future;
+    }
+
+    /** Delegates to <tt>take()</tt> on the completion queue. */
+    public Future<V> take() throws InterruptedException {
+        return completionQueue.take();
+    }
+
+    /** Delegates to <tt>poll()</tt> on the completion queue. */
+    public Future<V> poll() {
+        return completionQueue.poll();
+    }
+
+    /** Delegates to the timed <tt>poll</tt> on the completion queue. */
+    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+        return completionQueue.poll(timeout, unit);
+    }
+
+}
+

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java
------------------------------------------------------------------------------
    svn:executable = 

Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java Fri Mar  7 11:56:14 2008
@@ -0,0 +1,175 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.util.concurrent.ManagedExecutorService;
+import javax.util.concurrent.ManagedTaskListener;
+
+/**
+ * Facade in front of a {@link ManagedExecutorService}. Task submission methods
+ * are forwarded unchanged. When the facade is configured as server-managed,
+ * every lifecycle method (<tt>awaitTermination</tt>, <tt>isShutdown</tt>,
+ * <tt>isTerminated</tt>, <tt>shutdown</tt>, <tt>shutdownNow</tt>) throws
+ * {@link IllegalArgumentException} instead of reaching the wrapped executor.
+ */
+public class ManagedExecutorServiceFacade
+    implements ManagedExecutorService {
+
+    /** Wrapped executor; never null. */
+    protected ManagedExecutorService executor;
+    /** When true, lifecycle methods are rejected. */
+    protected boolean serverManaged;
+
+    /**
+     * @param executor executor to wrap
+     * @param serverManaged whether lifecycle methods must be rejected
+     * @throws NullPointerException if <tt>executor</tt> is null
+     */
+    public ManagedExecutorServiceFacade(ManagedExecutorService executor,
+                                        boolean serverManaged) {
+        if (executor == null) {
+            throw new NullPointerException();
+        }
+        this.serverManaged = serverManaged;
+        this.executor = executor;
+    }
+
+    // ---- submission methods taking a ManagedTaskListener: plain delegation ----
+
+    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+                                         ManagedTaskListener listener)
+        throws InterruptedException {
+        return executor.invokeAll(tasks, listener);
+    }
+
+    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+                                         long timeout,
+                                         TimeUnit unit,
+                                         ManagedTaskListener listener)
+        throws InterruptedException {
+        return executor.invokeAll(tasks, timeout, unit, listener);
+    }
+
+    public <T> T invokeAny(Collection<Callable<T>> tasks,
+                           ManagedTaskListener listener)
+        throws InterruptedException,
+               ExecutionException {
+        return executor.invokeAny(tasks, listener);
+    }
+
+    public <T> T invokeAny(Collection<Callable<T>> tasks,
+                           long timeout,
+                           TimeUnit unit,
+                           ManagedTaskListener listener)
+        throws InterruptedException,
+               ExecutionException,
+               TimeoutException {
+        return executor.invokeAny(tasks, timeout, unit, listener);
+    }
+
+    public Future<?> submit(Runnable command, ManagedTaskListener listener) {
+        return executor.submit(command, listener);
+    }
+
+    public <T> Future<T> submit(Callable<T> callable, ManagedTaskListener listener) {
+        return executor.submit(callable, listener);
+    }
+
+    public <T> Future<T> submit(Runnable command, T result, ManagedTaskListener listener) {
+        return executor.submit(command, result, listener);
+    }
+
+    // ---- submission methods without a listener: plain delegation ----
+
+    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
+        throws InterruptedException {
+        return executor.invokeAll(tasks);
+    }
+
+    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+                                         long timeout,
+                                         TimeUnit unit)
+        throws InterruptedException {
+        return executor.invokeAll(tasks, timeout, unit);
+    }
+
+    public <T> T invokeAny(Collection<Callable<T>> tasks)
+        throws InterruptedException,
+               ExecutionException {
+        return executor.invokeAny(tasks);
+    }
+
+    public <T> T invokeAny(Collection<Callable<T>> tasks,
+                           long timeout,
+                           TimeUnit unit)
+        throws InterruptedException,
+               ExecutionException,
+               TimeoutException {
+        return executor.invokeAny(tasks, timeout, unit);
+    }
+
+    public <T> Future<T> submit(Callable<T> task) {
+        return executor.submit(task);
+    }
+
+    public Future<?> submit(Runnable task) {
+        return executor.submit(task);
+    }
+
+    public <T> Future<T> submit(Runnable task, T result) {
+        return executor.submit(task, result);
+    }
+
+    public void execute(Runnable command) {
+        executor.execute(command);
+    }
+
+    // ---- lifecycle methods: rejected when the facade is server-managed ----
+
+    /**
+     * @throws IllegalArgumentException if this facade is server-managed
+     */
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        if (serverManaged) {
+            throw new IllegalArgumentException();
+        }
+        return executor.awaitTermination(timeout, unit);
+    }
+
+    /**
+     * @throws IllegalArgumentException if this facade is server-managed
+     */
+    public boolean isShutdown() {
+        if (serverManaged) {
+            throw new IllegalArgumentException();
+        }
+        return executor.isShutdown();
+    }
+
+    /**
+     * @throws IllegalArgumentException if this facade is server-managed
+     */
+    public boolean isTerminated() {
+        if (serverManaged) {
+            throw new IllegalArgumentException();
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * @throws IllegalArgumentException if this facade is server-managed
+     */
+    public void shutdown() {
+        if (serverManaged) {
+            throw new IllegalArgumentException();
+        }
+        executor.shutdown();
+    }
+
+    /**
+     * @throws IllegalArgumentException if this facade is server-managed
+     */
+    public List<Runnable> shutdownNow() {
+        if (serverManaged) {
+            throw new IllegalArgumentException();
+        }
+        return executor.shutdownNow();
+    }
+
+}

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java
------------------------------------------------------------------------------
    svn:executable = 

Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java Fri Mar  7 11:56:14 2008
@@ -0,0 +1,183 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedTask;
+import org.apache.geronimo.concurrent.ManagedTaskListenerSupport;
+import org.apache.geronimo.concurrent.ManagedTaskUtils;
+import org.apache.geronimo.concurrent.harmony.FutureTask;
+import org.apache.geronimo.concurrent.thread.ManagedThread;
+
+/**
+ * A {@link FutureTask} that optionally applies a {@link ManagedContext} around
+ * its execution, notifies a {@link ManagedTaskListenerSupport} about lifecycle
+ * events and associates itself with the executing {@link ManagedThread}.
+ *
+ * @param <V> the result type of the task
+ */
+public class ManagedFutureTask<V> extends FutureTask<V> implements ManagedTask {
+
+    protected final static Log LOG = LogFactory.getLog(ManagedFutureTask.class);
+    
+    /** Context applied around {@link #run()} and listener callbacks; may be null. */
+    protected ManagedContext managedContext;
+    /** Receiver of task lifecycle notifications; may be null. */
+    protected ManagedTaskListenerSupport listenerSupport;
+    
+    /** The wrapped Callable or Runnable; used to derive identity name/description. */
+    private final Object task;
+    /** Whether this instance registers itself with the executing ManagedThread. */
+    private final boolean associateTaskWithThread;
+    protected boolean setContextOnRun = true;
+        
+    /**
+     * Creates a task wrapping the given callable.
+     *
+     * @param callable the work to execute
+     * @param managedContext the context to apply; must be non-null if a listener is given
+     * @param listener the listener support to notify, or null
+     * @throws IllegalArgumentException if a listener is given without a context
+     */
+    public ManagedFutureTask(Callable<V> callable, 
+                             ManagedContext managedContext,
+                             ManagedTaskListenerSupport listener) {                             
+        super(callable);
+        this.task = callable;
+        this.managedContext = managedContext;
+        this.listenerSupport = listener;
+        // always associate the task with the thread
+        this.associateTaskWithThread = true;
+        init();
+    }
+    
+    /**
+     * Creates a task wrapping the given runnable.
+     *
+     * @param runnable the work to execute
+     * @param result the result passed to the superclass constructor
+     * @param managedContext the context to apply; must be non-null if a listener is given
+     * @param listener the listener support to notify, or null
+     * @throws IllegalArgumentException if a listener is given without a context
+     */
+    public ManagedFutureTask(Runnable runnable, V result,
+                             ManagedContext managedContext,
+                             ManagedTaskListenerSupport listener) {                             
+        super(runnable, result);
+        this.task = runnable;
+        this.managedContext = managedContext;
+        this.listenerSupport = listener;
+        // if the runnable is ManagedFutureTask, do not associate it with the thread
+        // in this instance since the runnable will already do that 
+        this.associateTaskWithThread = !(runnable instanceof ManagedFutureTask);
+        init();
+    }
+    
+    /**
+     * Validates the constructor arguments: a listener requires a context because
+     * {@link #cancel(boolean)} applies the context before notifying the listener.
+     */
+    private void init() {
+        if (this.listenerSupport != null && this.managedContext == null) {
+            throw new IllegalArgumentException("ManagedContext must be non-null if listener is specifed");
+        }
+    }
+    
+    boolean isSetContextOnRun() {
+        return this.setContextOnRun;
+    }
+
+    /**
+     * Sets if the context should be applied when the {@link #run()} method is executed.
+     * By default and in the server-managed executors the context is applied. 
+     * In component-managed executors the context is not applied as the right context is
+     * already set by the thread.
+     */
+    void setSetContextOnRun(boolean setContextOnRun) {
+        this.setContextOnRun = setContextOnRun;
+    }
+
+    public ManagedTaskListenerSupport getManagedTaskListenerSupport() {
+        return this.listenerSupport;
+    }
+    
+    public ManagedContext getManagedContext() {
+        return this.managedContext;
+    }
+    
+    /**
+     * Cancels the task and, if the cancellation succeeded and a listener is
+     * configured, reports a {@link CancellationException} to the listener via
+     * {@code taskAborted} with the managed context applied.
+     *
+     * @param mayInterruptIfRunning passed through to the superclass
+     * @return the result of the superclass {@code cancel} call
+     */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {        
+        boolean result = super.cancel(mayInterruptIfRunning);
+        if (result && this.listenerSupport != null) {
+            // always set the context
+            Map<String, Object> threadContext = this.managedContext.set();
+            try {
+                this.listenerSupport.taskAborted(this, new CancellationException());
+            } finally {
+                this.managedContext.unset(threadContext);
+            }
+        }
+        return result;        
+    }
+
+    /**
+     * Runs the task. When {@code setContextOnRun} is true and a context is
+     * present, the context is applied before and removed after the superclass
+     * {@code run()}. If applying the context fails, the task is cancelled
+     * without being run.
+     */
+    @Override
+    public void run() {
+        if (this.setContextOnRun && this.managedContext != null) {
+            Map<String, Object> threadContext = null;
+            try {
+                threadContext = this.managedContext.set();
+            } catch (RuntimeException e) {
+                // hand the exception to the logger so the cause and stack trace are kept
+                LOG.warn("Failed to apply context to thread for task " + this + ": " + 
+                         e.getMessage() + ". Cancelling task", e);        
+                // cancel via the superclass only: cancel(boolean) of this class would
+                // call managedContext.set() again, which has just failed
+                cancelFutureTask(false);
+                return;
+            }
+            try {
+                super.run();
+            } finally {
+                this.managedContext.unset(threadContext);
+            }
+        } else {
+            super.run();
+        }
+    }
+    
+    /**
+     * Cancels through the superclass only, bypassing the listener notification
+     * performed by {@link #cancel(boolean)}.
+     */
+    protected boolean cancelFutureTask(boolean mayInterruptIfRunning) {
+        return super.cancel(mayInterruptIfRunning);
+    }
+    
+    /**
+     * Notifies the listener that the task is starting and, if enabled,
+     * registers this task with the current {@link ManagedThread}.
+     * NOTE(review): presumably invoked by the superclass right before the task
+     * body executes - confirm against the harmony FutureTask.
+     */
+    @Override
+    protected void taskStart() {
+        if (this.listenerSupport != null) {
+            this.listenerSupport.taskStarting(this);
+        }
+        
+        if (this.associateTaskWithThread) {
+            // associate task with the thread
+            Thread thread = Thread.currentThread();
+            if (thread instanceof ManagedThread) {
+                ManagedThread managedThread = (ManagedThread)thread;
+                managedThread.startTask(this);
+            } else {
+                LOG.warn("taskStart was not called on ManagedThread: " + thread);
+            }
+        }
+    }
+    
+    /**
+     * Notifies the listener that the task is done and, if enabled, removes the
+     * association between this task and the current {@link ManagedThread}.
+     *
+     * @param exception the failure passed on to the listener; may be null
+     */
+    @Override
+    protected void taskDone(Throwable exception) {
+        try {
+            if (this.listenerSupport != null) {
+                this.listenerSupport.taskDone(this, exception);
+            }
+        } finally {
+            // de-associate task with the thread even if the listener callback
+            // throws, so the thread does not keep a reference to a finished task
+            if (this.associateTaskWithThread) {
+                Thread thread = Thread.currentThread();
+                if (thread instanceof ManagedThread) {
+                    ManagedThread managedThread = (ManagedThread)thread;
+                    managedThread.endTask();
+                }
+            }
+        }
+    }
+
+    /**
+     * Cancels the task, allowing interruption if it is running.
+     *
+     * @return the result of {@code cancel(true)}
+     */
+    public boolean cancel() {
+        return cancel(true);
+    }
+
+    public String getIdentityDescription(String locale) {
+        return ManagedTaskUtils.getTaskDescription(this.task, locale);
+    }
+
+    public String getIdentityName() {
+        return ManagedTaskUtils.getTaskName(this.task);
+    }
+    
+}

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java
------------------------------------------------------------------------------
    svn:executable = 

Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java Fri Mar  7 11:56:14 2008
@@ -0,0 +1,109 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.util.concurrent.ManagedScheduledExecutorService;
+import javax.util.concurrent.ManagedTaskListener;
+import javax.util.concurrent.Trigger;
+
+/**
+ * Facade in front of a {@link ManagedScheduledExecutorService}. Every
+ * scheduling call is forwarded unchanged to the wrapped executor. When the
+ * facade is configured as serverManaged, the lifecycle operations inherited
+ * from {@link ManagedExecutorServiceFacade} throw
+ * {@link IllegalArgumentException}.
+ */
+public class ManagedScheduledExecutorServiceFacade 
+    extends ManagedExecutorServiceFacade
+    implements ManagedScheduledExecutorService {
+    
+    /**
+     * @param executor the scheduled executor all calls are forwarded to
+     * @param serverManaged passed on to the superclass constructor
+     */
+    public ManagedScheduledExecutorServiceFacade(ManagedScheduledExecutorService executor, 
+                                                 boolean serverManaged) {
+        super(executor, serverManaged);
+    }
+
+    /** Returns the wrapped executor cast to its scheduled interface. */
+    private ManagedScheduledExecutorService scheduledDelegate() {
+        return (ManagedScheduledExecutorService)this.executor;
+    }
+    
+    // --- trigger based scheduling with listener ---
+
+    public ScheduledFuture<?> schedule(Runnable command, 
+                                       Trigger trigger, 
+                                       ManagedTaskListener listener) {
+        final ManagedScheduledExecutorService target = scheduledDelegate();
+        return target.schedule(command, trigger, listener);
+    }
+
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, 
+                                           Trigger trigger, 
+                                           ManagedTaskListener listener) {
+        final ManagedScheduledExecutorService target = scheduledDelegate();
+        return target.schedule(callable, trigger, listener);
+    }
+
+    // --- delay based scheduling with listener ---
+
+    public ScheduledFuture<?> schedule(Runnable command,
+                                       long initialDelay,
+                                       TimeUnit unit,
+                                       ManagedTaskListener listener) {
+        final ManagedScheduledExecutorService target = scheduledDelegate();
+        return target.schedule(command, initialDelay, unit, listener);
+    }
+
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+                                           long initialDelay,
+                                           TimeUnit unit,
+                                           ManagedTaskListener listener) {
+        final ManagedScheduledExecutorService target = scheduledDelegate();
+        return target.schedule(callable, initialDelay, unit, listener);
+    }
+
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay,
+                                                  long period,
+                                                  TimeUnit unit,
+                                                  ManagedTaskListener listener) {
+        final ManagedScheduledExecutorService target = scheduledDelegate();
+        return target.scheduleAtFixedRate(command, initialDelay, period, unit, listener);
+    }
+
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay,
+                                                     long delay,
+                                                     TimeUnit unit,
+                                                     ManagedTaskListener listener) {
+        final ManagedScheduledExecutorService target = scheduledDelegate();
+        return target.scheduleWithFixedDelay(command, initialDelay, delay, unit, listener);
+    }
+
+    // --- scheduling without listener ---
+
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        final ManagedScheduledExecutorService target = scheduledDelegate();
+        return target.schedule(command, delay, unit);
+    }
+
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+        final ManagedScheduledExecutorService target = scheduledDelegate();
+        return target.schedule(callable, delay, unit);
+    }
+
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay,
+                                                  long period,
+                                                  TimeUnit unit) {
+        final ManagedScheduledExecutorService target = scheduledDelegate();
+        return target.scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay,
+                                                     long delay,
+                                                     TimeUnit unit) {
+        final ManagedScheduledExecutorService target = scheduledDelegate();
+        return target.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    }
+          
+}

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java
------------------------------------------------------------------------------
    svn:executable = 

Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java Fri Mar  7 11:56:14 2008
@@ -0,0 +1,47 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedContextHandler;
+
+/**
+ * An {@link AbstractManagedExecutorService} whose {@link ManagedContext} is
+ * captured through the {@link ManagedContextHandler} given at construction.
+ */
+public class ServerManagedExecutorService 
+    extends AbstractManagedExecutorService {
+
+    /** Handler handed to {@code ManagedContext.captureContext}. */
+    protected ManagedContextHandler contextHandler;
+    
+    /**
+     * Creates the executor. All arguments except {@code contextHandler} are
+     * forwarded unchanged to the superclass constructor.
+     *
+     * @param contextHandler the handler later used to capture the managed context
+     */
+    public ServerManagedExecutorService(int corePoolSize,
+                                        int maximumPoolSize,
+                                        long keepAliveTime,
+                                        TimeUnit unit,
+                                        BlockingQueue<Runnable> workQueue,
+                                        ThreadFactory threadFactory,
+                                        ManagedContextHandler contextHandler) {
+        super(corePoolSize,
+              maximumPoolSize,
+              keepAliveTime,
+              unit,
+              workQueue,
+              threadFactory);
+        this.contextHandler = contextHandler;
+    }
+          
+    /**
+     * Captures a managed context using the configured handler.
+     *
+     * @return the result of {@code ManagedContext.captureContext}
+     */
+    protected ManagedContext getManagedContext() {
+        final ManagedContextHandler handler = this.contextHandler;
+        return ManagedContext.captureContext(handler);
+    }
+        
+}

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java
------------------------------------------------------------------------------
    svn:executable = 

Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java Fri Mar  7 11:56:14 2008
@@ -0,0 +1,40 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedContextHandler;
+
+/**
+ * An {@link AbstractManagedScheduledExecutorService} whose
+ * {@link ManagedContext} is captured through the {@link ManagedContextHandler}
+ * given at construction.
+ */
+public class ServerManagedScheduledExecutorService 
+    extends AbstractManagedScheduledExecutorService {
+
+    /** Handler handed to {@code ManagedContext.captureContext}. */
+    protected ManagedContextHandler contextHandler;
+    
+    /**
+     * Creates the executor. {@code corePoolSize} and {@code threadFactory} are
+     * forwarded unchanged to the superclass constructor.
+     *
+     * @param contextHandler the handler later used to capture the managed context
+     */
+    public ServerManagedScheduledExecutorService(int corePoolSize,
+                                                 ThreadFactory threadFactory,
+                                                 ManagedContextHandler contextHandler) {
+        super(corePoolSize,
+              threadFactory);
+        this.contextHandler = contextHandler;
+    }
+    
+    /**
+     * Captures a managed context using the configured handler.
+     *
+     * @return the result of {@code ManagedContext.captureContext}
+     */
+    protected ManagedContext getManagedContext() {
+        final ManagedContextHandler handler = this.contextHandler;
+        return ManagedContext.captureContext(handler);
+    }
+        
+}

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java
------------------------------------------------------------------------------
    svn:executable =