You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/04 09:57:21 UTC

svn commit: r887117 [1/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/...

Author: chirino
Date: Fri Dec  4 08:56:50 2009
New Revision: 887117

URL: http://svn.apache.org/viewvc?rev=887117&view=rev
Log:
refacorting the advanced package a bit.


Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/QueueSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java
      - copied, changed from r886965, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
      - copied, changed from r886965, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractDispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/QueueSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java Fri Dec  4 08:56:50 2009
@@ -25,7 +25,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
@@ -42,7 +42,7 @@
     protected int inputResumeThreshold = 512 * 1024;
     protected boolean useAsyncWriteThread = true;
 
-    private IDispatcher dispatcher;
+    private Dispatcher dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
     private ExecutorService blockingWriter;
     private ExceptionListener exceptionListener;
@@ -170,11 +170,11 @@
         this.priorityLevels = priorityLevels;
     }
 
-    public IDispatcher getDispatcher() {
+    public Dispatcher getDispatcher() {
         return dispatcher;
     }
 
-    public void setDispatcher(IDispatcher dispatcher) {
+    public void setDispatcher(Dispatcher dispatcher) {
         this.dispatcher = dispatcher;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Fri Dec  4 08:56:50 2009
@@ -26,8 +26,8 @@
 import org.apache.activemq.Service;
 import org.apache.activemq.apollo.Connection;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportServer;
@@ -51,7 +51,7 @@
 
     private final LinkedHashMap<AsciiBuffer, VirtualHost> virtualHosts = new LinkedHashMap<AsciiBuffer, VirtualHost>();
     private VirtualHost defaultVirtualHost;
-    private IDispatcher dispatcher;
+    private Dispatcher dispatcher;
     private File dataDirectory;
 
     private final class BrokerAcceptListener implements TransportAcceptListener {
@@ -129,7 +129,7 @@
 		// apply some default configuration to this broker instance before it's started.
 		if( dispatcher == null ) {
 			int threads = Runtime.getRuntime().availableProcessors();
-			dispatcher = PriorityDispatcher.createPriorityDispatchPool("Broker: "+getDefaultVirtualHost().getHostName(), Broker.MAX_PRIORITY, threads);
+			dispatcher = DispatcherThread.createPriorityDispatchPool("Broker: "+getDefaultVirtualHost().getHostName(), Broker.MAX_PRIORITY, threads);
 		}
 		
 
@@ -376,10 +376,10 @@
     // /////////////////////////////////////////////////////////////////
     // Property Accessors
     // /////////////////////////////////////////////////////////////////
-    public IDispatcher getDispatcher() {
+    public Dispatcher getDispatcher() {
         return dispatcher;
     }
-    public void setDispatcher(IDispatcher dispatcher) {
+    public void setDispatcher(Dispatcher dispatcher) {
     	assertInConfigurationState();
         this.dispatcher = dispatcher;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Fri Dec  4 08:56:50 2009
@@ -41,7 +41,7 @@
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
@@ -71,7 +71,7 @@
     private final FlowController<OperationBase<?>> storeController;
     private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
 
-    private IDispatcher dispatcher;
+    private Dispatcher dispatcher;
     private Thread flushThread;
     private AtomicBoolean running = new AtomicBoolean(false);
     private DatabaseListener listener;
@@ -1288,11 +1288,11 @@
         return store.allocateStoreTracking();
     }
 
-    public IDispatcher getDispatcher() {
+    public Dispatcher getDispatcher() {
         return dispatcher;
     }
 
-    public void setDispatcher(IDispatcher dispatcher) {
+    public void setDispatcher(Dispatcher dispatcher) {
         this.dispatcher = dispatcher;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Fri Dec  4 08:56:50 2009
@@ -24,7 +24,7 @@
 
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.activemq.flow.SizeLimiter;
@@ -52,7 +52,7 @@
     private static final boolean USE_PRIORITY_QUEUES = true;
 
     private BrokerDatabase database;
-    private IDispatcher dispatcher;
+    private Dispatcher dispatcher;
 
     private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
     private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
@@ -226,7 +226,7 @@
         this.database = database;
     }
 
-    public void setDispatcher(IDispatcher dispatcher) {
+    public void setDispatcher(Dispatcher dispatcher) {
         this.dispatcher = dispatcher;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Fri Dec  4 08:56:50 2009
@@ -30,8 +30,8 @@
 import org.apache.activemq.apollo.broker.Router;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.transport.TransportFactory;
@@ -87,7 +87,7 @@
     protected Broker sendBroker;
     protected Broker rcvBroker;
     protected ArrayList<Broker> brokers = new ArrayList<Broker>();
-    protected IDispatcher dispatcher;
+    protected Dispatcher dispatcher;
     protected final AtomicLong msgIdGenerator = new AtomicLong();
     protected final AtomicBoolean stopping = new AtomicBoolean();
 
@@ -134,8 +134,8 @@
 
     protected abstract String getRemoteWireFormat();
 
-    protected IDispatcher createDispatcher() {
-        return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
+    protected Dispatcher createDispatcher() {
+        return DispatcherThread.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
     }
 
     @Test

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Fri Dec  4 08:56:50 2009
@@ -6,8 +6,8 @@
 import org.apache.activemq.apollo.Connection;
 import org.apache.activemq.apollo.broker.Destination;
 import org.apache.activemq.apollo.broker.MessageDelivery;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.Dispatchable;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISinkController;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Fri Dec  4 08:56:50 2009
@@ -27,8 +27,8 @@
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.queue.IQueue;
 
 /**
@@ -38,7 +38,7 @@
 public class SharedQueueTest extends TestCase {
 
 
-    IDispatcher dispatcher;
+    Dispatcher dispatcher;
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
@@ -47,8 +47,8 @@
 
     protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
 
-    protected IDispatcher createDispatcher() {
-        return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", Broker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+    protected Dispatcher createDispatcher() {
+        return DispatcherThread.createPriorityDispatchPool("TestDispatcher", Broker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
     }
 
     protected int consumerStartDelay = 0;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Fri Dec  4 08:56:50 2009
@@ -18,7 +18,7 @@
 
 import java.nio.channels.SelectableChannel;
 
-import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSystem;
+import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSPI;
 
 /**
  * 
@@ -26,38 +26,48 @@
  */
 public class DispatchSystem {
 
-    private static final SimpleDispatchSystem system = new SimpleDispatchSystem(Runtime.getRuntime().availableProcessors());
-    
-    static DispatchQueue getMainQueue() {
-        return system.getMainQueue();
-    }
-    
     public static enum DispatchQueuePriority {
         HIGH,
         DEFAULT,
         LOW;
     }
 
-    static public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
-        return system.getGlobalQueue(priority);
+    static abstract public class DispatchSPI {
+        abstract public DispatchQueue getMainQueue();
+        abstract public DispatchQueue getGlobalQueue(DispatchQueuePriority priority);
+        abstract public DispatchQueue createQueue(String label);
+        abstract public void dispatchMain();
+        abstract public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue);
+    }
+
+    public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
+    static public DispatchQueue getCurrentQueue() {
+        return CURRENT_QUEUE.get();
+    }
+
+    private final static DispatchSPI spi = cretateDispatchSystemSPI();
+    private static DispatchSPI cretateDispatchSystemSPI() {
+        return new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
     }
     
-    static DispatchQueue createQueue(String label) {
-        return system.createQueue(label);
+    static DispatchQueue getMainQueue() {
+        return spi.getMainQueue();
     }
     
-    static DispatchQueue getCurrentQueue() {
-        return system.getCurrentQueue();
+    static public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+        return spi.getGlobalQueue(priority);
+    }
+    
+    static DispatchQueue createQueue(String label) {
+        return spi.createQueue(label);
     }
     
     static void dispatchMain() {
-        system.dispatchMain();
+        spi.dispatchMain();
     }
 
     static DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
-        return system.createSource(channel, interestOps, queue);
+        return spi.createSource(channel, interestOps, queue);
     }
-    
-    
 
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Fri Dec  4 08:56:50 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import org.apache.activemq.dispatch.DispatchObject;
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AbstractDispatchObject implements DispatchObject {
+
+    protected Object context;
+    protected Runnable finalizer;
+    protected DispatchQueue targetQueue;
+
+    @SuppressWarnings("unchecked")
+    public <Context> Context getContext() {
+        return (Context) context;
+    }
+    
+    public <Context> void setContext(Context context) {
+        this.context = context;
+    }
+
+    public void setFinalizer(Runnable finalizer) {
+        this.finalizer = finalizer;
+    }
+
+    public void setTargetQueue(DispatchQueue targetQueue) {
+        this.targetQueue = targetQueue;
+    }
+
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/QueueSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/QueueSupport.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/QueueSupport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/QueueSupport.java Fri Dec  4 08:56:50 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class QueueSupport {
+
+    static public void dispatchApply(DispatchQueue queue, int itterations, final Runnable runnable) throws InterruptedException {
+        final CountDownLatch done = new CountDownLatch(itterations);
+        Runnable wrapper = new Runnable() {
+            public void run() {
+                try {
+                    runnable.run();
+                } finally {
+                    done.countDown();
+                }
+            }
+        };
+        for( int i=0; i < itterations; i++ ) { 
+            queue.dispatchAsync(wrapper);
+        }
+        done.await();
+    }
+    
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java Fri Dec  4 08:56:50 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSystem;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class SerialDispatchQueue extends AbstractDispatchObject implements DispatchQueue, Runnable {
+
+    private final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+    final private String label;
+    final private AtomicInteger suspendCounter = new AtomicInteger();
+    final private AtomicLong size = new AtomicLong();
+    
+    static final ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
+
+    public SerialDispatchQueue(String label) {
+        this.label = label;
+    }
+
+    public String getLabel() {
+        return label;
+    }
+
+    public void resume() {
+        suspendCounter.decrementAndGet();
+    }
+
+    public void suspend() {
+        suspendCounter.incrementAndGet();
+    }
+
+    public void dispatchAfter(long delayMS, Runnable runnable) {
+        throw new RuntimeException("TODO: implement me.");
+    }
+
+    public void dispatchAsync(Runnable runnable) {
+        if( runnable == null ) {
+            throw new IllegalArgumentException();
+        }
+        long lastSize = size.incrementAndGet();
+        runnables.add(runnable);
+        if( targetQueue!=null && lastSize == 1 && suspendCounter.get()<=0 ) {
+            targetQueue.dispatchAsync(this);
+        }
+    }
+
+    public void run() {
+        DispatchQueue original = DispatchSystem.CURRENT_QUEUE.get();
+        DispatchSystem.CURRENT_QUEUE.set(this);
+        try {
+            Runnable runnable;
+            long lsize = size.get();
+            while( suspendCounter.get() <= 0 && lsize > 0 ) {
+                try {
+                    runnable = runnables.poll();
+                    if( runnable!=null ) {
+                        runnable.run();
+                        lsize = size.decrementAndGet();
+                    }
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        } finally {
+            DispatchSystem.CURRENT_QUEUE.set(original);
+        }
+    }
+
+    public void dispatchSync(Runnable runnable) throws InterruptedException {
+        dispatchApply(1, runnable);
+    }
+    
+    public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException {
+        QueueSupport.dispatchApply(this, iterations, runnable);
+    }
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java (from r886965, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSystem.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSystem.java&r1=886965&r2=887117&rev=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java Fri Dec  4 08:56:50 2009
@@ -22,6 +22,8 @@
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSource;
 import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchSPI;
+import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
 
 import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
 
@@ -31,18 +33,16 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class AdancedDispatchSystem {
-    
-    static final ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
-    
+public class AdancedDispatchSPI extends DispatchSPI {
+        
     final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
     final GlobalDispatchQueue globalQueues[];
-    PooledPriorityDispatcher pooledDispatcher;
+    DispatcherPool pooledDispatcher;
 
     final AtomicLong globalQueuedRunnables = new AtomicLong();
     
-    public AdancedDispatchSystem(int size) throws Exception {
-        pooledDispatcher = new PooledPriorityDispatcher("default", size, 3);
+    public AdancedDispatchSPI(int size) throws Exception {
+        pooledDispatcher = new DispatcherPool("default", size, 3);
         pooledDispatcher.start();
 
         globalQueues = new GlobalDispatchQueue[3];
@@ -65,10 +65,6 @@
         return rc;
     }
     
-    public DispatchQueue getCurrentQueue() {
-        return CURRENT_QUEUE.get();
-    }
-    
     public void dispatchMain() {
         mainQueue.run();
     }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java Fri Dec  4 08:56:50 2009
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public interface Dispatcher extends Executor {
+
+    /**
+     * This interface is implemented by Dispatchable entities. A Dispatchable
+     * entity registers with an {@link Dispatcher} and is returned a
+     * {@link DispatchContext} which it can use to request the
+     * {@link Dispatcher} to invoke {@link Dispatchable#dispatch()}
+     * 
+     * {@link Dispatcher} guarantees that {@link #dispatch()} will never invoke
+     * dispatch concurrently unless the {@link Dispatchable} is registered with
+     * more than one {@link Dispatcher};
+     */
+    public interface Dispatchable {
+        public boolean dispatch();
+    }
+
+    /**
+     * Returned to callers registered with this dispathcer. Used by the caller
+     * to inform the dispatcher that it is ready for dispatch.
+     * 
+     * Note that DispatchContext is not safe for concurrent access by multiple
+     * threads.
+     */
+    public interface DispatchContext {
+        /**
+         * Once registered with a dispatcher, this can be called to request
+         * dispatch. The {@link Dispatchable} will remain in the dispatch queue
+         * until a subsequent call to {@link Dispatchable#dispatch()} returns
+         * false;
+         * 
+         * @throws RejectedExecutionException If the dispatcher has been shutdown.
+         */
+        public void requestDispatch() throws RejectedExecutionException;
+
+        /**
+         * This can be called to update the dispatch priority.
+         * 
+         * @param priority
+         */
+        public void updatePriority(int priority);
+
+        /**
+         * Gets the Dispatchable that this context represents.
+         * 
+         * @return The dispatchable
+         */
+        public Dispatchable getDispatchable();
+
+        /**
+         * Gets the name of the dispatch context
+         * 
+         * @return The dispatchable
+         */
+        public String getName();
+
+        /**
+         * This must be called to release any resource the dispatcher is holding
+         * on behalf of this context. Once called this {@link DispatchContext} should
+         * no longer be used. 
+         */
+        public void close(boolean sync);
+    }
+
+    public class RunnableAdapter implements Dispatchable, Runnable {
+        private Runnable runnable;
+
+        public RunnableAdapter() {
+            runnable = this;
+        }
+        public RunnableAdapter(Runnable runnable) {
+            this.runnable = runnable;
+        }
+
+        public boolean dispatch() {
+            runnable.run();
+            return true;
+        }
+
+        public void run() {
+        }
+    }
+
+    /**
+     * Registers a {@link Dispatchable} with this dispatcher, and returns a
+     * {@link DispatchContext} that the caller can use to request dispatch.
+     * 
+     * @param dispatchable
+     *            The {@link Dispatchable}
+     * @param name
+     *            An identifier for the dispatcher.
+     * @return A {@link DispatchContext} that can be used to request dispatch
+     */
+    public DispatchContext register(Dispatchable dispatchable, String name);
+
+    /**
+     * Gets the number of dispatch priorities. Dispatch priorities are 0 based, 
+     * so if the number of dispatch priorities is 10, the maxium is 9.
+     * @return the number of dispatch priorities.
+     */
+    public int getDispatchPriorities();
+    
+    /**
+     * Creates an executor that will execute its tasks at the specified
+     * priority.
+     * 
+     * @param priority
+     *            The priority
+     * @return A prioritized executor.
+     */
+    public Executor createPriorityExecutor(int priority);
+
+    /**
+     * Starts the dispatcher.
+     */
+    public void start() throws Exception;
+
+    /**
+     * Shuts down the dispatcher, this may result in previous dispatch requests
+     * going unserved.
+     */
+    public void shutdown() throws InterruptedException;
+
+    /**
+     * Schedules the given {@link Runnable} to be run at the specified time in
+     * the future on this {@link Dispatcher}.
+     * 
+     * @param runnable
+     *            The Runnable to execute
+     * @param delay
+     *            The delay
+     * @param timeUnit
+     *            The TimeUnit used to interpret delay.
+     */
+    public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit);
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java Fri Dec  4 08:56:50 2009
@@ -8,6 +8,6 @@
  */
 public interface DispatcherAware {
 
-	public void setDispatcher(IDispatcher dispatcher);
+	public void setDispatcher(Dispatcher dispatcher);
 	
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java Fri Dec  4 08:56:50 2009
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class DispatcherPool implements Dispatcher {
+
+    private final String name;
+
+    private final ThreadLocal<DispatcherThread> dispatcher = new ThreadLocal<DispatcherThread>();
+    private final ThreadLocal<PooledDispatchContext> dispatcherContext = new ThreadLocal<PooledDispatchContext>();
+    private final ArrayList<DispatcherThread> dispatchers = new ArrayList<DispatcherThread>();
+
+    final AtomicBoolean started = new AtomicBoolean();
+    final AtomicBoolean shutdown = new AtomicBoolean();
+
+    private int roundRobinCounter = 0;
+    private int size;
+    private final int numPriorities;
+
+    protected LoadBalancer loadBalancer;
+
+    protected DispatcherPool(String name, int size, int numPriorities) {
+        this.name = name;
+        this.size = size;
+        this.numPriorities = numPriorities;
+        loadBalancer = new SimpleLoadBalancer();
+    }
+
+    /**
+     * Subclasses should implement this to return a new dispatcher.
+     * 
+     * @param name
+     *            The name to assign the dispatcher.
+     * @param pool
+     *            The pool.
+     * @return The new dispathcer.
+     */
+    protected DispatcherThread createDispatcher(String name, DispatcherPool pool) throws Exception {
+        return new DispatcherThread(name, numPriorities, this);
+    }
+
+    /**
+     * @see org.apache.activemq.dispatch.internal.advanced.Dispatcher#start()
+     */
+    public synchronized final void start() throws Exception {
+        loadBalancer.start();
+        if (started.compareAndSet(false, true)) {
+            // Create all the workers.
+            try {
+                for (int i = 0; i < size; i++) {
+                    DispatcherThread dispatacher = createDispatcher(name + "-" + (i + 1), this);
+                    dispatchers.add(dispatacher);
+                    dispatacher.start();
+                }
+            } catch (Exception e) {
+                shutdown();
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+     */
+    public synchronized final void shutdown() throws InterruptedException {
+        shutdown.set(true);
+        boolean interrupted = false;
+        while (!dispatchers.isEmpty()) {
+            try {
+                dispatchers.get(dispatchers.size() - 1).shutdown();
+            } catch (InterruptedException ie) {
+                interrupted = true;
+                continue;
+            }
+        }
+        // Re-interrupt:
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+
+        loadBalancer.stop();
+    }
+
+    public void setCurrentDispatchContext(PooledDispatchContext context) {
+        dispatcherContext.set(context);
+    }
+
+    public PooledDispatchContext getCurrentDispatchContext() {
+        return dispatcherContext.get();
+    }
+
+    /**
+     * Returns the currently executing dispatcher, or null if the current thread
+     * is not a dispatcher:
+     * 
+     * @return The currently executing dispatcher
+     */
+    public Dispatcher getCurrentDispatcher() {
+        return dispatcher.get();
+    }
+
+    /**
+     * A Dispatcher must call this to indicate that is has started it's dispatch
+     * loop.
+     */
+    public void onDispatcherStarted(DispatcherThread d) {
+        dispatcher.set(d);
+        loadBalancer.onDispatcherStarted(d);
+    }
+
+    public LoadBalancer getLoadBalancer() {
+        return loadBalancer;
+    }
+
+    /**
+     * A Dispatcher must call this when exiting it's dispatch loop
+     */
+    public void onDispatcherStopped(Dispatcher d) {
+        synchronized (dispatchers) {
+            if (dispatchers.remove(d)) {
+                size--;
+            }
+        }
+        loadBalancer.onDispatcherStopped(d);
+    }
+
+    protected DispatcherThread chooseDispatcher() {
+        DispatcherThread d = dispatcher.get();
+        if (d == null) {
+            synchronized (dispatchers) {
+                if(dispatchers.isEmpty())
+                {
+                    throw new RejectedExecutionException();
+                }
+                if (++roundRobinCounter >= size) {
+                    roundRobinCounter = 0;
+                }
+                return dispatchers.get(roundRobinCounter);
+            }
+        } else {
+            return d;
+        }
+    }
+
+    public DispatchContext register(Dispatchable dispatchable, String name) {
+        return chooseDispatcher().register(dispatchable, name);
+    }
+
+    public String toString() {
+        return name;
+    }
+
+	public String getName() {
+		return name;
+	}
+
+	public int getSize() {
+		return size;
+	}
+	
+    public final Executor createPriorityExecutor(final int priority) {
+        return new Executor() {
+            public void execute(final Runnable runnable) {
+                chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+            }
+
+        };
+    }
+
+    public int getDispatchPriorities() {
+        // TODO Auto-generated method stub
+        return numPriorities;
+    }
+
+    public void execute(Runnable command) {
+        chooseDispatcher().dispatch(new RunnableAdapter(command), 0);
+    }
+    
+    public void execute(Runnable command, int priority) {
+        chooseDispatcher().dispatch(new RunnableAdapter(command), priority);
+    }
+
+    public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
+        chooseDispatcher().schedule(runnable, delay, timeUnit);
+    }
+
+    public void schedule(final Runnable runnable, int priority, long delay, TimeUnit timeUnit) {
+        chooseDispatcher().schedule(runnable, priority, delay, timeUnit);
+    }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Fri Dec  4 08:56:50 2009
@@ -0,0 +1,711 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.dispatch.DispatchSystem;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.advanced.LoadBalancer.ExecutionTracker;
+import org.apache.activemq.util.Mapper;
+import org.apache.activemq.util.PriorityLinkedList;
+import org.apache.activemq.util.TimerHeap;
+import org.apache.activemq.util.list.LinkedNode;
+import org.apache.activemq.util.list.LinkedNodeList;
+
+public class DispatcherThread implements Runnable, Dispatcher {
+
+    private final ThreadDispatchQueue dispatchQueues[];
+    
+    private static final boolean DEBUG = false;
+    private Thread thread;
+    protected boolean running = false;
+    private boolean threaded = false;
+    protected final int MAX_USER_PRIORITY;
+    protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
+
+    // Set if this dispatcher is part of a dispatch pool:
+    protected final DispatcherPool pooledDispatcher;
+
+    // The local dispatch queue:
+    protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+
+    // Dispatch queue for requests from other threads:
+    private final LinkedNodeList<ForeignEvent>[] foreignQueue;
+    private static final int[] TOGGLE = new int[] { 1, 0 };
+    private int foreignToggle = 0;
+
+    // Timed Execution List
+    protected final TimerHeap<Runnable> timerHeap = new TimerHeap<Runnable>() {
+        @Override
+        protected final void execute(Runnable ready) {
+            ready.run();
+        }
+    };
+
+    protected final String name;
+    private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
+    private final Semaphore foreignPermits = new Semaphore(0);
+
+    private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
+        public Integer map(PriorityDispatchContext element) {
+            return element.listPrio;
+        }
+    };
+
+    protected DispatcherThread(String name, int priorities, DispatcherPool pooledDispactcher) {
+        this.name = name;
+        
+        this.dispatchQueues = new ThreadDispatchQueue[3];
+        for (int i = 0; i < 3; i++) {
+            dispatchQueues[i] = new ThreadDispatchQueue(this, DispatchQueuePriority.values()[i]);
+        }
+
+        MAX_USER_PRIORITY = priorities - 1;
+        priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
+        foreignQueue = createForeignEventQueue();
+        for (int i = 0; i < 2; i++) {
+            foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
+        }
+        this.pooledDispatcher = pooledDispactcher;
+    }
+
+    public static final Dispatcher createPriorityDispatcher(String name, int numPriorities) {
+        return new DispatcherThread(name, numPriorities, null);
+    }
+
+    public static final Dispatcher createPriorityDispatchPool(String name, final int numPriorities, int size) {
+        return new DispatcherPool(name, size, numPriorities);
+    }
+
+    @SuppressWarnings("unchecked")
+    private LinkedNodeList<ForeignEvent>[] createForeignEventQueue() {
+        return new LinkedNodeList[2];
+    }
+
+    protected abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
+        public abstract void execute();
+
+        final void addToList() {
+            synchronized (foreignQueue) {
+                if (!this.isLinked()) {
+                    foreignQueue[foreignToggle].addLast(this);
+                    if (!foreignAvailable.getAndSet(true)) {
+                        wakeup();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean isThreaded() {
+        return threaded;
+    }
+
+    public void setThreaded(boolean threaded) {
+        this.threaded = threaded;
+    }
+
+    public int getDispatchPriorities() {
+        return MAX_USER_PRIORITY;
+    }
+
+    private class UpdateEvent extends ForeignEvent {
+        private final PriorityDispatchContext pdc;
+
+        UpdateEvent(PriorityDispatchContext pdc) {
+            this.pdc = pdc;
+        }
+
+        // Can only be called by the owner of this dispatch context:
+        public void execute() {
+            pdc.processForeignUpdates();
+        }
+    }
+
+    public DispatchContext register(Dispatchable dispatchable, String name) {
+        return new PriorityDispatchContext(dispatchable, true, name);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#start()
+     */
+    public synchronized final void start() {
+        if (thread == null) {
+            running = true;
+            thread = new Thread(this, name);
+            thread.start();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+     */
+    public void shutdown() throws InterruptedException {
+        Thread joinThread = null;
+        synchronized (this) {
+            if (thread != null) {
+                dispatchInternal(new RunnableAdapter() {
+                    public void run() {
+                        running = false;
+                    }
+                }, MAX_USER_PRIORITY + 1);
+                joinThread = thread;
+                thread = null;
+            }
+        }
+        if (joinThread != null) {
+            // thread.interrupt();
+            joinThread.join();
+        }
+    }
+
+    protected void cleanup() {
+        ArrayList<PriorityDispatchContext> toClose = null;
+        synchronized (this) {
+            running = false;
+            toClose = new ArrayList<PriorityDispatchContext>(contexts.size());
+            toClose.addAll(contexts);
+        }
+
+        for (PriorityDispatchContext context : toClose) {
+            context.close(false);
+        }
+    }
+
+    public void run() {
+
+        if (pooledDispatcher != null) {
+            // Inform the dispatcher that we have started:
+            pooledDispatcher.onDispatcherStarted((DispatcherThread) this);
+        }
+
+        PriorityDispatchContext pdc;
+        try {
+            final int MAX_DISPATCH_PER_LOOP = 20;
+            int processed = 0;
+
+            while (running) {
+                pdc = priorityQueue.poll();
+                // If no local work available wait for foreign work:
+                if (pdc == null) {
+                    waitForEvents();
+                } else {
+                    if( pdc.priority < dispatchQueues.length ) {
+                        DispatchSystem.CURRENT_QUEUE.set(dispatchQueues[pdc.priority]);
+                    }
+                    
+                    if (pdc.tracker != null) {
+                        pooledDispatcher.setCurrentDispatchContext(pdc);
+                    }
+
+                    while (!pdc.dispatch()) {
+                        processed++;
+                        if (processed > MAX_DISPATCH_PER_LOOP || pdc.listPrio < priorityQueue.getHighestPriority()) {
+                            // Give other dispatchables a shot:
+                            // May have gotten relinked by the caller:
+                            if (!pdc.isLinked()) {
+                                priorityQueue.add(pdc, pdc.listPrio);
+                            }
+                            break;
+                        }
+                    }
+
+                    if (pdc.tracker != null) {
+                        pooledDispatcher.setCurrentDispatchContext(null);
+                    }
+
+                    if (processed < MAX_DISPATCH_PER_LOOP) {
+                        continue;
+                    }
+                }
+
+                processed = 0;
+                // Execute delayed events:
+                timerHeap.executeReadyTimers();
+
+                // Allow subclasses to do additional work:
+                dispatchHook();
+
+                // Check for foreign dispatch requests:
+                if (foreignAvailable.get()) {
+                    LinkedNodeList<ForeignEvent> foreign;
+                    synchronized (foreignQueue) {
+                        // Swap foreign queues and drain permits;
+                        foreign = foreignQueue[foreignToggle];
+                        foreignToggle = TOGGLE[foreignToggle];
+                        foreignAvailable.set(false);
+                        foreignPermits.drainPermits();
+                    }
+                    while (true) {
+                        ForeignEvent fe = foreign.getHead();
+                        if (fe == null) {
+                            break;
+                        }
+
+                        fe.unlink();
+                        fe.execute();
+                    }
+
+                }
+            }
+        } catch (InterruptedException e) {
+            return;
+        } catch (Throwable thrown) {
+            thrown.printStackTrace();
+        } finally {
+            if (pooledDispatcher != null) {
+                pooledDispatcher.onDispatcherStopped((DispatcherThread) this);
+            }
+            cleanup();
+        }
+    }
+
+    /**
+     * Subclasses may override this to do do additional dispatch work:
+     */
+    protected void dispatchHook() throws Exception {
+
+    }
+
+    /**
+     * Subclasses may override this to implement another mechanism for wakeup.
+     * 
+     * @throws Exception
+     */
+    protected void waitForEvents() throws Exception {
+        long next = timerHeap.timeToNext(TimeUnit.NANOSECONDS);
+        if (next == -1) {
+            foreignPermits.acquire();
+        } else if (next > 0) {
+            foreignPermits.tryAcquire(next, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    /**
+     * Subclasses may override this to provide an alternative wakeup mechanism.
+     */
+    protected void wakeup() {
+        foreignPermits.release();
+    }
+
+    protected final void onForeignUpdate(PriorityDispatchContext context) {
+        synchronized (foreignQueue) {
+
+            ForeignEvent fe = context.updateEvent[foreignToggle];
+            if (!fe.isLinked()) {
+                foreignQueue[foreignToggle].addLast(fe);
+                if (!foreignAvailable.getAndSet(true)) {
+                    wakeup();
+                }
+            }
+        }
+    }
+
+    protected final boolean removeDispatchContext(PriorityDispatchContext context) {
+        synchronized (foreignQueue) {
+
+            if (context.updateEvent[0].isLinked()) {
+                context.updateEvent[0].unlink();
+            }
+            if (context.updateEvent[1].isLinked()) {
+                context.updateEvent[1].unlink();
+            }
+        }
+
+        if (context.isLinked()) {
+            context.unlink();
+            return true;
+        }
+
+        synchronized (this) {
+            contexts.remove(context);
+        }
+
+        return false;
+    }
+
+    protected final boolean takeOwnership(PriorityDispatchContext context) {
+        synchronized (this) {
+            if (running) {
+                contexts.add(context);
+            } else {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    //Special dispatch method that allow high priority dispatch:
+    private final void dispatchInternal(Dispatchable dispatchable, int priority) {
+        PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+        context.priority = priority;
+        context.requestDispatch();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
+     * .dispatch.Dispatcher.Dispatchable)
+     */
+    public final void dispatch(Dispatchable dispatchable, int priority) {
+        PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+        context.updatePriority(priority);
+        context.requestDispatch();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+     */
+    public Executor createPriorityExecutor(final int priority) {
+
+        return new Executor() {
+
+            public void execute(final Runnable runnable) {
+                dispatch(new RunnableAdapter(runnable), priority);
+            }
+        };
+    }
+
+    public void execute(final Runnable runnable) {
+        dispatch(new RunnableAdapter(runnable), 0);
+    }
+    
+    public void execute(final Runnable runnable, int prio) {
+        dispatch(new RunnableAdapter(runnable), prio);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+     * long, java.util.concurrent.TimeUnit)
+     */
+    public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
+        schedule(runnable, 0, delay, timeUnit);
+    }
+    
+    public void schedule(final Runnable runnable, final int prio, final long delay, final TimeUnit timeUnit) {
+        final Runnable wrapper = new Runnable() {
+            public void run() {
+                execute(runnable, prio);
+            }
+        };
+        if (getCurrentDispatcher() == this) {
+            timerHeap.addRelative(wrapper, delay, timeUnit);
+        } else {
+            new ForeignEvent() {
+                public void execute() {
+                    timerHeap.addRelative(wrapper, delay, timeUnit);
+                }
+            }.addToList();
+        }
+    }
+
+    public String toString() {
+        return name;
+    }
+
+    private final DispatcherThread getCurrentDispatcher() {
+        if (pooledDispatcher != null) {
+            return (DispatcherThread) pooledDispatcher.getCurrentDispatcher();
+        } else if (Thread.currentThread() == thread) {
+            return (DispatcherThread) this;
+        } else {
+            return null;
+        }
+
+    }
+
+    private final PooledDispatchContext getCurrentDispatchContext() {
+        return pooledDispatcher.getCurrentDispatchContext();
+    }
+
+    /**
+     * 
+     */
+    protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PooledDispatchContext {
+        // The dispatchable target:
+        private final Dispatchable dispatchable;
+        // The name of this context:
+        final String name;
+        // list prio can only be updated in the thread of of the owning
+        // dispatcher
+        protected int listPrio;
+
+        // The update events are used to update fields in the dispatch context
+        // from foreign threads:
+        final UpdateEvent updateEvent[];
+
+        private final ExecutionTracker tracker;
+        protected DispatcherThread currentOwner;
+        private DispatcherThread updateDispatcher = null;
+
+        private int priority;
+        private boolean dispatchRequested = false;
+        private boolean closed = false;
+        final CountDownLatch closeLatch = new CountDownLatch(1);
+
+        protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
+            this.dispatchable = dispatchable;
+            this.name = name;
+            this.currentOwner = (DispatcherThread) DispatcherThread.this;
+            if (persistent && pooledDispatcher != null) {
+                this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
+            } else {
+                this.tracker = null;
+            }
+            updateEvent = createUpdateEvent();
+            updateEvent[0] = new UpdateEvent(this);
+            updateEvent[1] = new UpdateEvent(this);
+            if (persistent) {
+                currentOwner.takeOwnership(this);
+            }
+        }
+
+        private final DispatcherThread.UpdateEvent[] createUpdateEvent() {
+            return new DispatcherThread.UpdateEvent[2];
+        }
+
+        /**
+         * Gets the execution tracker for the context.
+         * 
+         * @return the execution tracker for the context:
+         */
+        public ExecutionTracker getExecutionTracker() {
+            return tracker;
+        }
+
+        /**
+         * This can only be called by the owning dispatch thread:
+         * 
+         * @return False if the dispatchable has more work to do.
+         */
+        public final boolean dispatch() {
+            return dispatchable.dispatch();
+        }
+
+        public final void assignToNewDispatcher(Dispatcher newDispatcher) {
+            synchronized (this) {
+
+                // If we're already set to this dispatcher
+                if (newDispatcher == currentOwner) {
+                    if (updateDispatcher == null || updateDispatcher == newDispatcher) {
+                        return;
+                    }
+                }
+
+                updateDispatcher = (DispatcherThread) newDispatcher;
+                if (DEBUG)
+                    System.out.println(getName() + " updating to " + updateDispatcher);
+
+                currentOwner.onForeignUpdate(this);
+            }
+
+        }
+
+        public void requestDispatch() {
+
+            DispatcherThread callingDispatcher = getCurrentDispatcher();
+            if (tracker != null)
+                tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
+
+            // Otherwise this is coming off another thread, so we need to
+            // synchronize
+            // to protect against ownership changes:
+            synchronized (this) {
+                // If the owner of this context is the calling thread, then
+                // delegate to the dispatcher.
+                if (currentOwner == callingDispatcher) {
+
+                    if (!currentOwner.running) {
+                        // TODO In the event that the current dispatcher
+                        // failed due to a runtime exception, we could
+                        // try to switch to a new dispatcher.
+                        throw new RejectedExecutionException();
+                    }
+                    if (!isLinked()) {
+                        currentOwner.priorityQueue.add(this, listPrio);
+                    }
+                    return;
+                }
+
+                dispatchRequested = true;
+                currentOwner.onForeignUpdate(this);
+            }
+        }
+
+        public void updatePriority(int priority) {
+
+            if (closed) {
+                return;
+            }
+
+            priority = Math.min(priority, MAX_USER_PRIORITY);
+
+            if (this.priority == priority) {
+                return;
+            }
+            DispatcherThread callingDispatcher = getCurrentDispatcher();
+
+            // Otherwise this is coming off another thread, so we need to
+            // synchronize to protect against ownership changes:
+            synchronized (this) {
+                if (closed) {
+                    return;
+                }
+                this.priority = priority;
+
+                // If this is called by the owning dispatcher, then we go ahead
+                // and update:
+                if (currentOwner == callingDispatcher) {
+
+                    if (priority != listPrio) {
+
+                        listPrio = priority;
+                        // If there is a priority change relink the context
+                        // at the new priority:
+                        if (isLinked()) {
+                            unlink();
+                            currentOwner.priorityQueue.add(this, listPrio);
+                        }
+                    }
+                    return;
+                }
+
+                currentOwner.onForeignUpdate(this);
+            }
+
+        }
+
+        public void processForeignUpdates() {
+            synchronized (this) {
+
+                if (closed) {
+                    close(false);
+                    return;
+                }
+
+                if (updateDispatcher != null && updateDispatcher.takeOwnership(this)) {
+                    if (DEBUG) {
+                        System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+                    }
+
+                    if (currentOwner.removeDispatchContext(this)) {
+                        dispatchRequested = true;
+                    }
+
+                    updateDispatcher.onForeignUpdate(this);
+                    switchedDispatcher(currentOwner, updateDispatcher);
+                    currentOwner = updateDispatcher;
+                    updateDispatcher = null;
+
+                } else {
+                    updatePriority(priority);
+
+                    if (dispatchRequested) {
+                        dispatchRequested = false;
+                        requestDispatch();
+                    }
+                }
+            }
+        }
+
+        /**
+         * May be overriden by subclass to additional work on dispatcher switch
+         * 
+         * @param oldDispatcher The old dispatcher
+         * @param newDispatcher The new Dispatcher
+         */
+        protected void switchedDispatcher(DispatcherThread oldDispatcher, DispatcherThread newDispatcher) {
+
+        }
+
+        public boolean isClosed() {
+            return closed;
+        }
+
+        public void close(boolean sync) {
+            DispatcherThread callingDispatcher = getCurrentDispatcher();
+            // System.out.println(this + "Closing");
+            synchronized (this) {
+                closed = true;
+                // If the owner of this context is the calling thread, then
+                // delegate to the dispatcher.
+                if (currentOwner == callingDispatcher) {
+                    removeDispatchContext(this);
+                    closeLatch.countDown();
+                    return;
+                }
+            }
+
+            currentOwner.onForeignUpdate(this);
+            if (sync) {
+                boolean interrupted = false;
+                while (true) {
+                    try {
+                        closeLatch.await();
+                        break;
+                    } catch (InterruptedException e) {
+                        interrupted = true;
+                    }
+                }
+
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        public final String toString() {
+            return getName();
+        }
+
+        public Dispatchable getDispatchable() {
+            return dispatchable;
+        }
+
+        public DispatcherThread getDispatcher() {
+            return currentOwner;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+    }
+
+    public String getName() {
+        return name;
+    }
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Fri Dec  4 08:56:50 2009
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.dispatch.internal.advanced;
 
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.QueueSupport;
 
 /**
  * 
@@ -28,14 +28,16 @@
  */
 public class GlobalDispatchQueue implements DispatchQueue {
 
-    private final AdancedDispatchSystem system;
-    private Executor executor;
-    final String label;
+    private final String label;
+    private final AdancedDispatchSPI system;
+    private final DispatcherPool dispatcher;
+    private final DispatchQueuePriority priority;
     
-    public GlobalDispatchQueue(AdancedDispatchSystem system, DispatchQueuePriority priority) {
+    public GlobalDispatchQueue(AdancedDispatchSPI system, DispatchQueuePriority priority) {
         this.system = system;
+        this.priority = priority;
         this.label=priority.toString();
-        executor = this.system.pooledDispatcher.createPriorityExecutor(priority.ordinal());
+        this.dispatcher = this.system.pooledDispatcher;
     }
 
     public String getLabel() {
@@ -43,11 +45,11 @@
     }
 
     public void dispatchAsync(Runnable runnable) {
-        executor.execute(runnable);
+        dispatcher.execute(runnable, priority.ordinal());
     }
 
     public void dispatchAfter(long delayMS, Runnable runnable) {
-        this.system.pooledDispatcher.schedule(runnable, delayMS, TimeUnit.MILLISECONDS);
+        dispatcher.schedule(runnable, priority.ordinal(), delayMS, TimeUnit.MILLISECONDS);
     }
 
     public void dispatchSync(final Runnable runnable) throws InterruptedException {

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java Fri Dec  4 08:56:50 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+
+public interface LoadBalancer {
+
+    public interface ExecutionTracker {
+        
+        /**
+         * Should be called when a {@link DispatchContext#requestDispatch()} is called.
+         * This assists the load balancer in determining relationships between {@link DispatchContext}s
+         * @param caller The calling dispatcher
+         * @param context The context from which the dispatch is requested.
+         */
+        public void onDispatchRequest(Dispatcher caller, PooledDispatchContext context);
+
+        /**
+         * Must be called by the dispatcher when a {@link DispatchContext} is closed.
+         */
+        public void close();
+    }
+    
+    /**
+     * Must be called by a dispatch thread when it starts
+     * @param dispatcher The dispatcher
+     */
+    public void onDispatcherStarted(Dispatcher dispatcher);
+
+    /**
+     * Must be called by a dispatch thread when it stops
+     * @param dispatcher The dispatcher
+     */
+    public void onDispatcherStopped(Dispatcher dispatcher);
+
+    /**
+     * Gets an {@link ExecutionTracker} for the dispatch context. 
+     * @param context
+     * @return
+     */
+    public ExecutionTracker createExecutionTracker(PooledDispatchContext context);
+
+    /**
+     * Starts execution tracking
+     */
+    public void start();
+
+    /**
+     * Stops execution tracking
+     */
+    public void stop();
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java Fri Dec  4 08:56:50 2009
@@ -0,0 +1,31 @@
+package org.apache.activemq.dispatch.internal.advanced;
+
+import org.apache.activemq.dispatch.internal.advanced.LoadBalancer.ExecutionTracker;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+
+/**
+ * A {@link PooledDispatchContext}s can be moved between different
+ * dispatchers.
+ */
+public interface PooledDispatchContext extends DispatchContext {
+    /**
+     * Called to transfer a {@link PooledDispatchContext} to a new
+     * Dispatcher.
+     */
+    public void assignToNewDispatcher(Dispatcher newDispatcher);
+
+    /**
+     * Gets the dispatcher to which this PooledDispatchContext currently
+     * belongs
+     * 
+     * @return
+     */
+    public Dispatcher getDispatcher();
+
+    /**
+     * Gets the execution tracker for the context.
+     * 
+     * @return the execution tracker for the context:
+     */
+    public ExecutionTracker getExecutionTracker();
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java Fri Dec  4 08:56:50 2009
@@ -22,16 +22,15 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.dispatch.internal.advanced.PooledDispatcher.PooledDispatchContext;
 
-public class SimpleLoadBalancer implements ExecutionLoadBalancer {
+public class SimpleLoadBalancer implements LoadBalancer {
 
     private final boolean DEBUG = false;
 
     //TODO: Added plumbing for periodic rebalancing which we should
     //consider implementing
     private static final boolean ENABLE_UPDATES = false;
-    private final ArrayList<IDispatcher> dispatchers = new ArrayList<IDispatcher>();
+    private final ArrayList<Dispatcher> dispatchers = new ArrayList<Dispatcher>();
 
     private AtomicBoolean running = new AtomicBoolean(false);
     private boolean needsUpdate = false;
@@ -86,7 +85,7 @@
         running.compareAndSet(true, false);
     }
 
-    public synchronized final void onDispatcherStarted(IDispatcher dispatcher) {
+    public synchronized final void onDispatcherStarted(Dispatcher dispatcher) {
         dispatchers.add(dispatcher);
         scheduleNext();
     }
@@ -94,7 +93,7 @@
     /**
      * A Dispatcher must call this when exiting it's dispatch loop
      */
-    public void onDispatcherStopped(IDispatcher dispatcher) {
+    public void onDispatcherStopped(Dispatcher dispatcher) {
         dispatchers.remove(dispatcher);
     }
 
@@ -123,7 +122,7 @@
         private final AtomicInteger work = new AtomicInteger(0);
 
         private PooledDispatchContext singleSource;
-        private IDispatcher currentOwner;
+        private Dispatcher currentOwner;
 
         SimpleExecutionTracker(PooledDispatchContext context) {
             this.context = context;
@@ -144,7 +143,7 @@
          * @return True if this method resulted in the dispatch request being
          *         assigned to another dispatcher.
          */
-        public void onDispatchRequest(IDispatcher callingDispatcher, PooledDispatchContext callingContext) {
+        public void onDispatchRequest(Dispatcher callingDispatcher, PooledDispatchContext callingContext) {
 
             if (callingContext != null) {
                 // Make sure we are being called by another node: