You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by ga...@apache.org on 2008/03/07 20:56:28 UTC
svn commit: r634792 [3/9] - in /geronimo/sandbox/concurrent: ./
concurrent-deployer/ concurrent-deployer/src/ concurrent-deployer/src/main/
concurrent-deployer/src/main/plan/ concurrent/ concurrent/src/
concurrent/src/main/ concurrent/src/main/plan/ ge...
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/AbstractManagedScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/AbstractManagedScheduledExecutorService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/AbstractManagedScheduledExecutorService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/AbstractManagedScheduledExecutorService.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,695 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class is based on and borrows code from java.util.concurrent.ScheduledThreadPoolExecutor
+ * class in Apache Harmony.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.util.concurrent.ManagedScheduledExecutorService;
+import javax.util.concurrent.ManagedTaskListener;
+import javax.util.concurrent.Trigger;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedTaskListenerSupport;
+
+public abstract class AbstractManagedScheduledExecutorService
+ extends AbstractManagedExecutorService
+ implements ManagedScheduledExecutorService {
+
+ /**
+ * False if should cancel/suppress periodic tasks on shutdown.
+ */
+ private volatile boolean continueExistingPeriodicTasksAfterShutdown;
+
+ /**
+ * False if should cancel non-periodic tasks on shutdown.
+ */
+ private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
+
+ /**
+ * Sequence number to break scheduling ties, and in turn to
+ * guarantee FIFO order among tied entries.
+ */
+ private static final AtomicLong sequencer = new AtomicLong(0);
+
+ private class ScheduledFutureTask<V>
+ extends ManagedFutureTask<V> implements ScheduledFuture<V> {
+
+ /** Sequence number to break ties FIFO */
+ private final long sequenceNumber;
+
+ private final Trigger trigger;
+
+ private Date lastActualRunTime;
+ private Date lastScheduledRunTime;
+ private Date lastCompleteTime;
+
+ ScheduledFutureTask(Runnable r,
+ V result,
+ ManagedContext managedContext,
+ ManagedTaskListenerSupport listener,
+ Date triggerTime) {
+ super(r, result, managedContext, listener);
+ this.lastScheduledRunTime = triggerTime;
+ this.trigger = null;
+ this.sequenceNumber = sequencer.getAndIncrement();
+ }
+
+ ScheduledFutureTask(Runnable r,
+ V result,
+ ManagedContext managedContext,
+ ManagedTaskListenerSupport listener,
+ Trigger trigger) {
+ super(r, result, managedContext, listener);
+ this.trigger = trigger;
+ this.lastScheduledRunTime =
+ trigger.getNextRunTime(this, new Date(), null, null, null);
+ this.sequenceNumber = sequencer.getAndIncrement();
+ }
+
+ ScheduledFutureTask(Callable<V> callable,
+ ManagedContext managedContext,
+ ManagedTaskListenerSupport listener,
+ Date triggerTime) {
+ super(callable, managedContext, listener);
+ this.lastScheduledRunTime = triggerTime;
+ this.trigger = null;
+ this.sequenceNumber = sequencer.getAndIncrement();
+ }
+
+ ScheduledFutureTask(Callable<V> callable,
+ ManagedContext managedContext,
+ ManagedTaskListenerSupport listener,
+ Trigger trigger) {
+ super(callable, managedContext, listener);
+ this.trigger = trigger;
+ this.lastScheduledRunTime =
+ trigger.getNextRunTime(this, new Date(), null, null, null);
+ this.sequenceNumber = sequencer.getAndIncrement();
+ }
+
+ public long getDelay(TimeUnit unit) {
+ long d = unit.convert(this.lastScheduledRunTime.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ return d;
+ }
+
+ public int compareTo(Delayed other) {
+ if (other == this) // compare zero ONLY if same object
+ return 0;
+ ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
+ long diff = this.lastScheduledRunTime.getTime() - x.lastScheduledRunTime.getTime();
+ if (diff < 0)
+ return -1;
+ else if (diff > 0)
+ return 1;
+ else if (sequenceNumber < x.sequenceNumber)
+ return -1;
+ else
+ return 1;
+ }
+
+ /**
+ * Returns true if this is a periodic (not a one-shot) action.
+ * @return true if periodic
+ */
+ boolean isPeriodic() {
+ return this.trigger != null;
+ }
+
+ /**
+ * Run a periodic task
+ */
+ private void runPeriodicSub() {
+ boolean ok = false;
+ if (isCancelled()) {
+ ok = false;
+ if (this.listenerSupport != null) {
+ this.listenerSupport.taskDone(this, null);
+ }
+ } else {
+ if (this.trigger.skipRun(this, this.lastScheduledRunTime)) {
+ // skip run, task is rescheduled only if not cancelled
+ ok = super.setSkipped();
+ if (this.listenerSupport != null) {
+ this.listenerSupport.taskDone(this, null);
+ }
+ } else {
+ // run now
+ this.lastActualRunTime = new Date();
+ ok = super.runAndReset();
+ this.lastCompleteTime = new Date();
+ }
+ }
+ boolean down = isShutdown();
+ // Reschedule if not cancelled and not shutdown or policy allows
+ if (ok && (!down ||
+ (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
+ !isTerminating()))) {
+ this.lastScheduledRunTime =
+ this.trigger.getNextRunTime(this,
+ new Date(),
+ this.lastActualRunTime,
+ this.lastScheduledRunTime,
+ this.lastCompleteTime);
+ if (this.lastScheduledRunTime != null) {
+ // reschedule the task
+ if (this.listenerSupport != null) {
+ this.listenerSupport.taskSubmitted(this);
+ }
+ AbstractManagedScheduledExecutorService.super.getQueue().add(this);
+ }
+ } else if (down) {
+ // This might have been the final executed delayed
+ // task. Wake up threads to check.
+ interruptIdleWorkers();
+ }
+ }
+
+ private void runPeriodic() {
+ if (this.setContextOnRun && this.managedContext != null) {
+ Map<String, Object> threadContext = null;
+ try {
+ threadContext = this.managedContext.set();
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to apply context to thread for task " + this + ": " +
+ e.getMessage() + ". Cancelling task");
+ cancelFutureTask(false);
+ return;
+ }
+ try {
+ runPeriodicSub();
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ } else {
+ runPeriodicSub();
+ }
+ }
+
+ /**
+ * Overrides FutureTask version so as to reset/requeue if periodic.
+ */
+ public void run() {
+ if (isPeriodic()) {
+ runPeriodic();
+ } else {
+ super.run();
+ }
+ }
+ }
+
+ public AbstractManagedScheduledExecutorService(int corePoolSize,
+ ThreadFactory threadFactory) {
+ super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
+ new DelayedWorkQueue(), threadFactory);
+ }
+
+ /**
+ * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
+ */
+ protected void delayedExecute(ScheduledFutureTask<?> task) {
+ preExecute(task);
+ super.getQueue().add(task);
+ }
+
+ protected void delayedExecute(ManagedFutureTask<?> task) {
+ preExecute(task);
+ // wrap ManagedFutureTask into ScheduledFutureTask
+ Date triggerTime = new Date();
+ ScheduledFutureTask<?> t =
+ new ScheduledFutureTask<Object>(task, null, null, null, triggerTime);
+ super.getQueue().add(t);
+ }
+
+ protected void preExecute(ManagedFutureTask<?> task) {
+ ManagedTaskListenerSupport listenerSupport = task.getManagedTaskListenerSupport();
+
+ if (listenerSupport != null) {
+ listenerSupport.taskSubmitted(task);
+ }
+
+ if (isShutdown()) {
+ try {
+ reject(task);
+ } catch (RejectedExecutionException exception) {
+ if (listenerSupport != null) {
+ listenerSupport.taskDone(task, exception);
+ }
+ throw exception;
+ }
+ return;
+ }
+
+ // Prestart a thread if necessary. We cannot prestart it
+ // running the task because the task (probably) shouldn't be
+ // run yet, so thread will just idle until delay elapses.
+ if (getPoolSize() < getCorePoolSize()) {
+ prestartCoreThread();
+ }
+ }
+
+ /**
+ * Cancel and clear the queue of all tasks that should not be run
+ * due to shutdown policy.
+ */
+ private void cancelUnwantedTasks() {
+ boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
+ boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
+ if (!keepDelayed && !keepPeriodic)
+ super.getQueue().clear();
+ else if (keepDelayed || keepPeriodic) {
+ Object[] entries = super.getQueue().toArray();
+ for (int i = 0; i < entries.length; ++i) {
+ Object e = entries[i];
+ if (e instanceof ScheduledFutureTask) {
+ ScheduledFutureTask<?> t = (ScheduledFutureTask<?>)e;
+ if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
+ t.cancel(false);
+ }
+ }
+ entries = null;
+ purge();
+ }
+ }
+
+ public boolean remove(Runnable task) {
+ if (!(task instanceof ScheduledFutureTask))
+ return false;
+ return getQueue().remove(task);
+ }
+
+ // *** schedule commands ***
+
+ public ScheduledFuture<?> schedule(Runnable command,
+ long delay,
+ TimeUnit unit) {
+ return schedule(command, null, delay, unit, null);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command,
+ long delay,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ return schedule(command, null, delay, unit, listener);
+ }
+
+ protected <T> ScheduledFuture<T> schedule(Runnable command,
+ T result,
+ long delay,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ if (command == null || unit == null) {
+ throw new NullPointerException();
+ }
+ Date triggerTime = new Date(System.currentTimeMillis() + unit.toMillis(delay));
+ ManagedTaskListenerSupport listenerSupport = getManagedTaskListenerSupport(listener);
+ ManagedContext managedContext = getManagedContext();
+ ScheduledFutureTask<T> t = new ScheduledFutureTask<T>(command,
+ result,
+ managedContext,
+ listenerSupport,
+ triggerTime);
+ delayedExecute(t);
+ return t;
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command,
+ Trigger trigger,
+ ManagedTaskListener listener) {
+ if (command == null || trigger == null) {
+ throw new NullPointerException();
+ }
+ ManagedTaskListenerSupport listenerSupport = getManagedTaskListenerSupport(listener);
+ ManagedContext managedContext = getManagedContext();
+ ScheduledFutureTask<?> t = new ScheduledFutureTask<Boolean>(command,
+ null,
+ managedContext,
+ listenerSupport,
+ trigger);
+ delayedExecute(t);
+ return t;
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+ long delay,
+ TimeUnit unit) {
+ return schedule(callable, delay, unit, null);
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+ long delay,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ if (callable == null || unit == null) {
+ throw new NullPointerException();
+ }
+ if (delay < 0) delay = 0;
+ Date triggerTime = new Date(System.currentTimeMillis() + unit.toMillis(delay));
+ ManagedTaskListenerSupport listenerSupport = getManagedTaskListenerSupport(listener);
+ ManagedContext managedContext = getManagedContext();
+ ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable,
+ managedContext,
+ listenerSupport,
+ triggerTime);
+ delayedExecute(t);
+ return t;
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+ Trigger trigger,
+ ManagedTaskListener listener) {
+ if (callable == null || trigger == null) {
+ throw new NullPointerException();
+ }
+ ManagedTaskListenerSupport listenerSupport = getManagedTaskListenerSupport(listener);
+ ManagedContext managedContext = getManagedContext();
+ ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable,
+ managedContext,
+ listenerSupport,
+ trigger);
+ delayedExecute(t);
+ return t;
+ }
+
+ // *** scheduleAt* commands *****
+
+ private static class PeriodicTrigger implements Trigger {
+
+ private long period;
+ private long initialDelay;
+ private boolean rate;
+
+ public PeriodicTrigger(long initialDelay, long period, boolean rate) {
+ this.initialDelay = initialDelay;
+ this.period = period;
+ this.rate = rate;
+ }
+
+ public Date getNextRunTime(Future<?> future,
+ Date baseTime,
+ Date lastActualRunTime,
+ Date lastScheduledRunTime,
+ Date lastCompleteTime) {
+ long nextRunTime;
+ if (lastScheduledRunTime == null) {
+ nextRunTime = baseTime.getTime() + this.initialDelay;
+ } else {
+ if (this.rate) {
+ nextRunTime = lastScheduledRunTime.getTime() + this.period;
+ } else {
+ nextRunTime = baseTime.getTime() + this.period;
+ }
+ }
+ return new Date(nextRunTime);
+ }
+
+ public boolean skipRun(Future<?> arg0, Date arg1) {
+ return false;
+ }
+
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay,
+ long period,
+ TimeUnit unit) {
+ return scheduleAtFixedRate(command, initialDelay, period, unit, null);
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay,
+ long period,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ if (command == null || unit == null) {
+ throw new NullPointerException();
+ }
+ if (period <= 0) {
+ throw new IllegalArgumentException();
+ }
+ if (initialDelay < 0) initialDelay = 0;
+ PeriodicTrigger trigger =
+ new PeriodicTrigger(unit.toMillis(initialDelay), unit.toMillis(period), true);
+ return schedule(command, trigger, listener);
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+ long initialDelay,
+ long delay,
+ TimeUnit unit) {
+ return scheduleWithFixedDelay(command, initialDelay, delay, unit, null);
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+ long initialDelay,
+ long delay,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ if (command == null || unit == null) {
+ throw new NullPointerException();
+ }
+ if (delay <= 0) {
+ throw new IllegalArgumentException();
+ }
+ if (initialDelay < 0) initialDelay = 0;
+ PeriodicTrigger trigger =
+ new PeriodicTrigger(unit.toMillis(initialDelay), unit.toMillis(delay), false);
+ return schedule(command, trigger, listener);
+ }
+
+
+ // Override ExecutorService methods
+
+ public void execute(Runnable command) {
+ schedule(command, null, 0, TimeUnit.NANOSECONDS, null);
+ }
+
+ public Future<?> submit(Runnable task) {
+ return schedule(task, null, 0, TimeUnit.NANOSECONDS, null);
+ }
+
+ public <T> Future<T> submit(Runnable task, T result) {
+ return schedule(task, result, 0, TimeUnit.NANOSECONDS, null);
+ }
+
+ public <T> Future<T> submit(Callable<T> task) {
+ return schedule(task, 0, TimeUnit.NANOSECONDS, null);
+ }
+
+ public Future<?> submit(Runnable task, ManagedTaskListener listener) {
+ return schedule(task, null, 0, TimeUnit.NANOSECONDS, listener);
+ }
+
+ public <T> Future<T> submit(Runnable task, T result, ManagedTaskListener listener) {
+ return schedule(task, result, 0, TimeUnit.NANOSECONDS, listener);
+ }
+
+ public <T> Future<T> submit(Callable<T> task, ManagedTaskListener listener) {
+ return schedule(task, 0, TimeUnit.NANOSECONDS, listener);
+ }
+
+ /*
+ * This is called by invokeAll/invokeAny functions.
+ */
+ protected void executeTask(ManagedFutureTask<?> task) {
+ // ManagedFutureTask will get wrapped in ScheduledFutureTask
+ delayedExecute(task);
+ }
+
+ // Policy methods
+
+ /**
+ * Set policy on whether to continue executing existing periodic
+ * tasks even when this executor has been <tt>shutdown</tt>. In
+ * this case, these tasks will only terminate upon
+ * <tt>shutdownNow</tt>, or after setting the policy to
+ * <tt>false</tt> when already shutdown. This value is by default
+ * false.
+ * @param value if true, continue after shutdown, else don't.
+ * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
+ */
+ public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
+ continueExistingPeriodicTasksAfterShutdown = value;
+ if (!value && isShutdown())
+ cancelUnwantedTasks();
+ }
+
+ /**
+ * Get the policy on whether to continue executing existing
+ * periodic tasks even when this executor has been
+ * <tt>shutdown</tt>. In this case, these tasks will only
+ * terminate upon <tt>shutdownNow</tt> or after setting the policy
+ * to <tt>false</tt> when already shutdown. This value is by
+ * default false.
+ * @return true if will continue after shutdown.
+ * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
+ */
+ public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
+ return continueExistingPeriodicTasksAfterShutdown;
+ }
+
+ /**
+ * Set policy on whether to execute existing delayed
+ * tasks even when this executor has been <tt>shutdown</tt>. In
+ * this case, these tasks will only terminate upon
+ * <tt>shutdownNow</tt>, or after setting the policy to
+ * <tt>false</tt> when already shutdown. This value is by default
+ * true.
+ * @param value if true, execute after shutdown, else don't.
+ * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
+ */
+ public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
+ executeExistingDelayedTasksAfterShutdown = value;
+ if (!value && isShutdown())
+ cancelUnwantedTasks();
+ }
+
+ /**
+ * Get policy on whether to execute existing delayed
+ * tasks even when this executor has been <tt>shutdown</tt>. In
+ * this case, these tasks will only terminate upon
+ * <tt>shutdownNow</tt>, or after setting the policy to
+ * <tt>false</tt> when already shutdown. This value is by default
+ * true.
+ * @return true if will execute after shutdown.
+ * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
+ */
+ public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
+ return executeExistingDelayedTasksAfterShutdown;
+ }
+
+
+ /**
+ * Initiates an orderly shutdown in which previously submitted
+ * tasks are executed, but no new tasks will be accepted. If the
+ * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
+ * been set <tt>false</tt>, existing delayed tasks whose delays
+ * have not yet elapsed are cancelled. And unless the
+ * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
+ * been set <tt>true</tt>, future executions of existing periodic
+ * tasks will be cancelled.
+ */
+ public void shutdown() {
+ cancelUnwantedTasks();
+ super.shutdown();
+ }
+
+ /**
+ * Attempts to stop all actively executing tasks, halts the
+ * processing of waiting tasks, and returns a list of the tasks that were
+ * awaiting execution.
+ *
+ * <p>There are no guarantees beyond best-effort attempts to stop
+ * processing actively executing tasks. This implementation
+ * cancels tasks via {@link Thread#interrupt}, so if any tasks mask or
+ * fail to respond to interrupts, they may never terminate.
+ *
+ * @return list of tasks that never commenced execution. Each
+ * element of this list is a {@link ScheduledFuture},
+ * including those tasks submitted using <tt>execute</tt>, which
+ * are for scheduling purposes used as the basis of a zero-delay
+ * <tt>ScheduledFuture</tt>.
+ */
+ public List<Runnable> shutdownNow() {
+ return super.shutdownNow();
+ }
+
+ /**
+ * Returns the task queue used by this executor. Each element of
+ * this queue is a {@link ScheduledFuture}, including those
+ * tasks submitted using <tt>execute</tt> which are for scheduling
+ * purposes used as the basis of a zero-delay
+ * <tt>ScheduledFuture</tt>. Iteration over this queue is
+ * <em>not</em> guaranteed to traverse tasks in the order in
+ * which they will execute.
+ *
+ * @return the task queue
+ */
+ public BlockingQueue<Runnable> getQueue() {
+ return super.getQueue();
+ }
+
+ /**
+ * An annoying wrapper class to convince generics compiler to
+ * use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable>
+ */
+ private static class DelayedWorkQueue
+ extends AbstractCollection<Runnable>
+ implements BlockingQueue<Runnable> {
+
+ private final DelayQueue<ScheduledFutureTask> dq = new DelayQueue<ScheduledFutureTask>();
+ public Runnable poll() { return dq.poll(); }
+ public Runnable peek() { return dq.peek(); }
+ public Runnable take() throws InterruptedException { return dq.take(); }
+ public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return dq.poll(timeout, unit);
+ }
+
+ public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); }
+ public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); }
+ public void put(Runnable x) {
+ dq.put((ScheduledFutureTask)x);
+ }
+ public boolean offer(Runnable x, long timeout, TimeUnit unit) {
+ return dq.offer((ScheduledFutureTask)x, timeout, unit);
+ }
+
+ public Runnable remove() { return dq.remove(); }
+ public Runnable element() { return dq.element(); }
+ public void clear() { dq.clear(); }
+ public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
+ public int drainTo(Collection<? super Runnable> c, int maxElements) {
+ return dq.drainTo(c, maxElements);
+ }
+
+ public int remainingCapacity() { return dq.remainingCapacity(); }
+ public boolean remove(Object x) { return dq.remove(x); }
+ public boolean contains(Object x) { return dq.contains(x); }
+ public int size() { return dq.size(); }
+ public boolean isEmpty() { return dq.isEmpty(); }
+ public Object[] toArray() { return dq.toArray(); }
+ public <T> T[] toArray(T[] array) { return dq.toArray(array); }
+ public Iterator<Runnable> iterator() {
+ return new Iterator<Runnable>() {
+ private Iterator<ScheduledFutureTask> it = dq.iterator();
+ public boolean hasNext() { return it.hasNext(); }
+ public Runnable next() { return it.next(); }
+ public void remove() { it.remove(); }
+ };
+ }
+ }
+
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/AbstractManagedScheduledExecutorService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.util.concurrent.ManagedTaskListener;
+import javax.util.concurrent.ManagedThreadFactory;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedContextHandler;
+import org.apache.geronimo.concurrent.thread.ManagedThreadFactoryUtils;
+
+/**
+ * Component-managed implementation of {@link ManagedExecutorService}.
+ * In component-managed implementation {@link ManagedTaskListener} callbacks
+ * must execute within the context of the thread that created the executor.
+ * <BR>
+ * Some key methods are overridden to set the right context on the thread submitting
+ * the tasks.
+ */
+public class ComponentManagedExecutorService
+ extends AbstractManagedExecutorService {
+
+ private ManagedContext managedContext;
+
+ public ComponentManagedExecutorService(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ManagedThreadFactory threadFactory,
+ ManagedContextHandler contextHandler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
+ workQueue, threadFactory);
+
+ // save context now
+ this.managedContext = ManagedContext.captureContext(contextHandler);
+
+ setThreadFactory(ManagedThreadFactoryUtils.createEmbeddedThreadFactory(threadFactory,
+ this.managedContext));
+ }
+
+ protected ManagedContext getManagedContext() {
+ return this.managedContext;
+ }
+
+ protected void executeTask(ManagedFutureTask<?> task) {
+ task.setSetContextOnRun(false);
+ super.executeTask(task);
+ }
+
+ /*
+ * TODO: some of these functions could be optimized a bit. When listener == null
+ * the context does not have to be set on the current thread since
+ * there is no listener methods to call.
+ */
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks,
+ ManagedTaskListener listener)
+ throws InterruptedException,
+ ExecutionException {
+ if (tasks == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.invokeAny(tasks, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit,
+ ManagedTaskListener listener)
+ throws InterruptedException,
+ ExecutionException,
+ TimeoutException {
+ if (tasks == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.invokeAny(tasks, timeout, unit, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+ public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+ ManagedTaskListener listener)
+ throws InterruptedException {
+ if (tasks == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.invokeAll(tasks, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit,
+ ManagedTaskListener listener) throws InterruptedException {
+ if (tasks == null || unit == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.invokeAll(tasks, timeout, unit, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public <T> Future<T> submit(Runnable task, T result, ManagedTaskListener listener) {
+ if (task == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.submit(task, result, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public <T> Future<T> submit(Callable<T> task, ManagedTaskListener listener) {
+ if (task == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.submit(task, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedExecutorService.java
------------------------------------------------------------------------------
svn:executable =
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.util.concurrent.ManagedScheduledExecutorService;
+import javax.util.concurrent.ManagedTaskListener;
+import javax.util.concurrent.ManagedThreadFactory;
+import javax.util.concurrent.Trigger;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedContextHandler;
+import org.apache.geronimo.concurrent.thread.ManagedThreadFactoryUtils;
+
+/**
+ * Component-managed implementation of {@link ManagedScheduledExecutorService}.
+ * In component-managed implementation {@link ManagedTaskListener} callbacks
+ * must execute within the context of the thread that created the executor. <BR>
+ * Some key methods are overridden to set the right context on the thread
+ * submitting the tasks.
+ */
+public class ComponentManagedScheduledExecutorService
+ extends AbstractManagedScheduledExecutorService {
+
+ private ManagedContext managedContext;
+
+ public ComponentManagedScheduledExecutorService(int corePoolSize,
+ ManagedThreadFactory threadFactory,
+ ManagedContextHandler contextHandler) {
+ super(corePoolSize, threadFactory);
+
+ // save context now
+ this.managedContext = ManagedContext.captureContext(contextHandler);
+
+ setThreadFactory(ManagedThreadFactoryUtils.createEmbeddedThreadFactory(threadFactory,
+ this.managedContext));
+ }
+
+ protected ManagedContext getManagedContext() {
+ return this.managedContext;
+ }
+
+ protected void preExecute(ManagedFutureTask<?> task) {
+ task.setSetContextOnRun(false);
+ super.preExecute(task);
+ }
+
+ /*
+ * TODO: some of these functions could be optimized a bit. When listener == null
+ * the context does not have to be set on the current thread since
+ * there is no listener methods to call.
+ */
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks,
+ ManagedTaskListener listener)
+ throws InterruptedException,
+ ExecutionException {
+ if (tasks == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.invokeAny(tasks, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit,
+ ManagedTaskListener listener)
+ throws InterruptedException,
+ ExecutionException,
+ TimeoutException {
+ if (tasks == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.invokeAny(tasks, timeout, unit, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+ public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+ ManagedTaskListener listener)
+ throws InterruptedException {
+ if (tasks == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.invokeAll(tasks, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit,
+ ManagedTaskListener listener) throws InterruptedException {
+ if (tasks == null || unit == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.invokeAll(tasks, timeout, unit, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+ long delay,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ if (callable == null || unit == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.schedule(callable, delay, unit, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+ Trigger trigger,
+ ManagedTaskListener listener) {
+ if (callable == null || trigger == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.schedule(callable, trigger, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ protected <T> ScheduledFuture<T> schedule(Runnable command,
+ T result,
+ long delay,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ if (command == null || unit == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.schedule(command, result, delay, unit, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command,
+ Trigger trigger,
+ ManagedTaskListener listener) {
+ if (command == null || trigger == null) {
+ throw new NullPointerException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.schedule(command, trigger, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay,
+ long period,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ if (command == null || unit == null) {
+ throw new NullPointerException();
+ }
+ if (period <= 0) {
+ throw new IllegalArgumentException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.scheduleAtFixedRate(command, initialDelay, period, unit, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+ long initialDelay,
+ long delay,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ if (command == null || unit == null) {
+ throw new NullPointerException();
+ }
+ if (delay <= 0) {
+ throw new IllegalArgumentException();
+ }
+
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ return super.scheduleAtFixedRate(command, initialDelay, delay, unit, listener);
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ComponentManagedScheduledExecutorService.java
------------------------------------------------------------------------------
svn:executable =
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class is based on and borrows code from java.util.concurrent.ExecutorCompletionService
+ * class in Apache Harmony.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedTaskListenerSupport;
+
+public class ManagedExecutorCompletionService<V> {
+ private final AbstractManagedExecutorService executor;
+ private final BlockingQueue<Future<V>> completionQueue;
+
+ private ManagedContext managedContext;
+ private ManagedTaskListenerSupport listenerSupport;
+
+ /**
+ * FutureTask extension to enqueue upon completion
+ */
+ private class QueueingManagedFuture extends ManagedFutureTask<V> {
+ QueueingManagedFuture(Callable<V> callable,
+ ManagedContext managedContext,
+ ManagedTaskListenerSupport listenerSupport) {
+ super(callable, managedContext, listenerSupport);
+ }
+ protected void done() { completionQueue.add(this); }
+ }
+
+ public ManagedExecutorCompletionService(AbstractManagedExecutorService executor,
+ ManagedContext managedContext,
+ ManagedTaskListenerSupport listenerSupport) {
+ this(executor, new LinkedBlockingQueue<Future<V>>(), managedContext, listenerSupport);
+ }
+
+ public ManagedExecutorCompletionService(AbstractManagedExecutorService executor,
+ BlockingQueue<Future<V>> completionQueue,
+ ManagedContext managedContext,
+ ManagedTaskListenerSupport listenerSupport) {
+ if (executor == null || completionQueue == null) {
+ throw new NullPointerException();
+ }
+ this.executor = executor;
+ this.completionQueue = completionQueue;
+ this.managedContext = managedContext;
+ this.listenerSupport = listenerSupport;
+ }
+
+ public Future<V> submit(Callable<V> task) {
+ if (task == null) {
+ throw new NullPointerException();
+ }
+
+ QueueingManagedFuture managedFuture =
+ new QueueingManagedFuture(task, this.managedContext, this.listenerSupport);
+ this.executor.executeTask(managedFuture);
+
+ return managedFuture;
+ }
+
+ public Future<V> take() throws InterruptedException {
+ return completionQueue.take();
+ }
+
+ public Future<V> poll() {
+ return completionQueue.poll();
+ }
+
+ public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return completionQueue.poll(timeout, unit);
+ }
+
+}
+
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorCompletionService.java
------------------------------------------------------------------------------
svn:executable =
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.util.concurrent.ManagedExecutorService;
+import javax.util.concurrent.ManagedTaskListener;
+
+/**
+ * A facade class for ManagedExecutorService. If facade is configured as serverManaged
+ * all lifecycle operations of this class will throw {@link #IllegalArgumentException}.
+ */
+public class ManagedExecutorServiceFacade
+ implements ManagedExecutorService {
+
+ protected ManagedExecutorService executor;
+ protected boolean serverManaged;
+
+ public ManagedExecutorServiceFacade(ManagedExecutorService executor,
+ boolean serverManaged) {
+ if (executor == null) {
+ throw new NullPointerException();
+ }
+ this.executor = executor;
+ this.serverManaged = serverManaged;
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+ ManagedTaskListener listener)
+ throws InterruptedException {
+ return this.executor.invokeAll(tasks, listener);
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit,
+ ManagedTaskListener listener)
+ throws InterruptedException {
+ return this.executor.invokeAll(tasks, timeout, unit, listener);
+ }
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks,
+ ManagedTaskListener listener)
+ throws InterruptedException,
+ ExecutionException {
+ return this.executor.invokeAny(tasks, listener);
+ }
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit,
+ ManagedTaskListener listener)
+ throws InterruptedException,
+ ExecutionException,
+ TimeoutException {
+ return this.executor.invokeAny(tasks, timeout, unit, listener);
+ }
+
+ public Future<?> submit(Runnable command, ManagedTaskListener listener) {
+ return this.executor.submit(command, listener);
+ }
+
+ public <T> Future<T> submit(Callable<T> callable, ManagedTaskListener listener) {
+ return this.executor.submit(callable, listener);
+ }
+
+ public <T> Future<T> submit(Runnable command, T result, ManagedTaskListener listener) {
+ return this.executor.submit(command, result, listener);
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
+ throws InterruptedException {
+ return this.executor.invokeAll(tasks);
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit)
+ throws InterruptedException {
+ return this.executor.invokeAll(tasks, timeout, unit);
+ }
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks)
+ throws InterruptedException,
+ ExecutionException {
+ return this.executor.invokeAny(tasks);
+ }
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit)
+ throws InterruptedException,
+ ExecutionException,
+ TimeoutException {
+ return this.executor.invokeAny(tasks, timeout, unit);
+ }
+
+ public <T> Future<T> submit(Callable<T> task) {
+ return this.executor.submit(task);
+ }
+
+ public Future<?> submit(Runnable task) {
+ return this.executor.submit(task);
+ }
+
+ public <T> Future<T> submit(Runnable task, T result) {
+ return this.executor.submit(task, result);
+ }
+
+ public void execute(Runnable command) {
+ this.executor.execute(command);
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ if (this.serverManaged) {
+ throw new IllegalArgumentException();
+ } else {
+ return this.executor.awaitTermination(timeout, unit);
+ }
+ }
+
+ public boolean isShutdown() {
+ if (this.serverManaged) {
+ throw new IllegalArgumentException();
+ } else {
+ return this.executor.isShutdown();
+ }
+ }
+
+ public boolean isTerminated() {
+ if (this.serverManaged) {
+ throw new IllegalArgumentException();
+ } else {
+ return this.executor.isTerminated();
+ }
+ }
+
+ public void shutdown() {
+ if (this.serverManaged) {
+ throw new IllegalArgumentException();
+ } else {
+ this.executor.shutdown();
+ }
+ }
+
+ public List<Runnable> shutdownNow() {
+ if (this.serverManaged) {
+ throw new IllegalArgumentException();
+ } else {
+ return this.executor.shutdownNow();
+ }
+ }
+
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedExecutorServiceFacade.java
------------------------------------------------------------------------------
svn:executable =
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedTask;
+import org.apache.geronimo.concurrent.ManagedTaskListenerSupport;
+import org.apache.geronimo.concurrent.ManagedTaskUtils;
+import org.apache.geronimo.concurrent.harmony.FutureTask;
+import org.apache.geronimo.concurrent.thread.ManagedThread;
+
+public class ManagedFutureTask<V> extends FutureTask<V> implements ManagedTask {
+
+ protected final static Log LOG = LogFactory.getLog(ManagedFutureTask.class);
+
+ protected ManagedContext managedContext;
+ protected ManagedTaskListenerSupport listenerSupport;
+
+ private Object task;
+ private boolean associateTaskWithThread;
+ protected boolean setContextOnRun = true;
+
+ public ManagedFutureTask(Callable<V> callable,
+ ManagedContext managedContext,
+ ManagedTaskListenerSupport listener) {
+ super(callable);
+ this.task = callable;
+ this.managedContext = managedContext;
+ this.listenerSupport = listener;
+ // always associate the task with the thread
+ this.associateTaskWithThread = true;
+ init();
+ }
+
+ public ManagedFutureTask(Runnable runnable, V result,
+ ManagedContext managedContext,
+ ManagedTaskListenerSupport listener) {
+ super(runnable, result);
+ this.task = runnable;
+ this.managedContext = managedContext;
+ this.listenerSupport = listener;
+ // if the runnable is ManagedFutureTask, do not associate it with the thread
+ // in this instance since the runnable will already do that
+ this.associateTaskWithThread = !(runnable instanceof ManagedFutureTask);
+ init();
+ }
+
+ private void init() {
+ if (this.listenerSupport != null && this.managedContext == null) {
+ throw new IllegalArgumentException("ManagedContext must be non-null if listener is specifed");
+ }
+ }
+
+ boolean isSetContextOnRun() {
+ return this.setContextOnRun;
+ }
+
+ /**
+ * Sets if the context should be applied when the {@link #run() method is executed.
+ * By default and in the server-managed executors the context is applied.
+ * In component-managed executors the context is not applied as the right context is
+ * already set by the thread.
+ */
+ void setSetContextOnRun(boolean setContextOnRun) {
+ this.setContextOnRun = setContextOnRun;
+ }
+
+ public ManagedTaskListenerSupport getManagedTaskListenerSupport() {
+ return this.listenerSupport;
+ }
+
+ public ManagedContext getManagedContext() {
+ return this.managedContext;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean result = super.cancel(mayInterruptIfRunning);
+ if (result && this.listenerSupport != null) {
+ // always set the context
+ Map<String, Object> threadContext = this.managedContext.set();
+ try {
+ this.listenerSupport.taskAborted(this, new CancellationException());
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void run() {
+ if (this.setContextOnRun && this.managedContext != null) {
+ Map<String, Object> threadContext = null;
+ try {
+ threadContext = this.managedContext.set();
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to apply context to thread for task " + this + ": " +
+ e.getMessage() + ". Cancelling task");
+ cancelFutureTask(false);
+ return;
+ }
+ try {
+ super.run();
+ } finally {
+ this.managedContext.unset(threadContext);
+ }
+ } else {
+ super.run();
+ }
+ }
+
+ protected boolean cancelFutureTask(boolean mayInterruptIfRunning) {
+ return super.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ protected void taskStart() {
+ if (this.listenerSupport != null) {
+ this.listenerSupport.taskStarting(this);
+ }
+
+ if (this.associateTaskWithThread) {
+ // associate task with the thread
+ Thread thread = Thread.currentThread();
+ if (thread instanceof ManagedThread) {
+ ManagedThread managedThread = (ManagedThread)thread;
+ managedThread.startTask(this);
+ } else {
+ LOG.warn("taskStart was not called on ManagedThread: " + thread);
+ }
+ }
+ }
+
+ @Override
+ protected void taskDone(Throwable exception) {
+ if (this.listenerSupport != null) {
+ this.listenerSupport.taskDone(this, exception);
+ }
+
+ if (this.associateTaskWithThread) {
+ // de-associate task with the thread
+ Thread thread = Thread.currentThread();
+ if (thread instanceof ManagedThread) {
+ ManagedThread managedThread = (ManagedThread)thread;
+ managedThread.endTask();
+ }
+ }
+ }
+
+ public boolean cancel() {
+ return cancel(true);
+ }
+
+ public String getIdentityDescription(String locale) {
+ return ManagedTaskUtils.getTaskDescription(this.task, locale);
+ }
+
+ public String getIdentityName() {
+ return ManagedTaskUtils.getTaskName(this.task);
+ }
+
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedFutureTask.java
------------------------------------------------------------------------------
svn:executable =
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.util.concurrent.ManagedScheduledExecutorService;
+import javax.util.concurrent.ManagedTaskListener;
+import javax.util.concurrent.Trigger;
+
+/**
+ * A facade class for ManagedScheduledExecutorService. If facade is configured as
+ * serverManaged all lifecycle operations of this class will throw
+ * {@link #IllegalArgumentException}.
+ */
+public class ManagedScheduledExecutorServiceFacade
+ extends ManagedExecutorServiceFacade
+ implements ManagedScheduledExecutorService {
+
+ public ManagedScheduledExecutorServiceFacade(ManagedScheduledExecutorService executor,
+ boolean serverManaged) {
+ super(executor, serverManaged);
+ }
+
+ private ManagedScheduledExecutorService getExecutor() {
+ return (ManagedScheduledExecutorService)this.executor;
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command,
+ Trigger trigger,
+ ManagedTaskListener listener) {
+ return getExecutor().schedule(command, trigger, listener);
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+ Trigger trigger,
+ ManagedTaskListener listener) {
+ return getExecutor().schedule(callable, trigger, listener);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command,
+ long initialDelay,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ return getExecutor().schedule(command, initialDelay, unit, listener);
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+ long initialDelay,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ return getExecutor().schedule(callable, initialDelay, unit, listener);
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay,
+ long period,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ return getExecutor().scheduleAtFixedRate(command, initialDelay, period, unit, listener);
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+ long initialDelay,
+ long delay,
+ TimeUnit unit,
+ ManagedTaskListener listener) {
+ return getExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit, listener);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return getExecutor().schedule(command, delay, unit);
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return getExecutor().schedule(callable, delay, unit);
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay,
+ long period,
+ TimeUnit unit) {
+ return getExecutor().scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+ long initialDelay,
+ long delay,
+ TimeUnit unit) {
+ return getExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ManagedScheduledExecutorServiceFacade.java
------------------------------------------------------------------------------
svn:executable =
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedContextHandler;
+
+public class ServerManagedExecutorService
+ extends AbstractManagedExecutorService {
+
+ protected ManagedContextHandler contextHandler;
+
+ public ServerManagedExecutorService(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ ManagedContextHandler contextHandler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
+ workQueue, threadFactory);
+ this.contextHandler = contextHandler;
+ }
+
+ protected ManagedContext getManagedContext() {
+ return ManagedContext.captureContext(this.contextHandler);
+ }
+
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedExecutorService.java
------------------------------------------------------------------------------
svn:executable =
Added: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java?rev=634792&view=auto
==============================================================================
--- geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java (added)
+++ geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java Fri Mar 7 11:56:14 2008
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.concurrent.executor;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.geronimo.concurrent.ManagedContext;
+import org.apache.geronimo.concurrent.ManagedContextHandler;
+
+public class ServerManagedScheduledExecutorService
+ extends AbstractManagedScheduledExecutorService {
+
+ protected ManagedContextHandler contextHandler;
+
+ public ServerManagedScheduledExecutorService(int corePoolSize,
+ ThreadFactory threadFactory,
+ ManagedContextHandler contextHandler) {
+ super(corePoolSize, threadFactory);
+ this.contextHandler = contextHandler;
+ }
+
+ protected ManagedContext getManagedContext() {
+ return ManagedContext.captureContext(this.contextHandler);
+ }
+
+}
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/concurrent/geronimo-concurrent-core/src/main/java/org/apache/geronimo/concurrent/executor/ServerManagedScheduledExecutorService.java
------------------------------------------------------------------------------
svn:executable =