You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/20 15:23:26 UTC

svn commit: r746251 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/dispatch/ test/java/org/apache/activemq/flow/

Author: chirino
Date: Fri Feb 20 14:23:26 2009
New Revision: 746251

URL: http://svn.apache.org/viewvc?rev=746251&view=rev
Log:
Applying Colin's patch https://issues.apache.org/activemq/browse/AMQ-2129  Thanks!


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java?rev=746251&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java Fri Feb 20 14:23:26 2009
@@ -0,0 +1,158 @@
+package org.apache.activemq.dispatch;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public abstract class AbstractPooledDispatcher<D extends IDispatcher> implements IDispatcher, PooledDispatcher<D> {
+    
+    private final String name;
+
+    private final ThreadLocal<D> dispatcher = new ThreadLocal<D>();
+    private final ThreadLocal<PooledDispatchContext<D>> dispatcherContext = new ThreadLocal<PooledDispatchContext<D>>();
+    private final ArrayList<D> dispatchers = new ArrayList<D>();
+
+    final AtomicBoolean started = new AtomicBoolean();
+    final AtomicBoolean shutdown = new AtomicBoolean();
+
+    private int roundRobinCounter = 0;
+    private final int size;
+
+    protected ExecutionLoadBalancer<D> loadBalancer;
+
+    protected AbstractPooledDispatcher(String name, int size) {
+        this.name = name;
+        this.size = size;
+        loadBalancer = new SimpleLoadBalancer<D>();
+    }
+
+    /**
+     * Subclasses should implement this to return a new dispatcher.
+     * 
+     * @param name
+     *            The name to assign the dispatcher.
+     * @param pool
+     *            The pool.
+     * @return The new dispatcher.
+     */
+    protected abstract D createDispatcher(String name, AbstractPooledDispatcher<D> pool) throws Exception;
+
+    /**
+     * @see org.apache.activemq.dispatch.IDispatcher#start()
+     */
+    public synchronized final void start() throws Exception {
+        loadBalancer.start();
+        if (started.compareAndSet(false, true)) {
+            // Create all the workers.
+            try {
+                for (int i = 0; i < size; i++) {
+                    D dispatacher = createDispatcher(name + "-" + (i + 1), this);
+
+                    dispatchers.add(dispatacher);
+                    dispatacher.start();
+                }
+            } catch (Exception e) {
+                shutdown();
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+     */
+    public synchronized final void shutdown() throws InterruptedException {
+        shutdown.set(true);
+        boolean interrupted = false;
+        while (!dispatchers.isEmpty()) {
+            try {
+                dispatchers.get(dispatchers.size() - 1).shutdown();
+            } catch (InterruptedException ie) {
+                interrupted = true;
+                continue;
+            }
+            dispatchers.remove(dispatchers.size() - 1);
+
+        }
+        // Re-interrupt:
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+
+        loadBalancer.stop();
+    }
+
+    public void setCurrentDispatchContext(PooledDispatchContext<D> context) {
+        dispatcherContext.set(context);
+    }
+
+    public PooledDispatchContext<D> getCurrentDispatchContext() {
+        return dispatcherContext.get();
+    }
+
+    /**
+     * Returns the currently executing dispatcher, or null if the current thread
+     * is not a dispatcher:
+     * 
+     * @return The currently executing dispatcher
+     */
+    public D getCurrentDispatcher() {
+        return dispatcher.get();
+    }
+
+    /**
+     * A Dispatcher must call this to indicate that it has started its dispatch
+     * loop.
+     */
+    public void onDispatcherStarted(D d) {
+        dispatcher.set(d);
+        loadBalancer.addDispatcher(d);
+    }
+
+    public ExecutionLoadBalancer<D> getLoadBalancer() {
+        return loadBalancer;
+    }
+
+    /**
+     * A Dispatcher must call this when exiting its dispatch loop
+     */
+    public void onDispatcherStopped(D d) {
+        loadBalancer.removeDispatcher(d);
+    }
+
+    protected D chooseDispatcher() {
+        D d = dispatcher.get();
+        if (d == null) {
+            synchronized (dispatchers) {
+                if (++roundRobinCounter >= size) {
+                    roundRobinCounter = 0;
+                }
+                return dispatchers.get(roundRobinCounter);
+            }
+        } else {
+            return d;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+     * long, java.util.concurrent.TimeUnit)
+     */
+    public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
+        chooseDispatcher().schedule(runnable, delay, timeUnit);
+    }
+
+    public DispatchContext register(Dispatchable dispatchable, String name) {
+        return chooseDispatcher().register(dispatchable, name);
+    }
+
+    public String toString() {
+        return name;
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java Fri Feb 20 14:23:26 2009
@@ -16,24 +16,23 @@
  */
 package org.apache.activemq.dispatch;
 
-import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
 import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
 
-public interface ExecutionLoadBalancer {
+public interface ExecutionLoadBalancer<D extends IDispatcher> {
 
-	public interface ExecutionTracker {
-		public void onDispatchRequest(PoolableDispatcher caller, PooledDispatchContext context);
+    public interface ExecutionTracker<D extends IDispatcher> {
+        public void onDispatchRequest(D caller, PooledDispatchContext<D> context);
 
-		public void close();
-	}
+        public void close();
+    }
 
-	public void addDispatcher(PoolableDispatcher dispatcher);
+    public void addDispatcher(D dispatcher);
 
-	public void removeDispatcher(PoolableDispatcher dispatcher);
+    public void removeDispatcher(D dispatcher);
 
-	public ExecutionTracker createExecutionTracker(PooledDispatchContext context);
+    public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context);
 
-	public void start();
+    public void start();
 
-	public void stop();
+    public void stop();
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Fri Feb 20 14:23:26 2009
@@ -79,10 +79,10 @@
         public void close();
     }
 
-    class RunnableAdapter implements Dispatchable {
+    public class RunnableAdapter implements Dispatchable {
         final Runnable runnable;
 
-        RunnableAdapter(Runnable runnable) {
+        public RunnableAdapter(Runnable runnable) {
             this.runnable = runnable;
         }
 
@@ -117,7 +117,7 @@
     /**
      * Starts the dispatcher.
      */
-    public void start();
+    public void start() throws Exception;
 
     /**
      * Shuts down the dispatcher, this may result in previous dispatch requests

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java Fri Feb 20 14:23:26 2009
@@ -16,86 +16,66 @@
  */
 package org.apache.activemq.dispatch;
 
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 
-public interface PooledDispatcher {
+public interface PooledDispatcher<D extends IDispatcher> {
 
-	/**
-	 * A {@link PooledDispatchContext}s can be moved between different
-	 * dispatchers.
-	 */
-	public interface PooledDispatchContext extends DispatchContext {
-		/**
-		 * Called to transfer a {@link PooledDispatchContext} to a new
-		 * Dispatcher.
-		 */
-		public void assignToNewDispatcher(PoolableDispatcher newDispatcher);
-
-		/**
-		 * A dispatcher must call this when it starts dispatch for this context
-		 */
-		public void startingDispatch();
-
-		/**
-		 * A dispatcher must call this when it has finished dispatching a
-		 * context
-		 */
-		public void finishedDispatch();
-
-		/**
-		 * Called by the dispatch thread to let the pooled context set any info
-		 * set by other threads.
-		 */
-		public void processForeignUpdates();
-	}
-
-	public interface PoolableDispatchContext extends DispatchContext {
-
-		public void setPooledDispatchContext(PooledDispatchContext context);
-
-		/**
-		 * Indicates that another thread has made an update to the dispatch
-		 * context.
-		 * 
-		 */
-		public void onForeignThreadUpdate();
-
-		public PoolableDispatcher getDispatcher();
-	}
-
-	/**
-	 * A PoolableDispatcher is one that can be owned by an
-	 * {@link PooledDispatcher}.
-	 */
-	public interface PoolableDispatcher extends IDispatcher {
-
-		/**
-		 * Indicates that another thread has made an update to the dispatch
-		 * context.
-		 * 
-		 */
-		public PoolableDispatchContext createPoolableDispatchContext(Dispatchable dispatchable, String name);
-	}
-
-	/**
-	 * This wraps the dispatch context into one that is load balanced by the
-	 * LoadBalancer
-	 * 
-	 * @param context
-	 *            The context to wrap.
-	 * @return
-	 */
-	public PooledDispatchContext createPooledDispatchContext(PoolableDispatchContext context);
-
-	/**
-	 * A Dispatcher must call this from it's dispatcher thread to indicate that
-	 * is has started it's dispatch has started.
-	 */
-	public void onDispatcherStarted(PoolableDispatcher dispatcher);
-
-	/**
-	 * A Dispatcher must call this from it's dispatcher thread when exiting it's
-	 * dispatch loop
-	 */
-	public void onDispatcherStopped(PoolableDispatcher dispatcher);
+    /**
+     * A {@link PooledDispatchContext} can be moved between different
+     * dispatchers.
+     */
+    public interface PooledDispatchContext<D extends IDispatcher> extends DispatchContext {
+        /**
+         * Called to transfer a {@link PooledDispatchContext} to a new
+         * Dispatcher.
+         */
+        public void assignToNewDispatcher(D newDispatcher);
+
+        /**
+         * Gets the dispatcher to which this PooledDispatchContext currently
+         * belongs
+         * 
+         * @return
+         */
+        public D getDispatcher();
+
+        /**
+         * Gets the execution tracker for the context.
+         * 
+         * @return the execution tracker for the context:
+         */
+        public ExecutionTracker<D> getExecutionTracker();
+    }
+
+    /**
+     * A Dispatcher must call this from its dispatcher thread to indicate that
+     * its dispatch loop has started.
+     */
+    public void onDispatcherStarted(D dispatcher);
+
+    /**
+     * A Dispatcher must call this from its dispatcher thread when exiting its
+     * dispatch loop
+     */
+    public void onDispatcherStopped(D dispatcher);
+
+    /**
+     * Returns the currently executing dispatcher, or null if the current thread
+     * is not a dispatcher:
+     * 
+     * @return The currently executing dispatcher
+     */
+    public D getCurrentDispatcher();
+
+    public void setCurrentDispatchContext(PooledDispatchContext<D> context);
+
+    public PooledDispatchContext<D> getCurrentDispatchContext();
+
+    /**
+     * Returns the load balancer for this dispatch pool.
+     * 
+     * @return
+     */
+    public ExecutionLoadBalancer<D> getLoadBalancer();
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Fri Feb 20 14:23:26 2009
@@ -16,449 +16,564 @@
  */
 package org.apache.activemq.dispatch;
 
-import java.util.LinkedList;
-import java.util.TreeMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatchContext;
-import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
 import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
 import org.apache.activemq.queue.Mapper;
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
-public class PriorityDispatcher implements Runnable, PoolableDispatcher {
+public class PriorityDispatcher<D extends PriorityDispatcher<D>> implements Runnable, IDispatcher {
 
-	private Thread thread;
-	private boolean running = false;
-	private boolean threaded = false;
-	private final int MAX_USER_PRIORITY;
-
-	static final ThreadLocal<PriorityDispatcher> dispatcher = new ThreadLocal<PriorityDispatcher>();
-
-	private final PooledDispatcher pooledDispatcher;
-
-	// The local dispatch queue:
-	private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
-
-	// Dispatch queue for requests from other threads:
-	private final LinkedNodeList<ForeignEvent>[] foreignQueue;
-	private static final int[] TOGGLE = new int[] { 1, 0 };
-	private int foreignToggle = 0;
-
-	// Timed Execution List
-	private final TimerHeap timerHeap = new TimerHeap();
-
-	private final String name;
-	private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
-	private final Semaphore foreignPermits = new Semaphore(0);
-
-	private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
-		public Integer map(PriorityDispatchContext element) {
-			return element.listPrio;
-		}
-	};
-
-	public PriorityDispatcher(String name, int priorities, PooledDispatcher pooledDispactcher) {
-		this.name = name;
-		MAX_USER_PRIORITY = priorities;
-		priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
-		foreignQueue = createForeignEventQueue();
-		for (int i = 0; i < 2; i++) {
-			foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
-		}
-		this.pooledDispatcher = pooledDispactcher;
-	}
-
-	@SuppressWarnings("unchecked")
-	private LinkedNodeList<ForeignEvent>[] createForeignEventQueue() {
-		return new LinkedNodeList[2];
-	}
-
-	private abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
-		public abstract void execute();
-
-		final void addToList() {
-			synchronized (foreignQueue) {
-				if (!this.isLinked()) {
-					foreignQueue[foreignToggle].addLast(this);
-					if (!foreignAvailable.getAndSet(true)) {
-						foreignPermits.release();
-					}
-				}
-			}
-		}
-	}
-
-	public boolean isThreaded() {
-		return threaded;
-	}
-
-	public void setThreaded(boolean threaded) {
-		this.threaded = threaded;
-	}
-
-	private class UpdateEvent extends ForeignEvent {
-		private final PriorityDispatchContext pdc;
-
-		UpdateEvent(PriorityDispatchContext pdc) {
-			this.pdc = pdc;
-		}
-
-		// Can only be called by the owner of this dispatch context:
-		public void execute() {
-			pdc.poolContext.processForeignUpdates();
-		}
-	}
-
-	class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PoolableDispatchContext {
-		// The dispatchable target:
-		final Dispatchable dispatchable;
-		PooledDispatchContext poolContext;
-		// The name of this context:
-		final String name;
-		// list prio can only be updated in the thread of of this dispatcher:
-		int listPrio;
-		// The update events are used to update fields in the dispatch context
-		// from foreign threads:
-		final UpdateEvent updateEvent[] = new UpdateEvent[] { new UpdateEvent(this), new UpdateEvent(this) };
-
-		private PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
-			super();
-			this.dispatchable = dispatchable;
-			this.name = name;
-		}
-
-		// This can only be called on this thread
-		public final void requestDispatch() {
-			if (!isLinked()) {
-				priorityQueue.add(this, listPrio);
-			}
-			return;
-		}
-
-		// This can only be called on this thread
-		public final void updatePriority(int priority) {
-			if (priority != listPrio) {
-
-				listPrio = priority;
-				// If there is a priority change relink the context
-				// at the new priority:
-				if (isLinked()) {
-					unlink();
-					priorityQueue.add(this, listPrio);
-				}
-			}
-			return;
-
-		}
-
-		public void onForeignThreadUpdate() {
-			synchronized (foreignQueue) {
-				updateEvent[foreignToggle].addToList();
-			}
-		}
-
-		// This can only be called on this thread
-		public void close() {
-			if (isLinked()) {
-				unlink();
-			}
-			synchronized (foreignQueue) {
-				if (updateEvent[foreignToggle].isLinked()) {
-					updateEvent[foreignToggle].unlink();
-				}
-			}
-		}
-
-		/**
-		 * This can only be called by the owning dispatch thread:
-		 * 
-		 * @return False if the dispatchable has more work to do.
-		 */
-		public final boolean dispatch() {
-			return dispatchable.dispatch();
-		}
-
-		public String toString() {
-			return name;
-		}
-
-		public Dispatchable getDispatchable() {
-			return dispatchable;
-		}
-
-		public void setPooledDispatchContext(PooledDispatchContext context) {
-			this.poolContext = context;
-		}
-
-		public String getName() {
-			return name;
-		}
-
-		public PoolableDispatcher getDispatcher() {
-			return PriorityDispatcher.this;
-		}
-	}
-
-	public DispatchContext register(Dispatchable dispatchable, String name) {
-		return createPoolableDispatchContext(dispatchable, name);
-	}
-
-	public PoolableDispatchContext createPoolableDispatchContext(Dispatchable dispatchable, String name) {
-		return new PriorityDispatchContext(dispatchable, true, name);
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.activemq.dispatch.IDispatcher#start()
-	 */
-	public synchronized final void start() {
-		if (thread == null) {
-			running = true;
-			thread = new Thread(this, name);
-			thread.start();
-		}
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
-	 */
-	public synchronized final void shutdown() throws InterruptedException {
-		if (thread != null) {
-			dispatch(new RunnableAdapter(new Runnable() {
-
-				public void run() {
-					running = false;
-				}
-
-			}), MAX_USER_PRIORITY + 1);
-			thread.interrupt();
-			thread.join();
-			thread = null;
-		}
-	}
-
-	public void run() {
-
-		// Inform the dispatcher that we have started:
-		pooledDispatcher.onDispatcherStarted(this);
-		dispatcher.set(this);
-		PriorityDispatchContext pdc;
-		try {
-			while (running) {
-				pdc = priorityQueue.poll();
-				// If no local work available wait for foreign work:
-				if (pdc == null) {
-					foreignPermits.acquire();
-				} else {
-					pdc.poolContext.startingDispatch();
-
-					while (!pdc.dispatch()) {
-						// If there is a higher priority dispatchable stop
-						// processing this one:
-						if (pdc.listPrio < priorityQueue.getHighestPriority()) {
-							// May have gotten relinked by the caller:
-							if (!pdc.isLinked()) {
-								priorityQueue.add(pdc, pdc.listPrio);
-							}
-							break;
-						}
-					}
-
-					pdc.poolContext.finishedDispatch();
-
-				}
-
-				// Execute delayed events:
-				timerHeap.executeReadyEvents();
-
-				// Check for foreign dispatch requests:
-				if (foreignAvailable.get()) {
-					LinkedNodeList<ForeignEvent> foreign;
-					synchronized (foreignQueue) {
-						// Swap foreign queues and drain permits;
-						foreign = foreignQueue[foreignToggle];
-						foreignToggle = TOGGLE[foreignToggle];
-						foreignAvailable.set(false);
-						foreignPermits.drainPermits();
-					}
-					while (true) {
-						ForeignEvent fe = foreign.getHead();
-						if (fe == null) {
-							break;
-						}
-
-						fe.unlink();
-						fe.execute();
-					}
-
-				}
-			}
-		} catch (InterruptedException e) {
-			return;
-		} catch (Throwable thrown) {
-			thrown.printStackTrace();
-		} finally {
-			pooledDispatcher.onDispatcherStopped(this);
-		}
-	}
-
-	class ThreadSafeDispatchContext implements PooledDispatchContext {
-		final PriorityDispatchContext delegate;
-
-		ThreadSafeDispatchContext(PriorityDispatchContext context) {
-			this.delegate = context;
-			delegate.setPooledDispatchContext(this);
-		}
-
-		public void finishedDispatch() {
-			// NOOP
-
-		}
-
-		public void startingDispatch() {
-			// Noop
-
-		}
-
-		public void close() {
-			// Noop this is always transient:
-		}
-
-		public void processForeignUpdates() {
-			requestDispatch();
-		}
-
-		public Dispatchable getDispatchable() {
-			return delegate.getDispatchable();
-		}
-
-		public void requestDispatch() {
-			if (dispatcher.get() == PriorityDispatcher.this) {
-				delegate.requestDispatch();
-			} else {
-				delegate.onForeignThreadUpdate();
-			}
-		}
-
-		public void updatePriority(int priority) {
-			throw new UnsupportedOperationException("Not implemented");
-		}
-
-		public String getName() {
-			return delegate.name;
-		}
-
-		public void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
-		}
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
-	 *      .dispatch.Dispatcher.Dispatchable)
-	 */
-	final void dispatch(Dispatchable dispatchable, int priority) {
-		ThreadSafeDispatchContext context = new ThreadSafeDispatchContext(new PriorityDispatchContext(dispatchable, false, name));
-		context.delegate.updatePriority(priority);
-		context.requestDispatch();
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
-	 */
-	public Executor createPriorityExecutor(final int priority) {
-
-		return new Executor() {
-
-			public void execute(final Runnable runnable) {
-				dispatch(new RunnableAdapter(runnable), priority);
-			}
-		};
-	}
-
-	public void execute(final Runnable runnable) {
-		dispatch(new RunnableAdapter(runnable), 0);
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
-	 *      long, java.util.concurrent.TimeUnit)
-	 */
-	public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
-		if (dispatcher.get() == this) {
-			timerHeap.add(runnable, delay, timeUnit);
-		} else {
-			new ForeignEvent() {
-				public void execute() {
-					timerHeap.add(runnable, delay, timeUnit);
-				}
-			}.addToList();
-		}
-	}
-
-	public String toString() {
-		return name;
-	}
-
-	private class TimerHeap {
-
-		final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
-
-		private void add(Runnable runnable, long delay, TimeUnit timeUnit) {
-
-			long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS);
-			long eTime = System.nanoTime() + nanoDelay;
-			LinkedList<Runnable> list = new LinkedList<Runnable>();
-			list.add(runnable);
-
-			LinkedList<Runnable> old = timers.put(eTime, list);
-			if (old != null) {
-				list.addAll(old);
-			}
-		}
-
-		private void executeReadyEvents() {
-			LinkedList<Runnable> ready = null;
-			if (timers.isEmpty()) {
-				return;
-			} else {
-				long now = System.nanoTime();
-				long first = timers.firstKey();
-				if (first > now) {
-					return;
-				}
-				ready = new LinkedList<Runnable>();
-
-				while (first < now) {
-					ready.addAll(timers.remove(first));
-					if (timers.isEmpty()) {
-						break;
-					}
-					first = timers.firstKey();
-
-				}
-			}
-
-			for (Runnable runnable : ready) {
-				try {
-					runnable.run();
-				} catch (Throwable thrown) {
-					thrown.printStackTrace();
-				}
-			}
-		}
-	}
+    private static final boolean DEBUG = false;
+    private Thread thread;
+    protected boolean running = false;
+    private boolean threaded = false;
+    private final int MAX_USER_PRIORITY;
+
+    // Set if this dispatcher is part of a dispatch pool:
+    protected final PooledDispatcher<D> pooledDispatcher;
+
+    // The local dispatch queue:
+    private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+
+    // Dispatch queue for requests from other threads:
+    private final LinkedNodeList<ForeignEvent>[] foreignQueue;
+    private static final int[] TOGGLE = new int[] { 1, 0 };
+    private int foreignToggle = 0;
+
+    // Timed Execution List
+    protected final TimerHeap timerHeap = new TimerHeap();
+
+    private final String name;
+    private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
+    private final Semaphore foreignPermits = new Semaphore(0);
+
+    private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
+        public Integer map(PriorityDispatchContext element) {
+            return element.listPrio;
+        }
+    };
+
+    protected PriorityDispatcher(String name, int priorities, PooledDispatcher<D> pooledDispactcher) {
+        this.name = name;
+        MAX_USER_PRIORITY = priorities;
+        priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
+        foreignQueue = createForeignEventQueue();
+        for (int i = 0; i < 2; i++) {
+            foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
+        }
+        this.pooledDispatcher = pooledDispactcher;
+    }
+
+    public static final IDispatcher createPriorityDispatcher(String name, int numPriorities) {
+        return new PriorityDispatcher(name, numPriorities, null);
+    }
+
+    public static final IDispatcher createPriorityDispatchPool(String name, final int numPriorities, int size) {
+        return new AbstractPooledDispatcher<PriorityDispatcher>(name, size) {
+
+            @Override
+            protected final PriorityDispatcher createDispatcher(String name, AbstractPooledDispatcher<PriorityDispatcher> pool) throws Exception {
+                // TODO Auto-generated method stub
+                return new PriorityDispatcher(name, numPriorities, this);
+            }
+
+            public final Executor createPriorityExecutor(final int priority) {
+                return new Executor() {
+                    public void execute(final Runnable runnable) {
+                        chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+                    }
+                };
+            }
+        };
+    }
+
+    @SuppressWarnings("unchecked")
+    private LinkedNodeList<ForeignEvent>[] createForeignEventQueue() {
+        return new LinkedNodeList[2];
+    }
+
+    protected abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
+        public abstract void execute();
+
+        final void addToList() {
+            synchronized (foreignQueue) {
+                if (!this.isLinked()) {
+                    foreignQueue[foreignToggle].addLast(this);
+                    if (!foreignAvailable.getAndSet(true)) {
+                        wakeup();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean isThreaded() {
+        return threaded;
+    }
+
+    public void setThreaded(boolean threaded) {
+        this.threaded = threaded;
+    }
+
+    private class UpdateEvent extends ForeignEvent {
+        private final PriorityDispatchContext pdc;
+
+        UpdateEvent(PriorityDispatchContext pdc) {
+            this.pdc = pdc;
+        }
+
+        // Can only be called by the owner of this dispatch context:
+        public void execute() {
+            pdc.processForeignUpdates();
+        }
+    }
+
+    public DispatchContext register(Dispatchable dispatchable, String name) {
+        return new PriorityDispatchContext(dispatchable, true, name);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#start()
+     */
+    public synchronized final void start() {
+        if (thread == null) {
+            running = true;
+            thread = new Thread(this, name);
+            thread.start();
+        }
+    }
+
+    /**
+     * Stops the dispatch thread and waits for it to exit. Does nothing when
+     * no thread is currently set.
+     *
+     * @throws InterruptedException
+     *             if the calling thread is interrupted while joining the
+     *             dispatch thread
+     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+     */
+    public synchronized final void shutdown() throws InterruptedException {
+        if (thread == null) {
+            return;
+        }
+
+        // The loop is stopped by dispatching a task that clears the running
+        // flag, requested one level above the maximum user priority:
+        final Runnable stopLoop = new Runnable() {
+            public void run() {
+                running = false;
+            }
+        };
+        dispatch(new RunnableAdapter(stopLoop), MAX_USER_PRIORITY + 1);
+
+        // The thread is not interrupted; we simply wait for it to finish.
+        thread.join();
+        thread = null;
+    }
+
+    /**
+     * The dispatch loop. While {@code running} is set, each pass:
+     * <ol>
+     * <li>polls the priority queue and dispatches the context it returns, or
+     * calls {@link #waitForEvents()} when the queue is empty;</li>
+     * <li>executes ready timers;</li>
+     * <li>calls {@link #dispatchHook()};</li>
+     * <li>if foreign work has been flagged, swaps the foreign queues and
+     * executes every event from the queue that was active.</li>
+     * </ol>
+     * The pooled dispatcher is notified when the loop starts and, through the
+     * {@code finally} block, whenever it exits.
+     */
+    public void run() {
+
+        // Inform the dispatcher that we have started:
+        pooledDispatcher.onDispatcherStarted((D) this);
+        PriorityDispatchContext pdc;
+        try {
+            while (running) {
+                pdc = priorityQueue.poll();
+                // If no local work available wait for foreign work:
+                if (pdc == null) {
+                    waitForEvents();
+                } else {
+                    // The current context is only published for contexts
+                    // that carry an execution tracker:
+                    if (pdc.tracker != null) {
+                        pooledDispatcher.setCurrentDispatchContext(pdc);
+                    }
+
+                    // dispatch() returns false while the dispatchable has
+                    // more work to do, so keep calling it until it is done:
+                    while (!pdc.dispatch()) {
+                        // If there is a higher priority dispatchable stop
+                        // processing this one:
+                        if (pdc.listPrio < priorityQueue.getHighestPriority()) {
+                            // May have gotten relinked by the caller:
+                            if (!pdc.isLinked()) {
+                                priorityQueue.add(pdc, pdc.listPrio);
+                            }
+                            break;
+                        }
+                    }
+
+                    // Cleared unconditionally, even if it was not set above:
+                    pooledDispatcher.setCurrentDispatchContext(null);
+                }
+
+                // Execute delayed events:
+                timerHeap.executeReadyTimers();
+
+                // Allow subclasses to do additional work:
+                dispatchHook();
+
+                // Check for foreign dispatch requests:
+                if (foreignAvailable.get()) {
+                    LinkedNodeList<ForeignEvent> foreign;
+                    synchronized (foreignQueue) {
+                        // Swap foreign queues and drain permits;
+                        foreign = foreignQueue[foreignToggle];
+                        foreignToggle = TOGGLE[foreignToggle];
+                        foreignAvailable.set(false);
+                        foreignPermits.drainPermits();
+                    }
+                    // The swapped-out queue is drained outside the lock; new
+                    // events are appended to the other queue meanwhile.
+                    while (true) {
+                        ForeignEvent fe = foreign.getHead();
+                        if (fe == null) {
+                            break;
+                        }
+
+                        // Unlink first so the event can be queued again
+                        // while (or after) it executes:
+                        fe.unlink();
+                        fe.execute();
+                    }
+
+                }
+            }
+        } catch (InterruptedException e) {
+            // Interruption ends the loop quietly; the finally block still
+            // runs.
+            return;
+        } catch (Throwable thrown) {
+            // Any other failure ends the loop after printing the stack trace.
+            thrown.printStackTrace();
+        } finally {
+            pooledDispatcher.onDispatcherStopped((D) this);
+        }
+    }
+
+    /**
+     * Extension point called once per pass of the dispatch loop, after ready
+     * timers have been executed. Subclasses may override this to do
+     * additional dispatch work; the default implementation does nothing.
+     *
+     * @throws Exception
+     *             if the additional work fails
+     */
+    protected void dispatchHook() throws Exception {
+        // Intentionally empty.
+    }
+
+    /**
+     * Subclasses may override this to implement another mechanism for wakeup.
+     * 
+     * @throws Exception
+     */
+    protected void waitForEvents() throws Exception {
+        foreignPermits.acquire();
+    }
+
+    /**
+     * Signals the dispatch loop that foreign work is available by releasing a
+     * permit on {@code foreignPermits}, the same object the default
+     * {@link #waitForEvents()} waits on. Subclasses may override this to
+     * provide an alternative wakeup mechanism.
+     */
+    protected void wakeup() {
+        // Counterpart of the acquire performed while the loop is idle.
+        foreignPermits.release();
+    }
+
+    /**
+     * Queues the given context's update event on the currently active foreign
+     * queue, unless that event is already linked, and wakes the dispatcher if
+     * the "foreign work available" flag was not already set.
+     *
+     * @param context
+     *            the context with pending updates
+     */
+    protected final void onForeignUdate(PriorityDispatchContext context) {
+        synchronized (foreignQueue) {
+            // Each context carries one update event per foreign queue:
+            final ForeignEvent pending = context.updateEvent[foreignToggle];
+            if (pending.isLinked()) {
+                return;
+            }
+
+            foreignQueue[foreignToggle].addLast(pending);
+            final boolean alreadySignalled = foreignAvailable.getAndSet(true);
+            if (!alreadySignalled) {
+                wakeup();
+            }
+        }
+    }
+
+    /**
+     * Unlinks both of the context's update events and the context itself from
+     * whatever lists they are currently linked into.
+     *
+     * @param context
+     *            the context to detach
+     * @return true if the context itself was linked (and has been unlinked),
+     *         false otherwise
+     */
+    protected final boolean removeDispatchContext(PriorityDispatchContext context) {
+        synchronized (foreignQueue) {
+            for (int slot = 0; slot < 2; slot++) {
+                if (context.updateEvent[slot].isLinked()) {
+                    context.updateEvent[slot].unlink();
+                }
+            }
+
+            final boolean wasLinked = context.isLinked();
+            if (wasLinked) {
+                context.unlink();
+            }
+            return wasLinked;
+        }
+    }
+
+    /**
+     * Wraps the dispatchable in a new non-persistent context (one without an
+     * execution tracker), sets the context's priority and requests that it
+     * be dispatched.
+     *
+     * @param dispatchable
+     *            the target to dispatch
+     * @param priority
+     *            the priority to request for it
+     * @see org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq.dispatch.Dispatcher.Dispatchable)
+     */
+    public final void dispatch(Dispatchable dispatchable, int priority) {
+        final boolean persistent = false;
+        PriorityDispatchContext transientContext = new PriorityDispatchContext(dispatchable, persistent, name);
+        transientContext.updatePriority(priority);
+        transientContext.requestDispatch();
+    }
+
+    /**
+     * Creates an executor whose tasks are dispatched on this dispatcher at a
+     * fixed priority.
+     *
+     * @param priority
+     *            the priority used for every task given to the executor
+     * @return an executor that forwards each runnable to
+     *         {@link #dispatch(Dispatchable, int)}
+     * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+     */
+    public Executor createPriorityExecutor(final int priority) {
+        final Executor priorityExecutor = new Executor() {
+            public void execute(final Runnable runnable) {
+                dispatch(new RunnableAdapter(runnable), priority);
+            }
+        };
+        return priorityExecutor;
+    }
+
+    /**
+     * Dispatches the runnable on this dispatcher at priority 0.
+     *
+     * @param runnable
+     *            the task to run
+     */
+    public void execute(final Runnable runnable) {
+        final Dispatchable adapted = new RunnableAdapter(runnable);
+        dispatch(adapted, 0);
+    }
+
+    /**
+     * Schedules the runnable on this dispatcher's timer heap. When called
+     * from a thread other than this dispatcher's, the addition is handed over
+     * as a foreign event and performed when that event is executed, so the
+     * timer heap is only ever modified from this method on the dispatcher's
+     * own thread.
+     *
+     * @param runnable
+     *            the task to run
+     * @param delay
+     *            how long to wait before running it
+     * @param timeUnit
+     *            the unit of {@code delay}
+     * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+     *      long, java.util.concurrent.TimeUnit)
+     */
+    public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
+        if (getCurrentDispatcher() != this) {
+            final ForeignEvent deferredAdd = new ForeignEvent() {
+                public void execute() {
+                    timerHeap.add(runnable, delay, timeUnit);
+                }
+            };
+            deferredAdd.addToList();
+            return;
+        }
+
+        // Already on the dispatcher's thread: add directly.
+        timerHeap.add(runnable, delay, timeUnit);
+    }
+
+    /**
+     * @return the name of this dispatcher
+     */
+    public String toString() {
+        final String dispatcherName = name;
+        return dispatcherName;
+    }
+
+    /**
+     * Asks the pooled dispatcher which dispatcher it considers current.
+     *
+     * @return whatever {@code pooledDispatcher.getCurrentDispatcher()}
+     *         reports
+     */
+    private final D getCurrentDispatcher() {
+        final D current = pooledDispatcher.getCurrentDispatcher();
+        return current;
+    }
+
+    /**
+     * Asks the pooled dispatcher which dispatch context it considers current.
+     * The dispatch loop sets that context before dispatching a tracked
+     * context and clears it again afterwards.
+     *
+     * @return whatever {@code pooledDispatcher.getCurrentDispatchContext()}
+     *         reports
+     */
+    private final PooledDispatchContext<D> getCurrentDispatchContext() {
+        final PooledDispatchContext<D> current = pooledDispatcher.getCurrentDispatchContext();
+        return current;
+    }
+
+    /**
+     * Dispatch context that links a {@link Dispatchable} into the priority
+     * queue of the dispatcher that currently owns it.
+     * <p>
+     * Calls made from the owning dispatcher's thread take effect immediately.
+     * Calls made from any other thread record the request in the fields below
+     * (guarded by this context's monitor) and queue an update event on the
+     * owner, which later applies them in {@link #processForeignUpdates()}.
+     * <p>
+     * The owner used for such a hand-off is always read while holding this
+     * context's monitor, so it is consistent with the state just recorded. An
+     * event that nevertheless reaches a dispatcher which is no longer the
+     * owner is forwarded to the current owner when it is processed.
+     */
+    protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PooledDispatchContext<D> {
+        // The dispatchable target:
+        final Dispatchable dispatchable;
+        // The name of this context:
+        final String name;
+        // The priority at which this context is (or will be) linked into the
+        // owner's priority queue. It is only updated by the thread of the
+        // owning dispatcher, while holding this context's monitor.
+        protected int listPrio;
+
+        // The update events are used to update fields in the dispatch context
+        // from foreign threads (one event per foreign queue):
+        final UpdateEvent updateEvent[];
+
+        // Null for non-persistent contexts:
+        private final ExecutionTracker<D> tracker;
+        private D currentOwner;
+        // The dispatcher this context should move to, or null if no move is
+        // pending:
+        private D updateDispatcher = null;
+
+        // The most recently requested priority; listPrio catches up with it
+        // on the owner's thread:
+        private int priority;
+        private boolean dispatchRequested = false;
+        private boolean closed = false;
+
+        /**
+         * @param dispatchable
+         *            the target to dispatch
+         * @param persistent
+         *            if true an execution tracker is created for the context
+         *            via the pooled dispatcher's load balancer
+         * @param name
+         *            the name of the context
+         */
+        protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
+            this.dispatchable = dispatchable;
+            this.name = name;
+            this.currentOwner = (D) PriorityDispatcher.this;
+            if (persistent) {
+                this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext<D>) this);
+            } else {
+                this.tracker = null;
+            }
+            updateEvent = createUpdateEvent();
+            updateEvent[0] = new UpdateEvent(this);
+            updateEvent[1] = new UpdateEvent(this);
+        }
+
+        @SuppressWarnings("unchecked")
+        private final PriorityDispatcher<D>.UpdateEvent[] createUpdateEvent() {
+            return new PriorityDispatcher.UpdateEvent[2];
+        }
+
+        /**
+         * Gets the execution tracker for the context.
+         *
+         * @return the execution tracker for the context, or null if the
+         *         context is not persistent
+         */
+        public ExecutionTracker<D> getExecutionTracker() {
+            return tracker;
+        }
+
+        /**
+         * This can only be called by the owning dispatch thread:
+         *
+         * @return False if the dispatchable has more work to do.
+         */
+        public final boolean dispatch() {
+            return dispatchable.dispatch();
+        }
+
+        /**
+         * Requests that this context be moved to another dispatcher. The move
+         * itself is carried out by the current owner in
+         * {@link #processForeignUpdates()}.
+         *
+         * @param newDispatcher
+         *            the dispatcher that should own this context
+         */
+        public final void assignToNewDispatcher(D newDispatcher) {
+            final D owner;
+            synchronized (this) {
+
+                // If we're already set to this dispatcher
+                if (newDispatcher == currentOwner) {
+                    if (updateDispatcher == null || updateDispatcher == newDispatcher) {
+                        return;
+                    }
+                }
+
+                updateDispatcher = newDispatcher;
+                if (DEBUG)
+                    System.out.println(getName() + " updating to " + updateDispatcher);
+
+                // Read the owner under the lock so that it matches the
+                // request recorded above:
+                owner = currentOwner;
+            }
+            owner.onForeignUdate(this);
+        }
+
+        /**
+         * Requests that this context be dispatched. On the owner's thread the
+         * context is linked into the priority queue right away; from any
+         * other thread the request is recorded and handed to the owner.
+         */
+        public void requestDispatch() {
+
+            D callingDispatcher = getCurrentDispatcher();
+            if (tracker != null)
+                tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
+
+            final D owner;
+            // Synchronize to protect against ownership changes:
+            synchronized (this) {
+                // If the owner of this context is the calling thread, then
+                // delegate to the dispatcher.
+                if (currentOwner == callingDispatcher) {
+
+                    if (!isLinked()) {
+                        currentOwner.priorityQueue.add(this, listPrio);
+                    }
+                    return;
+                }
+
+                // Otherwise this is coming off another thread:
+                dispatchRequested = true;
+                owner = currentOwner;
+            }
+            owner.onForeignUdate(this);
+        }
+
+        /**
+         * Changes the priority of this context. On the owner's thread the
+         * context is relinked at the new priority right away; from any other
+         * thread the new priority is recorded and handed to the owner.
+         *
+         * @param priority
+         *            the new priority
+         */
+        public void updatePriority(int priority) {
+            D callingDispatcher = getCurrentDispatcher();
+
+            final D owner;
+            // Synchronize to protect against ownership changes:
+            synchronized (this) {
+                // Nothing to do only if the priority is both recorded AND
+                // already applied to the queue position. Checking just the
+                // recorded value would turn the call made by
+                // processForeignUpdates() into a no-op, so a priority
+                // requested from a foreign thread would never be applied.
+                if (this.priority == priority && listPrio == priority) {
+                    return;
+                }
+                this.priority = priority;
+
+                // If this is called by the owning dispatcher, then we go ahead
+                // and update:
+                if (currentOwner == callingDispatcher) {
+
+                    if (priority != listPrio) {
+
+                        listPrio = priority;
+                        // If there is a priority change relink the context
+                        // at the new priority:
+                        if (isLinked()) {
+                            unlink();
+                            currentOwner.priorityQueue.add(this, listPrio);
+                        }
+                    }
+                    return;
+                }
+                owner = currentOwner;
+            }
+            owner.onForeignUdate(this);
+        }
+
+        /**
+         * Applies the updates recorded by other threads: a pending close, a
+         * pending move to another dispatcher, or otherwise a pending priority
+         * change and/or dispatch request. Called when one of this context's
+         * update events is executed.
+         */
+        public void processForeignUpdates() {
+            D newOwner = null;
+            synchronized (this) {
+
+                if (closed) {
+                    close();
+                    return;
+                }
+
+                if (updateDispatcher != null) {
+                    if (DEBUG) {
+                        System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+                    }
+                    // If the context was queued on the old owner it must be
+                    // queued again on the new one:
+                    if (currentOwner.removeDispatchContext(this)) {
+                        dispatchRequested = true;
+                    }
+                    currentOwner = updateDispatcher;
+                    updateDispatcher = null;
+                    newOwner = currentOwner;
+                } else {
+                    updatePriority(priority);
+
+                    if (dispatchRequested) {
+                        dispatchRequested = false;
+                        requestDispatch();
+                    }
+                }
+            }
+
+            // Let the new owner pick up whatever is still pending:
+            if (newOwner != null) {
+                newOwner.onForeignUdate(this);
+            }
+        }
+
+        /**
+         * May be overridden by a subclass to do additional work on a
+         * dispatcher switch. NOTE(review): nothing in this class calls this
+         * method - confirm whether a caller exists elsewhere.
+         *
+         * @param oldDispatcher
+         *            The old dispatcher
+         * @param newDispatcher
+         *            The new Dispatcher
+         */
+        protected void switchedDispatcher(D oldDispatcher, D newDispatcher) {
+
+        }
+
+        /**
+         * Closes this context. On the owner's thread the context and its
+         * update events are unlinked right away; from any other thread the
+         * close is recorded and handed to the owner.
+         */
+        public void close() {
+            // Non-persistent contexts have no tracker:
+            if (tracker != null) {
+                tracker.close();
+            }
+            D callingDispatcher = getCurrentDispatcher();
+            final D owner;
+            synchronized (this) {
+                closed = true;
+
+                // If the owner of this context is the calling thread, then
+                // delegate to the dispatcher.
+                if (currentOwner == callingDispatcher) {
+                    // Unlinks the context and both update events under the
+                    // owner's foreign queue lock. This takes that lock while
+                    // holding this context's monitor, the same order that
+                    // processForeignUpdates() uses.
+                    currentOwner.removeDispatchContext(this);
+                    // Done. Queueing another update event here would make
+                    // the owner run close() again, which would queue yet
+                    // another event, and so on without end.
+                    return;
+                }
+                owner = currentOwner;
+            }
+            owner.onForeignUdate(this);
+        }
+
+        public final String toString() {
+            return getName();
+        }
+
+        public Dispatchable getDispatchable() {
+            return dispatchable;
+        }
+
+        public D getDispatcher() {
+            return currentOwner;
+        }
+
+        public String getName() {
+            return name;
+        }
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java Fri Feb 20 14:23:26 2009
@@ -1,307 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.dispatch;
-
-import java.util.ArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
-
-public class PriorityPooledDispatcher implements IDispatcher, PooledDispatcher {
-	private final String name;
-
-	private static final ThreadLocal<PooledDispatchContext> dispatchContext = new ThreadLocal<PooledDispatchContext>();
-	private static final ThreadLocal<PoolableDispatcher> dispatcher = new ThreadLocal<PoolableDispatcher>();
-
-	private final ArrayList<PriorityDispatcher> dispatchers = new ArrayList<PriorityDispatcher>();
-
-	final AtomicBoolean started = new AtomicBoolean();
-	final AtomicBoolean shutdown = new AtomicBoolean();
-
-	private int roundRobinCounter = 0;
-	private final int size;
-	private final boolean DEBUG = false;
-
-	private final ExecutionLoadBalancer loadBalancer;
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
-	 */
-	public Executor createPriorityExecutor(final int priority) {
-		return new Executor() {
-			public void execute(final Runnable runnable) {
-				chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
-			}
-		};
-	}
-
-	public PriorityPooledDispatcher(String name, int size, int priorities) {
-		this.name = name;
-		this.size = size;
-		loadBalancer = new SimpleLoadBalancer();
-		// Create all the workers.
-		for (int i = 0; i < size; i++) {
-			PriorityDispatcher dispatcher = new PriorityDispatcher(name + "-" + (i + 1), priorities, this);
-			dispatchers.add(dispatcher);
-		}
-	}
-
-	public DispatchContext register(Dispatchable dispatchable, String name) {
-		return createPooledDispatchContext(chooseDispatcher().createPoolableDispatchContext(dispatchable, name));
-	}
-
-	/**
-	 * @see org.apache.activemq.dispatch.IDispatcher#start()
-	 */
-	public synchronized final void start() {
-		loadBalancer.start();
-		if (started.compareAndSet(false, true)) {
-			// Create all the workers.
-			for (int i = 0; i < size; i++) {
-				dispatchers.get(i).start();
-			}
-		}
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
-	 */
-	public synchronized final void shutdown() throws InterruptedException {
-		shutdown.set(true);
-		for (PriorityDispatcher dispatcher : dispatchers) {
-			dispatcher.shutdown();
-		}
-		loadBalancer.stop();
-	}
-
-	private PriorityDispatcher chooseDispatcher() {
-		PriorityDispatcher d = PriorityDispatcher.dispatcher.get();
-		if (d == null) {
-			synchronized (dispatchers) {
-				if (++roundRobinCounter >= size) {
-					roundRobinCounter = 0;
-				}
-				return dispatchers.get(roundRobinCounter);
-			}
-		} else {
-			return d;
-		}
-	}
-
-	public void execute(final Runnable runnable) {
-		chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0);
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
-	 *      long, java.util.concurrent.TimeUnit)
-	 */
-	public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
-		chooseDispatcher().schedule(runnable, delay, timeUnit);
-	}
-
-	public PooledDispatchContext createPooledDispatchContext(PoolableDispatchContext context) {
-		return new PriorityPooledDispatchContext(context);
-	}
-
-	/**
-	 * A Dispatcher must call this to indicate that is has started it's dispatch
-	 * loop.
-	 */
-	public void onDispatcherStarted(PoolableDispatcher d) {
-		dispatcher.set(d);
-		loadBalancer.addDispatcher(d);
-	}
-
-	/**
-	 * A Dispatcher must call this when exiting it's dispatch loop
-	 */
-	public void onDispatcherStopped(PoolableDispatcher d) {
-		loadBalancer.removeDispatcher(d);
-	}
-
-	/**
-	 * ExecutionGraphNode tracks dispatch information for a
-	 * MappableDispatchContext.
-	 * 
-	 */
-	public class PriorityPooledDispatchContext implements PooledDispatchContext {
-		private final ExecutionTracker tracker;
-
-		private PoolableDispatchContext context;
-		private PoolableDispatcher currentOwner;
-		private int priority;
-		private boolean dispatchRequested = false;
-		private PoolableDispatcher updateDispatcher = null;
-		private boolean closed = false;
-
-		PriorityPooledDispatchContext(PoolableDispatchContext context) {
-			this.context = context;
-			this.context.setPooledDispatchContext(this);
-			this.currentOwner = context.getDispatcher();
-			this.tracker = loadBalancer.createExecutionTracker(this);
-
-		}
-
-		public final void startingDispatch() {
-			dispatchContext.set(this);
-		}
-
-		public final void finishedDispatch() {
-			dispatchContext.set(null);
-		}
-
-		public final void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
-			synchronized (this) {
-
-				// If we're already set to this dispatcher
-				if (newDispatcher == currentOwner) {
-					if (updateDispatcher == null || updateDispatcher == newDispatcher) {
-						return;
-					}
-				}
-
-				updateDispatcher = newDispatcher;
-				if (DEBUG)
-					System.out.println(getName() + " updating to " + context.getDispatcher());
-			}
-			context.onForeignThreadUpdate();
-		}
-
-		public void requestDispatch() {
-
-			PoolableDispatcher callingDispatcher = dispatcher.get();
-
-			tracker.onDispatchRequest(callingDispatcher, dispatchContext.get());
-
-			// Otherwise this is coming off another thread, so we need to
-			// synchronize
-			// to protect against ownership changes:
-			synchronized (this) {
-				// If the owner of this context is the calling thread, then
-				// delegate to the dispatcher.
-				if (currentOwner == callingDispatcher) {
-
-					context.requestDispatch();
-					return;
-				}
-
-				dispatchRequested = true;
-			}
-			context.onForeignThreadUpdate();
-		}
-
-		public void updatePriority(int priority) {
-			if (this.priority == priority) {
-				return;
-			}
-			// Otherwise this is coming off another thread, so we need to
-			// synchronize to protect against ownership changes:
-			synchronized (this) {
-				this.priority = priority;
-
-				IDispatcher callingDispatcher = dispatcher.get();
-
-				// If the owner of this context is the calling thread, then
-				// delegate to the dispatcher.
-				if (currentOwner == callingDispatcher) {
-
-					context.updatePriority(priority);
-					return;
-				}
-			}
-			context.onForeignThreadUpdate();
-		}
-
-		public void processForeignUpdates() {
-			boolean ownerChange = false;
-			synchronized (this) {
-
-				if (closed) {
-					context.close();
-					return;
-				}
-
-				if (updateDispatcher != null) {
-					// Close the old context:
-					if (DEBUG) {
-						System.out.println("Assigning " + getName() + " to " + updateDispatcher);
-					}
-					context.close();
-
-					currentOwner = updateDispatcher;
-					updateDispatcher = null;
-					context = currentOwner.createPoolableDispatchContext(context.getDispatchable(), context.getName());
-					dispatchRequested = true;
-					context.updatePriority(priority);
-					context.setPooledDispatchContext(this);
-					ownerChange = true;
-				} else {
-					context.updatePriority(priority);
-
-					if (dispatchRequested) {
-						context.requestDispatch();
-						dispatchRequested = false;
-					}
-				}
-			}
-
-			if (ownerChange) {
-				context.onForeignThreadUpdate();
-			}
-		}
-
-		public void close() {
-			tracker.close();
-			synchronized (this) {
-				IDispatcher callingDispatcher = dispatcher.get();
-
-				// If the owner of this context is the calling thread, then
-				// delegate to the dispatcher.
-				if (currentOwner == callingDispatcher) {
-					context.close();
-					return;
-				}
-			}
-			context.onForeignThreadUpdate();
-		}
-
-		public final String toString() {
-			return context.toString();
-		}
-
-		public Dispatchable getDispatchable() {
-			return context.getDispatchable();
-		}
-
-		public String getName() {
-			return context.getName();
-		}
-	}
-
-	public String toString() {
-		return name;
-	}
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Fri Feb 20 14:23:26 2009
@@ -19,117 +19,119 @@
 import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
 import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
 
-public class SimpleLoadBalancer implements ExecutionLoadBalancer {
+public class SimpleLoadBalancer<D extends IDispatcher> implements ExecutionLoadBalancer<D> {
 
-	private final boolean DEBUG = false;
+    private final boolean DEBUG = false;
 
-	SimpleLoadBalancer() {
-	}
+    /**
+     * Creates a load balancer. No state is initialized here.
+     */
+    public SimpleLoadBalancer() {
+        // Nothing to set up.
+    }
 
-	private class ExecutionStats {
-		final PooledDispatchContext target;
-		final PooledDispatchContext source;
-		int count;
-
-		ExecutionStats(PooledDispatchContext source, PooledDispatchContext target) {
-			this.target = target;
-			this.source = source;
-		}
-
-		public String toString() {
-			return "Connection from: " + source + " to " + target;
-		}
-	}
-
-	public void addDispatcher(PoolableDispatcher dispatcher) {
-
-	}
-
-	public void removeDispatcher(PoolableDispatcher dispatcher) {
-
-	}
-
-	public void start() {
-	}
-
-	public void stop() {
-	}
-
-	public ExecutionTracker createExecutionTracker(PooledDispatchContext context) {
-		return new SimpleExecutionTracker(context);
-	}
-
-	private class SimpleExecutionTracker implements ExecutionTracker {
-		private final HashMap<PooledDispatchContext, ExecutionStats> sources = new HashMap<PooledDispatchContext, ExecutionStats>();
-		private final PooledDispatchContext context;
-		private final AtomicInteger work = new AtomicInteger(0);
-
-		private PooledDispatchContext singleSource;
-		private PoolableDispatcher currentOwner;
-
-		SimpleExecutionTracker(PooledDispatchContext context) {
-			this.context = context;
-		}
-
-		/**
-		 * This method is called to track which dispatch contexts are requesting
-		 * dispatch for the target context represented by this node.
-		 * 
-		 * This method is not threadsafe, the caller must ensure serialized
-		 * access to this method.
-		 * 
-		 * @param callngDispatcher
-		 *            The calling dispatcher.
-		 * @param context
-		 *            the originating dispatch context
-		 * @return True if this method resulted in the dispatch request being
-		 *         assigned to another dispatcher.
-		 */
-		public void onDispatchRequest(PoolableDispatcher callingDispatcher, PooledDispatchContext callingContext) {
-
-			if (callingContext != null) {
-				// Make sure we are being called by another node:
-				if (callingContext == null || callingContext == context) {
-					return;
-				}
-
-				// Optimize for single source case:
-				if (singleSource != callingContext) {
-					if (singleSource == null && sources.isEmpty()) {
-						singleSource = callingContext;
-						ExecutionStats stats = new ExecutionStats(callingContext, context);
-						sources.put(callingContext, stats);
-
-						// If this context only has a single source
-						// assign it to that source to minimize contention:
-						if (callingDispatcher != currentOwner) {
-							currentOwner = callingDispatcher;
-							if (DEBUG)
-								System.out.println("Assigning: " + context + " to " + callingContext + "'s  dispatcher: " + callingDispatcher);
-							context.assignToNewDispatcher(callingDispatcher);
-						}
-
-					} else {
-
-						ExecutionStats stats = sources.get(callingContext);
-						if (stats == null) {
-							stats = new ExecutionStats(callingContext, context);
-							sources.put(callingContext, stats);
-						}
-
-						if (singleSource != null) {
-							singleSource = null;
-						}
-					}
-				}
-				work.incrementAndGet();
-			}
-		}
-
-		public void close() {
-		}
-	}
+    /**
+     * Records that one dispatch context (the source) has requested dispatch
+     * of another (the target).
+     */
+    @SuppressWarnings("hiding")
+    private class ExecutionStats<D extends IDispatcher> {
+        final PooledDispatchContext<D> target;
+        final PooledDispatchContext<D> source;
+        // Not modified anywhere in this class.
+        int count;
+
+        ExecutionStats(PooledDispatchContext<D> source, PooledDispatchContext<D> target) {
+            this.source = source;
+            this.target = target;
+        }
+
+        public String toString() {
+            final StringBuilder text = new StringBuilder("Connection from: ");
+            text.append(source).append(" to ").append(target);
+            return text.toString();
+        }
+    }
+
+    /**
+     * Does nothing; this load balancer keeps no list of dispatchers.
+     *
+     * @param dispatcher
+     *            the dispatcher being added (ignored)
+     */
+    public void addDispatcher(D dispatcher) {
+        // Intentionally empty.
+    }
+
+    /**
+     * Does nothing; this load balancer keeps no list of dispatchers.
+     *
+     * @param dispatcher
+     *            the dispatcher being removed (ignored)
+     */
+    public void removeDispatcher(D dispatcher) {
+        // Intentionally empty.
+    }
+
+    /**
+     * Does nothing; this load balancer has no work to start.
+     */
+    public void start() {
+        // Intentionally empty.
+    }
+
+    /**
+     * Does nothing; this load balancer has no work to stop.
+     */
+    public void stop() {
+        // Intentionally empty.
+    }
+
+    /**
+     * Creates the tracker that observes dispatch requests made for the given
+     * context.
+     *
+     * @param context
+     *            the context to track
+     * @return a new tracker for {@code context}
+     */
+    public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context) {
+        final ExecutionTracker<D> tracker = new SimpleExecutionTracker<D>(context);
+        return tracker;
+    }
+
+    /**
+     * Tracks which dispatch contexts request dispatch of a single target
+     * context. As long as exactly one other context has ever done so, the
+     * target is assigned to that context's dispatcher.
+     */
+    @SuppressWarnings("hiding")
+    private class SimpleExecutionTracker<D extends IDispatcher> implements ExecutionTracker<D> {
+        // Every context that has requested dispatch of the target:
+        private final HashMap<PooledDispatchContext<D>, ExecutionStats<D>> sources = new HashMap<PooledDispatchContext<D>, ExecutionStats<D>>();
+        // The target context being tracked:
+        private final PooledDispatchContext<D> context;
+        // Counts the tracked dispatch requests:
+        private final AtomicInteger work = new AtomicInteger(0);
+
+        // The only source seen so far, or null once there is none or more
+        // than one:
+        private PooledDispatchContext<D> singleSource;
+        // The dispatcher this tracker last knew the target to be assigned to:
+        private IDispatcher currentOwner;
+
+        SimpleExecutionTracker(PooledDispatchContext<D> context) {
+            this.context = context;
+            currentOwner = context.getDispatcher();
+        }
+
+        /**
+         * This method is called to track which dispatch contexts are requesting
+         * dispatch for the target context represented by this node. Requests
+         * that do not come from another context (no calling context, or the
+         * target itself) are ignored.
+         *
+         * This method is not threadsafe, the caller must ensure serialized
+         * access to this method.
+         *
+         * @param callingDispatcher
+         *            The calling dispatcher.
+         * @param callingContext
+         *            the originating dispatch context, or null if the request
+         *            was not made from within a dispatch context
+         */
+        public void onDispatchRequest(D callingDispatcher, PooledDispatchContext<D> callingContext) {
+
+            // Make sure we are being called by another node:
+            if (callingContext == null || callingContext == context) {
+                return;
+            }
+
+            // Optimize for single source case:
+            if (singleSource != callingContext) {
+                if (singleSource == null && sources.isEmpty()) {
+                    singleSource = callingContext;
+                    ExecutionStats<D> stats = new ExecutionStats<D>(callingContext, context);
+                    sources.put(callingContext, stats);
+
+                    // If this context only has a single source
+                    // assign it to that source to minimize contention:
+                    if (callingDispatcher != currentOwner) {
+                        if (DEBUG)
+                            System.out.println("Assigning: " + context + " to " + callingContext + "'s  dispatcher: " + callingDispatcher + " From: " + currentOwner);
+
+                        currentOwner = callingDispatcher;
+                        context.assignToNewDispatcher(callingDispatcher);
+                    }
+
+                } else {
+
+                    ExecutionStats<D> stats = sources.get(callingContext);
+                    if (stats == null) {
+                        stats = new ExecutionStats<D>(callingContext, context);
+                        sources.put(callingContext, stats);
+                    }
+
+                    // There is now more than one source:
+                    singleSource = null;
+                }
+            }
+            work.incrementAndGet();
+        }
+
+        /**
+         * Does nothing; the tracker holds no resources to release.
+         */
+        public void close() {
+        }
+    }
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java?rev=746251&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java Fri Feb 20 14:23:26 2009
@@ -0,0 +1,71 @@
+package org.apache.activemq.dispatch;
+
+import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+/** Timer queue keyed by absolute {@link System#nanoTime()} deadline; performs no synchronization. */
+public class TimerHeap {
+    final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
+
+    /**
+     * Schedules a task to run once the given delay has elapsed.
+     *
+     * @param runnable task to run when the timer fires
+     * @param delay    how long to wait, expressed in {@code timeUnit}
+     * @param timeUnit unit of {@code delay}
+     */
+    public final void add(Runnable runnable, long delay, TimeUnit timeUnit) {
+        // toNanos converts FROM timeUnit. (timeUnit.convert(delay, NANOSECONDS) goes
+        // the other way and would truncate e.g. 500 ms down to 0.)
+        long eTime = System.nanoTime() + timeUnit.toNanos(delay);
+
+        // Append to any bucket already at this deadline so equal-deadline timers stay FIFO.
+        LinkedList<Runnable> list = timers.get(eTime);
+        if (list == null) {
+            list = new LinkedList<Runnable>();
+            timers.put(eTime, list);
+        }
+        list.add(runnable);
+    }
+
+    /**
+     * Returns how long until the earliest timer is due.
+     *
+     * @return -1 if there are no timers, otherwise the nanoseconds remaining
+     *         until the next timer should fire (0 if it is already due).
+     */
+    public final long timeToNext() {
+        if (timers.isEmpty()) {
+            return -1;
+        }
+        return Math.max(0, timers.firstKey() - System.nanoTime());
+    }
+
+    /**
+     * Runs every timer whose deadline has been reached, earliest first, after
+     * removing it from the heap. A failure in one timer is printed and does
+     * not stop the remaining timers.
+     */
+    public final void executeReadyTimers() {
+        if (timers.isEmpty()) {
+            return;
+        }
+        long now = System.nanoTime();
+        LinkedList<Runnable> ready = new LinkedList<Runnable>();
+        // Inclusive bound: timeToNext() reports 0 for a deadline equal to now,
+        // so such a timer must fire here rather than be skipped.
+        while (!timers.isEmpty() && timers.firstKey() <= now) {
+            ready.addAll(timers.remove(timers.firstKey()));
+        }
+
+        // Run only after removal so a task may add new timers while it executes.
+        for (Runnable runnable : ready) {
+            try {
+                runnable.run();
+            } catch (Throwable thrown) {
+                thrown.printStackTrace();
+            }
+        }
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Fri Feb 20 14:23:26 2009
@@ -37,14 +37,14 @@
         public boolean match(Message message);
     }
 
-    final Router router=  new Router();
-    
+    final Router router = new Router();
+
     final ArrayList<RemoteConnection> connections = new ArrayList<RemoteConnection>();
     final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
     final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
     final ArrayList<BrokerConnection> brokerConnections = new ArrayList<BrokerConnection>();
     final HashMap<Destination, MockQueue> queues = new HashMap<Destination, MockQueue>();
-    
+
     private TransportServer transportServer;
     private String uri;
     private String name;
@@ -63,7 +63,6 @@
         }
     }
 
-
     public void addQueue(MockQueue queue) {
         router.bind(queue, queue.getDestination());
         queues.put(queue.getDestination(), queue);
@@ -111,17 +110,15 @@
     }
 
     final void startServices() throws Exception {
-        
+
+        dispatcher.start();
+
         transportServer = TransportFactory.bind(new URI(uri));
         transportServer.setAcceptListener(this);
-        if(transportServer instanceof DispatchableTransportServer)
-        {
-        	((DispatchableTransportServer)transportServer).setDispatcher(dispatcher);
+        if (transportServer instanceof DispatchableTransportServer) {
+            ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher);
         }
         transportServer.start();
-        
-        
-        dispatcher.start();
 
         for (MockQueue queue : queues.values()) {
             queue.start();
@@ -130,7 +127,7 @@
         for (RemoteConsumer connection : consumers) {
             connection.start();
         }
-        
+
         for (RemoteProducer connection : producers) {
             connection.start();
         }
@@ -155,7 +152,7 @@
     }
 
     public void onAcceptError(Exception error) {
-        System.out.println("Accept error: "+error);
+        System.out.println("Accept error: " + error);
         error.printStackTrace();
     }
 
@@ -186,5 +183,5 @@
     public boolean isStopping() {
         return stopping.get();
     }
-    
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Fri Feb 20 14:23:26 2009
@@ -24,7 +24,7 @@
 import junit.framework.TestCase;
 
 import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.dispatch.PriorityPooledDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.Destination.DestinationBean;
 import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
@@ -91,6 +91,8 @@
 
     @Override
     protected void setUp() throws Exception {
+        dispatcher = PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize);
+        
         if( tcp ) {
             sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
             receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
@@ -105,6 +107,21 @@
         }
     }
     
+    public void test_1_1_0() throws Exception {
+        producerCount = 1;
+        destCount = 1;
+        // Only the producer and destination counts are set; presumably the trailing 0 in the name is the consumer count -- confirm.
+        createConnections();
+
+        // Start the services, report rates, and stop the services even if reporting throws.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+    
     public void test_1_1_1() throws Exception {
         producerCount = 1;
         destCount = 1;
@@ -340,9 +357,7 @@
 
     private void createConnections() throws IOException, URISyntaxException {
 
-        dispatcher = new PriorityPooledDispatcher("BrokerDispatcher", asyncThreadPoolSize, Message.MAX_PRIORITY);
         FlowController.setFlowExecutor(dispatcher.createPriorityExecutor(Message.MAX_PRIORITY));
-                
         if (multibroker) {
             sendBroker = createBroker("SendBroker", sendBrokerURI);
             rcvBroker = createBroker("RcvBroker", receiveBrokerURI);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Fri Feb 20 14:23:26 2009
@@ -50,7 +50,7 @@
         if(transport instanceof DispatchableTransport)
         {
             DispatchableTransport dt = ((DispatchableTransport)transport);
-            dt.setName(name);
+            dt.setName(name + "-client-transport");
             dt.setDispatcher(getDispatcher());
         }
         super.setTransport(transport);
@@ -59,7 +59,7 @@
         transport.start();
         // Let the remote side know our name.
         transport.oneway(name);
-        dispatchContext = getDispatcher().register(this, name + "-producer");
+        dispatchContext = getDispatcher().register(this, name + "-client");
         dispatchContext.requestDispatch();
     }