You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/03 22:33:01 UTC

svn commit: r886928 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src: main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/dispatch/internal/ main/java/org/apache/activemq/dispatch/internal/advanced/ main/java/org/a...

Author: chirino
Date: Thu Dec  3 21:32:56 2009
New Revision: 886928

URL: http://svn.apache.org/viewvc?rev=886928&view=rev
Log:
Adding in some GCD (Grand Central Dispatch) inspired dispatch interfaces.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java
      - copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java
      - copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java
      - copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java
      - copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java
      - copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java
      - copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
      - copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatchObject {
+    
+    public <Context> Context getContext();
+    public <Context> void setContext(Context context);
+    public void suspend();
+    public void resume();
+
+    public void setFinalizer(Runnable finalizer);
+    public void setTargetQueue(DispatchQueue queue);
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatchQueue extends DispatchObject {
+
+    public void dispatchAsync(Runnable runnable);
+    public void dispatchSync(Runnable runnable) throws InterruptedException;
+    
+    public void dispatchAfter(long delayMS, Runnable runnable);
+    public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException;
+    
+    String getLabel();
+
+                      
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatchSource extends DispatchObject {
+
+    public void cancel();
+    public boolean isCanceled();
+    
+    public long getData();
+    
+    public long getMask();
+    public void setMask(long mask);
+    
+    public void setCancelHandler(Runnable cancelHandler);
+    public void setEventHandler(Runnable eventHandler);
+    
+    
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.nio.channels.SelectableChannel;
+
+import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSystem;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatchSystem {
+
+    private static final SimpleDispatchSystem system = new SimpleDispatchSystem(Runtime.getRuntime().availableProcessors());
+    
+    static DispatchQueue getMainQueue() {
+        return system.getMainQueue();
+    }
+    
+    public static enum DispatchQueuePriority {
+        HIGH,
+        DEFAULT,
+        LOW;
+    }
+
+    static public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+        return system.getGlobalQueue(priority);
+    }
+    
+    static DispatchQueue createQueue(String label) {
+        return system.createQueue(label);
+    }
+    
+    static DispatchQueue getCurrentQueue() {
+        return system.getCurrentQueue();
+    }
+    
+    static void dispatchMain() {
+        system.dispatchMain();
+    }
+
+    static DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
+        return system.createSource(channel, interestOps, queue);
+    }
+    
+    
+
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java Thu Dec  3 21:32:56 2009
@@ -14,20 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
 
 import java.util.ArrayList;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public abstract class AbstractPooledDispatcher<D extends IDispatcher> implements IDispatcher, PooledDispatcher<D> {
+public abstract class AbstractPooledDispatcher implements IDispatcher, PooledDispatcher {
 
     private final String name;
 
-    private final ThreadLocal<D> dispatcher = new ThreadLocal<D>();
-    private final ThreadLocal<PooledDispatchContext<D>> dispatcherContext = new ThreadLocal<PooledDispatchContext<D>>();
-    private final ArrayList<D> dispatchers = new ArrayList<D>();
+    private final ThreadLocal<IDispatcher> dispatcher = new ThreadLocal<IDispatcher>();
+    private final ThreadLocal<PooledDispatchContext> dispatcherContext = new ThreadLocal<PooledDispatchContext>();
+    private final ArrayList<IDispatcher> dispatchers = new ArrayList<IDispatcher>();
 
     final AtomicBoolean started = new AtomicBoolean();
     final AtomicBoolean shutdown = new AtomicBoolean();
@@ -35,12 +35,12 @@
     private int roundRobinCounter = 0;
     private int size;
 
-    protected ExecutionLoadBalancer<D> loadBalancer;
+    protected ExecutionLoadBalancer loadBalancer;
 
     protected AbstractPooledDispatcher(String name, int size) {
         this.name = name;
         this.size = size;
-        loadBalancer = new SimpleLoadBalancer<D>();
+        loadBalancer = new SimpleLoadBalancer();
     }
 
     /**
@@ -52,10 +52,10 @@
      *            The pool.
      * @return The new dispathcer.
      */
-    protected abstract D createDispatcher(String name, AbstractPooledDispatcher<D> pool) throws Exception;
+    protected abstract IDispatcher createDispatcher(String name, AbstractPooledDispatcher pool) throws Exception;
 
     /**
-     * @see org.apache.activemq.dispatch.IDispatcher#start()
+     * @see org.apache.activemq.dispatch.internal.advanced.IDispatcher#start()
      */
     public synchronized final void start() throws Exception {
         loadBalancer.start();
@@ -63,7 +63,7 @@
             // Create all the workers.
             try {
                 for (int i = 0; i < size; i++) {
-                    D dispatacher = createDispatcher(name + "-" + (i + 1), this);
+                    IDispatcher dispatacher = createDispatcher(name + "-" + (i + 1), this);
 
                     dispatchers.add(dispatacher);
                     dispatacher.start();
@@ -98,11 +98,11 @@
         loadBalancer.stop();
     }
 
-    public void setCurrentDispatchContext(PooledDispatchContext<D> context) {
+    public void setCurrentDispatchContext(PooledDispatchContext context) {
         dispatcherContext.set(context);
     }
 
-    public PooledDispatchContext<D> getCurrentDispatchContext() {
+    public PooledDispatchContext getCurrentDispatchContext() {
         return dispatcherContext.get();
     }
 
@@ -112,7 +112,7 @@
      * 
      * @return The currently executing dispatcher
      */
-    public D getCurrentDispatcher() {
+    public IDispatcher getCurrentDispatcher() {
         return dispatcher.get();
     }
 
@@ -120,19 +120,19 @@
      * A Dispatcher must call this to indicate that is has started it's dispatch
      * loop.
      */
-    public void onDispatcherStarted(D d) {
+    public void onDispatcherStarted(IDispatcher d) {
         dispatcher.set(d);
         loadBalancer.onDispatcherStarted(d);
     }
 
-    public ExecutionLoadBalancer<D> getLoadBalancer() {
+    public ExecutionLoadBalancer getLoadBalancer() {
         return loadBalancer;
     }
 
     /**
      * A Dispatcher must call this when exiting it's dispatch loop
      */
-    public void onDispatcherStopped(D d) {
+    public void onDispatcherStopped(IDispatcher d) {
         synchronized (dispatchers) {
             if (dispatchers.remove(d)) {
                 size--;
@@ -141,8 +141,8 @@
         loadBalancer.onDispatcherStopped(d);
     }
 
-    protected D chooseDispatcher() {
-        D d = dispatcher.get();
+    protected IDispatcher chooseDispatcher() {
+        IDispatcher d = dispatcher.get();
         if (d == null) {
             synchronized (dispatchers) {
                 if(dispatchers.isEmpty())

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java Thu Dec  3 21:32:56 2009
@@ -1,4 +1,4 @@
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
 
 /**
  * Handy interface to signal classes which would like an IDispatcher instance

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java Thu Dec  3 21:32:56 2009
@@ -14,15 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
 
 
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.PooledDispatcher.PooledDispatchContext;
 
-public interface ExecutionLoadBalancer<D extends IDispatcher> {
+public interface ExecutionLoadBalancer {
 
-    public interface ExecutionTracker<D extends IDispatcher> {
+    public interface ExecutionTracker {
         
         /**
          * Should be called when a {@link DispatchContext#requestDispatch()} is called.
@@ -30,7 +30,7 @@
          * @param caller The calling dispatcher
          * @param context The context from which the dispatch is requested.
          */
-        public void onDispatchRequest(D caller, PooledDispatchContext<D> context);
+        public void onDispatchRequest(IDispatcher caller, PooledDispatchContext context);
 
         /**
          * Must be called by the dispatcher when a {@link DispatchContext} is closed.
@@ -42,20 +42,20 @@
      * Must be called by a dispatch thread when it starts
      * @param dispatcher The dispatcher
      */
-    public void onDispatcherStarted(D dispatcher);
+    public void onDispatcherStarted(IDispatcher dispatcher);
 
     /**
      * Must be called by a dispatch thread when it stops
      * @param dispatcher The dispatcher
      */
-    public void onDispatcherStopped(D dispatcher);
+    public void onDispatcherStopped(IDispatcher dispatcher);
 
     /**
      * Gets an {@link ExecutionTracker} for the dispatch context. 
      * @param context
      * @return
      */
-    public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context);
+    public ExecutionTracker createExecutionTracker(PooledDispatchContext context);
 
     /**
      * Starts execution tracking

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/IDispatcher.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/IDispatcher.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java Thu Dec  3 21:32:56 2009
@@ -14,13 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
-public interface IDispatcher extends Executor{
+public interface IDispatcher extends Executor {
 
     /**
      * This interface is implemented by Dispatchable entities. A Dispatchable

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java Thu Dec  3 21:32:56 2009
@@ -14,23 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
 
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.ExecutionLoadBalancer.ExecutionTracker;
+import org.apache.activemq.dispatch.internal.advanced.IDispatcher.DispatchContext;
 
-public interface PooledDispatcher<D extends IDispatcher> {
+public interface PooledDispatcher {
 
     /**
      * A {@link PooledDispatchContext}s can be moved between different
      * dispatchers.
      */
-    public interface PooledDispatchContext<D extends IDispatcher> extends DispatchContext {
+    public interface PooledDispatchContext extends DispatchContext {
         /**
          * Called to transfer a {@link PooledDispatchContext} to a new
          * Dispatcher.
          */
-        public void assignToNewDispatcher(D newDispatcher);
+        public void assignToNewDispatcher(IDispatcher newDispatcher);
 
         /**
          * Gets the dispatcher to which this PooledDispatchContext currently
@@ -38,27 +38,27 @@
          * 
          * @return
          */
-        public D getDispatcher();
+        public IDispatcher getDispatcher();
 
         /**
          * Gets the execution tracker for the context.
          * 
          * @return the execution tracker for the context:
          */
-        public ExecutionTracker<D> getExecutionTracker();
+        public ExecutionTracker getExecutionTracker();
     }
 
     /**
      * A Dispatcher must call this from it's dispatcher thread to indicate that
      * is has started it's dispatch has started.
      */
-    public void onDispatcherStarted(D dispatcher);
+    public void onDispatcherStarted(IDispatcher dispatcher);
 
     /**
      * A Dispatcher must call this from it's dispatcher thread when exiting it's
      * dispatch loop
      */
-    public void onDispatcherStopped(D dispatcher);
+    public void onDispatcherStopped(IDispatcher dispatcher);
 
     /**
      * Returns the currently executing dispatcher, or null if the current thread
@@ -66,16 +66,16 @@
      * 
      * @return The currently executing dispatcher
      */
-    public D getCurrentDispatcher();
+    public IDispatcher getCurrentDispatcher();
 
-    public void setCurrentDispatchContext(PooledDispatchContext<D> context);
+    public void setCurrentDispatchContext(PooledDispatchContext context);
 
-    public PooledDispatchContext<D> getCurrentDispatchContext();
+    public PooledDispatchContext getCurrentDispatchContext();
 
     /**
      * Returns the load balancer for this dispatch pool.
      * 
      * @return
      */
-    public ExecutionLoadBalancer<D> getLoadBalancer();
+    public ExecutionLoadBalancer getLoadBalancer();
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.concurrent.Executor;
+
+final class PooledPriorityDispatcher extends AbstractPooledDispatcher {
+    private final int numPriorities; // priority count handed to every PriorityDispatcher this pool creates
+
+    PooledPriorityDispatcher(String name, int size, int numPriorities) {
+        super(name, size);
+        this.numPriorities = numPriorities;
+    }
+
+    @Override
+    protected final PriorityDispatcher createDispatcher(String name, AbstractPooledDispatcher pool) throws Exception {
+        // Passes this pool (not the 'pool' argument, which is unused here) as the new dispatcher's pool.
+        return new PriorityDispatcher(name, numPriorities, this);
+    }
+
+    public PriorityDispatcher chooseDispatcher() {
+        return ((PriorityDispatcher)super.chooseDispatcher()); // narrows the superclass result to PriorityDispatcher
+    }
+    
+    public final Executor createPriorityExecutor(final int priority) {
+        return new Executor() {
+            public void execute(final Runnable runnable) {
+                chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority); // dispatcher is chosen per submitted task
+            }
+
+        };
+    }
+
+    public int getDispatchPriorities() {
+        // The priority count fixed at construction time.
+        return numPriorities;
+    }
+
+    public void execute(Runnable command) {
+        chooseDispatcher().dispatch(new RunnableAdapter(command), 0); // plain execute() always dispatches at priority 0
+    }
+
+}
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java Thu Dec  3 21:32:56 2009
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -25,15 +25,15 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
-import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.ExecutionLoadBalancer.ExecutionTracker;
+import org.apache.activemq.dispatch.internal.advanced.PooledDispatcher.PooledDispatchContext;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.PriorityLinkedList;
 import org.apache.activemq.util.TimerHeap;
 import org.apache.activemq.util.list.LinkedNode;
 import org.apache.activemq.util.list.LinkedNodeList;
 
-public class PriorityDispatcher<D extends PriorityDispatcher<D>> implements Runnable, IDispatcher {
+public class PriorityDispatcher implements Runnable, IDispatcher {
 
     private static final boolean DEBUG = false;
     private Thread thread;
@@ -43,7 +43,7 @@
     protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
 
     // Set if this dispatcher is part of a dispatch pool:
-    protected final PooledDispatcher<D> pooledDispatcher;
+    protected final PooledDispatcher pooledDispatcher;
 
     // The local dispatch queue:
     protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
@@ -71,7 +71,7 @@
         }
     };
 
-    protected PriorityDispatcher(String name, int priorities, PooledDispatcher<D> pooledDispactcher) {
+    protected PriorityDispatcher(String name, int priorities, PooledDispatcher pooledDispactcher) {
         this.name = name;
         MAX_USER_PRIORITY = priorities - 1;
         priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
@@ -87,31 +87,7 @@
     }
 
     public static final IDispatcher createPriorityDispatchPool(String name, final int numPriorities, int size) {
-        return new AbstractPooledDispatcher<PriorityDispatcher>(name, size) {
-
-            @Override
-            protected final PriorityDispatcher createDispatcher(String name, AbstractPooledDispatcher<PriorityDispatcher> pool) throws Exception {
-                // TODO Auto-generated method stub
-                return new PriorityDispatcher(name, numPriorities, this);
-            }
-
-            public final Executor createPriorityExecutor(final int priority) {
-                return new Executor() {
-                    public void execute(final Runnable runnable) {
-                        chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
-                    }
-                };
-            }
-
-            public int getDispatchPriorities() {
-                // TODO Auto-generated method stub
-                return numPriorities;
-            }
-
-            public void execute(Runnable command) {
-                chooseDispatcher().dispatch(new RunnableAdapter(command), 0);
-            }
-        };
+        return new PooledPriorityDispatcher(name, size, numPriorities);
     }
 
     @SuppressWarnings("unchecked")
@@ -217,7 +193,7 @@
 
         if (pooledDispatcher != null) {
             // Inform the dispatcher that we have started:
-            pooledDispatcher.onDispatcherStarted((D) this);
+            pooledDispatcher.onDispatcherStarted((PriorityDispatcher) this);
         }
 
         PriorityDispatchContext pdc;
@@ -291,7 +267,7 @@
             thrown.printStackTrace();
         } finally {
             if (pooledDispatcher != null) {
-                pooledDispatcher.onDispatcherStopped((D) this);
+                pooledDispatcher.onDispatcherStopped((PriorityDispatcher) this);
             }
             cleanup();
         }
@@ -435,25 +411,25 @@
         return name;
     }
 
-    private final D getCurrentDispatcher() {
+    private final PriorityDispatcher getCurrentDispatcher() {
         if (pooledDispatcher != null) {
-            return pooledDispatcher.getCurrentDispatcher();
+            return (PriorityDispatcher) pooledDispatcher.getCurrentDispatcher();
         } else if (Thread.currentThread() == thread) {
-            return (D) this;
+            return (PriorityDispatcher) this;
         } else {
             return null;
         }
 
     }
 
-    private final PooledDispatchContext<D> getCurrentDispatchContext() {
+    private final PooledDispatchContext getCurrentDispatchContext() {
         return pooledDispatcher.getCurrentDispatchContext();
     }
 
     /**
      * 
      */
-    protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PooledDispatchContext<D> {
+    protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PooledDispatchContext {
         // The dispatchable target:
         private final Dispatchable dispatchable;
         // The name of this context:
@@ -466,9 +442,9 @@
         // from foreign threads:
         final UpdateEvent updateEvent[];
 
-        private final ExecutionTracker<D> tracker;
-        protected D currentOwner;
-        private D updateDispatcher = null;
+        private final ExecutionTracker tracker;
+        protected PriorityDispatcher currentOwner;
+        private PriorityDispatcher updateDispatcher = null;
 
         private int priority;
         private boolean dispatchRequested = false;
@@ -478,9 +454,9 @@
         protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
             this.dispatchable = dispatchable;
             this.name = name;
-            this.currentOwner = (D) PriorityDispatcher.this;
+            this.currentOwner = (PriorityDispatcher) PriorityDispatcher.this;
             if (persistent && pooledDispatcher != null) {
-                this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext<D>) this);
+                this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
             } else {
                 this.tracker = null;
             }
@@ -490,8 +466,7 @@
             currentOwner.takeOwnership(this);
         }
 
-        @SuppressWarnings("unchecked")
-        private final PriorityDispatcher<D>.UpdateEvent[] createUpdateEvent() {
+        private final PriorityDispatcher.UpdateEvent[] createUpdateEvent() {
             return new PriorityDispatcher.UpdateEvent[2];
         }
 
@@ -500,7 +475,7 @@
          * 
          * @return the execution tracker for the context:
          */
-        public ExecutionTracker<D> getExecutionTracker() {
+        public ExecutionTracker getExecutionTracker() {
             return tracker;
         }
 
@@ -513,7 +488,7 @@
             return dispatchable.dispatch();
         }
 
-        public final void assignToNewDispatcher(D newDispatcher) {
+        public final void assignToNewDispatcher(IDispatcher newDispatcher) {
             synchronized (this) {
 
                 // If we're already set to this dispatcher
@@ -523,7 +498,7 @@
                     }
                 }
 
-                updateDispatcher = newDispatcher;
+                updateDispatcher = (PriorityDispatcher) newDispatcher;
                 if (DEBUG)
                     System.out.println(getName() + " updating to " + updateDispatcher);
 
@@ -534,7 +509,7 @@
 
         public void requestDispatch() {
 
-            D callingDispatcher = getCurrentDispatcher();
+            PriorityDispatcher callingDispatcher = getCurrentDispatcher();
             if (tracker != null)
                 tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
 
@@ -574,7 +549,7 @@
             if (this.priority == priority) {
                 return;
             }
-            D callingDispatcher = getCurrentDispatcher();
+            PriorityDispatcher callingDispatcher = getCurrentDispatcher();
 
             // Otherwise this is coming off another thread, so we need to
             // synchronize to protect against ownership changes:
@@ -647,7 +622,7 @@
          * @param newDispatcher
          *            The new Dispatcher
          */
-        protected void switchedDispatcher(D oldDispatcher, D newDispatcher) {
+        protected void switchedDispatcher(PriorityDispatcher oldDispatcher, PriorityDispatcher newDispatcher) {
 
         }
 
@@ -656,7 +631,7 @@
         }
 
         public void close(boolean sync) {
-            D callingDispatcher = getCurrentDispatcher();
+            PriorityDispatcher callingDispatcher = getCurrentDispatcher();
             // System.out.println(this + "Closing");
             synchronized (this) {
                 closed = true;
@@ -695,13 +670,14 @@
             return dispatchable;
         }
 
-        public D getDispatcher() {
+        public PriorityDispatcher getDispatcher() {
             return currentOwner;
         }
 
         public String getName() {
             return name;
         }
+
     }
 
 	public String getName() {

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java Thu Dec  3 21:32:56 2009
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -22,16 +22,16 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.PooledDispatcher.PooledDispatchContext;
 
-public class SimpleLoadBalancer<D extends IDispatcher> implements ExecutionLoadBalancer<D> {
+public class SimpleLoadBalancer implements ExecutionLoadBalancer {
 
     private final boolean DEBUG = false;
 
     //TODO: Added plumbing for periodic rebalancing which we should
     //consider implementing
     private static final boolean ENABLE_UPDATES = false;
-    private final ArrayList<D> dispatchers = new ArrayList<D>();
+    private final ArrayList<IDispatcher> dispatchers = new ArrayList<IDispatcher>();
 
     private AtomicBoolean running = new AtomicBoolean(false);
     private boolean needsUpdate = false;
@@ -86,7 +86,7 @@
         running.compareAndSet(true, false);
     }
 
-    public synchronized final void onDispatcherStarted(D dispatcher) {
+    public synchronized final void onDispatcherStarted(IDispatcher dispatcher) {
         dispatchers.add(dispatcher);
         scheduleNext();
     }
@@ -94,20 +94,20 @@
     /**
      * A Dispatcher must call this when exiting it's dispatch loop
      */
-    public void onDispatcherStopped(D dispatcher) {
+    public void onDispatcherStopped(IDispatcher dispatcher) {
         dispatchers.remove(dispatcher);
     }
 
-    public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context) {
+    public ExecutionTracker createExecutionTracker(PooledDispatchContext context) {
         return new SimpleExecutionTracker(context);
     }
 
-    private static class ExecutionStats<D extends IDispatcher> {
-        final PooledDispatchContext<D> target;
-        final PooledDispatchContext<D> source;
+    private static class ExecutionStats {
+        final PooledDispatchContext target;
+        final PooledDispatchContext source;
         int count;
 
-        ExecutionStats(PooledDispatchContext<D> source, PooledDispatchContext<D> target) {
+        ExecutionStats(PooledDispatchContext source, PooledDispatchContext target) {
             this.target = target;
             this.source = source;
         }
@@ -117,15 +117,15 @@
         }
     }
 
-    private class SimpleExecutionTracker implements ExecutionTracker<D> {
-        private final HashMap<PooledDispatchContext<D>, ExecutionStats<D>> sources = new HashMap<PooledDispatchContext<D>, ExecutionStats<D>>();
-        private final PooledDispatchContext<D> context;
+    private class SimpleExecutionTracker implements ExecutionTracker {
+        private final HashMap<PooledDispatchContext, ExecutionStats> sources = new HashMap<PooledDispatchContext, ExecutionStats>();
+        private final PooledDispatchContext context;
         private final AtomicInteger work = new AtomicInteger(0);
 
-        private PooledDispatchContext<D> singleSource;
+        private PooledDispatchContext singleSource;
         private IDispatcher currentOwner;
 
-        SimpleExecutionTracker(PooledDispatchContext<D> context) {
+        SimpleExecutionTracker(PooledDispatchContext context) {
             this.context = context;
             currentOwner = context.getDispatcher();
         }
@@ -144,7 +144,7 @@
          * @return True if this method resulted in the dispatch request being
          *         assigned to another dispatcher.
          */
-        public void onDispatchRequest(D callingDispatcher, PooledDispatchContext<D> callingContext) {
+        public void onDispatchRequest(IDispatcher callingDispatcher, PooledDispatchContext callingContext) {
 
             if (callingContext != null) {
                 // Make sure we are being called by another node:
@@ -156,7 +156,7 @@
                 if (singleSource != callingContext) {
                     if (singleSource == null && sources.isEmpty()) {
                         singleSource = callingContext;
-                        ExecutionStats<D> stats = new ExecutionStats<D>(callingContext, context);
+                        ExecutionStats stats = new ExecutionStats(callingContext, context);
                         stats.count++;
                         sources.put(callingContext, stats);
 
@@ -172,9 +172,9 @@
 
                     } else {
 
-                        ExecutionStats<D> stats = sources.get(callingContext);
+                        ExecutionStats stats = sources.get(callingContext);
                         if (stats == null) {
-                            stats = new ExecutionStats<D>(callingContext, context);
+                            stats = new ExecutionStats(callingContext, context);
                             sources.put(callingContext, stats);
                         }
                         stats.count++;

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import org.apache.activemq.dispatch.DispatchObject;
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * Base class that stores the context, finalizer and target queue set on a {@link DispatchObject}.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AbstractDispatchObject implements DispatchObject {
+
+    protected Object context; // opaque value stored by setContext and handed back by getContext
+    protected Runnable finalizer; // stored by setFinalizer; never invoked by this class itself
+    protected DispatchQueue targetQueue; // stored by setTargetQueue; never read by this class itself
+
+    @SuppressWarnings("unchecked")
+    public <Context> Context getContext() {
+        return (Context) context; // unchecked cast to whatever type the caller asks for
+    }
+    
+    public <Context> void setContext(Context context) {
+        this.context = context;
+    }
+
+    public void setFinalizer(Runnable finalizer) {
+        this.finalizer = finalizer;
+    }
+
+    public void setTargetQueue(DispatchQueue targetQueue) {
+        this.targetQueue = targetQueue;
+    }
+
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Daemon thread that repeatedly drains the system's global queues and runs the queued Runnables.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final public class Dispatcher extends Thread {
+    private final SimpleDispatchSystem system;
+    
+    public Dispatcher(SimpleDispatchSystem javaDispatchSystem, int ordinal) {
+        system = javaDispatchSystem;
+        setName("dispatcher:"+(ordinal+1)); // the thread name carries ordinal + 1
+        setDaemon(true);
+        
+    }
+    
+    @Override
+    public void run() {
+        GlobalDispatchQueue[] dispatchQueues = system.globalQueues;
+        while( true ) {
+            for (GlobalDispatchQueue queue : dispatchQueues) { // queues are visited in array order on every pass
+                SimpleDispatchSystem.CURRENT_QUEUE.set(queue); // record the queue being drained before running its tasks
+                ConcurrentLinkedQueue<Runnable> runnables = queue.runnables;
+                Runnable runnable;
+                while( (runnable = runnables.poll())!=null ) { // drain this queue completely before moving on
+                    system.globalQueuedRunnables.decrementAndGet();
+                    dispatch(runnable);
+                }
+            }
+            if( system.globalQueuedRunnables.get()==0 ) { // no pending work counted: wait for a wakeup
+                try {
+                    system.waitForWakeup();
+                } catch (InterruptedException e) {
+                    return; // interrupted while waiting: leave the dispatch loop and end the thread
+                }
+            }
+        }
+    }
+
+    private void dispatch(Runnable runnable) {
+        try {
+            runnable.run();
+        } catch (Throwable e) {
+            e.printStackTrace(); // a failing task is reported but does not stop the dispatcher
+        }
+    }
+   
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+
+/**
+ * A {@link DispatchQueue} backed by a concurrent queue of Runnables and labelled after its priority.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class GlobalDispatchQueue implements DispatchQueue {
+
+    private final SimpleDispatchSystem system;
+    final String label;
+    final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>(); // package-visible: polled directly by Dispatcher
+    
+    public GlobalDispatchQueue(SimpleDispatchSystem system, DispatchQueuePriority priority) {
+        this.system = system;
+        this.label=priority.toString(); // the label is simply the priority's string form
+    }
+
+    public String getLabel() {
+        return label;
+    }
+
+    public void dispatchAsync(Runnable runnable) {
+        runnables.add(runnable);
+        system.wakeup(); // presumably wakes an idle dispatcher thread - confirm in SimpleDispatchSystem
+    }
+
+    public void dispatchAfter(long delayMS, Runnable runnable) {
+        throw new RuntimeException("TODO: implement me."); // delayed dispatch is not implemented yet
+    }
+
+    public void dispatchSync(final Runnable runnable) throws InterruptedException {
+        dispatchApply(1, runnable); // a sync dispatch is an apply with a single iteration
+    }
+    
+    public void dispatchApply(int iterations, final Runnable runnable) throws InterruptedException {
+        QueueSupport.dispatchApply(this, iterations, runnable);
+    }
+
+    public void resume() {
+        throw new UnsupportedOperationException(); // this queue type rejects suspend/resume
+    }
+
+    public void suspend() {
+        throw new UnsupportedOperationException();
+    }
+
+    public <Context> Context getContext() {
+        throw new UnsupportedOperationException(); // no context is stored on this queue type
+    }
+
+    public <Context> void setContext(Context context) {
+        throw new UnsupportedOperationException();
+    }
+
+    public void setFinalizer(Runnable finalizer) {
+        throw new UnsupportedOperationException();
+    }
+
+    public void setTargetQueue(DispatchQueue queue) {
+        throw new UnsupportedOperationException();
+    }
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * Static helper that submits a Runnable to a {@link DispatchQueue} several times and waits for all runs.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class QueueSupport {
+
+    static public void dispatchApply(DispatchQueue queue, int itterations, final Runnable runnable) throws InterruptedException {
+        final CountDownLatch done = new CountDownLatch(itterations); // one count per requested run
+        Runnable wrapper = new Runnable() {
+            public void run() {
+                try {
+                    runnable.run();
+                } finally {
+                    done.countDown(); // counted even when the runnable throws, so await() below cannot hang on a failure
+                }
+            }
+        };
+        for( int i=0; i < itterations; i++ ) { 
+            queue.dispatchAsync(wrapper); // the same wrapper instance is submitted for every iteration
+        }
+        done.await(); // block the caller until every submitted run has finished
+    }
+    
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * A {@link DispatchQueue} that buffers submitted tasks in a FIFO
+ * {@link ConcurrentLinkedQueue} and drains them from {@link #run()}.
+ * When a target queue is set, this object submits itself to that queue each
+ * time the buffer goes from empty to non-empty while not suspended.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class SerialDispatchQueue extends AbstractDispatchObject implements DispatchQueue, Runnable {
+
+    /** Tasks accepted by {@link #dispatchAsync(Runnable)} and not yet polled. */
+    private final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+    final private String label;
+    /** Greater than zero while the queue is suspended. */
+    final private AtomicInteger suspendCounter = new AtomicInteger();
+    /**
+     * Number of tasks accepted but not yet completed. It is incremented
+     * before the task is added to {@link #runnables} and decremented after
+     * the task has run.
+     */
+    final private AtomicLong size = new AtomicLong();
+
+    /**
+     * @param label the name reported by {@link #getLabel()}
+     */
+    public SerialDispatchQueue(String label) {
+        this.label = label;
+    }
+
+    /**
+     * @return the label given at construction time
+     */
+    public String getLabel() {
+        return label;
+    }
+
+    /**
+     * Decrements the suspend counter.
+     *
+     * NOTE(review): this does not re-submit the queue to its target queue,
+     * so tasks buffered while suspended appear to stay pending after the
+     * counter returns to zero - confirm whether callers rely on something
+     * else to restart draining.
+     */
+    public void resume() {
+        suspendCounter.decrementAndGet();
+    }
+
+    /**
+     * Increments the suspend counter; {@link #run()} stops picking up new
+     * tasks while the counter is greater than zero.
+     */
+    public void suspend() {
+        suspendCounter.incrementAndGet();
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public void dispatchAfter(long delayMS, Runnable runnable) {
+        throw new UnsupportedOperationException("TODO: implement me.");
+    }
+
+    /**
+     * Buffers {@code runnable} for later execution. If this is the first
+     * pending task, a target queue is set and the queue is not suspended,
+     * this object is submitted to the target queue so that {@link #run()}
+     * drains the buffer.
+     *
+     * @param runnable the task to execute
+     * @throws IllegalArgumentException if {@code runnable} is null
+     */
+    public void dispatchAsync(Runnable runnable) {
+        if( runnable == null ) {
+            throw new IllegalArgumentException();
+        }
+        long lastSize = size.incrementAndGet();
+        runnables.add(runnable);
+        if( targetQueue!=null && lastSize == 1 && suspendCounter.get()<=0 ) {
+            targetQueue.dispatchAsync(this);
+        }
+    }
+
+    /**
+     * Drains buffered tasks in FIFO order until none are pending or the
+     * queue is suspended. While draining, this queue is published as the
+     * current queue of the calling thread; the previous value is restored on
+     * exit. A task that throws is reported via printStackTrace and does not
+     * stop the remaining tasks from running.
+     */
+    public void run() {
+        DispatchQueue original = SimpleDispatchSystem.CURRENT_QUEUE.get();
+        SimpleDispatchSystem.CURRENT_QUEUE.set(this);
+        try {
+            long lsize = size.get();
+            while( suspendCounter.get() <= 0 && lsize > 0 ) {
+                Runnable runnable = runnables.poll();
+                if( runnable == null ) {
+                    // dispatchAsync bumps size before it adds the task, so
+                    // the task may not be visible yet; retry.
+                    continue;
+                }
+                try {
+                    runnable.run();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                } finally {
+                    // Always account for the polled task. Skipping this when
+                    // the task throws would leave size above zero forever
+                    // and make this loop spin on an empty buffer.
+                    lsize = size.decrementAndGet();
+                }
+            }
+        } finally {
+            SimpleDispatchSystem.CURRENT_QUEUE.set(original);
+        }
+    }
+
+    /**
+     * Submits {@code runnable} once and blocks until it has finished.
+     *
+     * @throws InterruptedException if the calling thread is interrupted
+     *                              while waiting
+     */
+    public void dispatchSync(Runnable runnable) throws InterruptedException {
+        dispatchApply(1, runnable);
+    }
+
+    /**
+     * Submits {@code runnable} {@code iterations} times and blocks until all
+     * executions have finished; delegates to
+     * {@link QueueSupport#dispatchApply(DispatchQueue, int, Runnable)}.
+     *
+     * @throws InterruptedException if the calling thread is interrupted
+     *                              while waiting
+     */
+    public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException {
+        QueueSupport.dispatchApply(this, iterations, runnable);
+    }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.nio.channels.SelectableChannel;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+
+import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
+
+
+/**
+ * Implements a simple dispatch system.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class SimpleDispatchSystem {
+    
+    static final ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
+    
+    final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
+    final GlobalDispatchQueue globalQueues[]; 
+    final Dispatcher dispatchers[];
+
+    private final Object wakeupMutex = new Object();
+    final AtomicLong globalQueuedRunnables = new AtomicLong();
+    
+    public SimpleDispatchSystem(int size) {
+        globalQueues = new GlobalDispatchQueue[3];
+        for (int i = 0; i < 3; i++) {
+            globalQueues[i] = new GlobalDispatchQueue(this, DispatchQueuePriority.values()[i] );
+        }
+                                  
+        dispatchers = new Dispatcher[size];
+        for (int i = 0; i < size; i++) {
+            dispatchers[i] = new Dispatcher(this, i);
+            dispatchers[i].start();
+            
+        }
+    }
+    
+    public DispatchQueue getMainQueue() {
+        return mainQueue;
+    }
+    
+    public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+        return globalQueues[priority.ordinal()];
+    }
+    
+    public DispatchQueue createQueue(String label) {
+        SerialDispatchQueue rc = new SerialDispatchQueue(label);
+        rc.setTargetQueue(getGlobalQueue(DEFAULT));
+        return rc;
+    }
+    
+    public DispatchQueue getCurrentQueue() {
+        return CURRENT_QUEUE.get();
+    }
+    
+    public void dispatchMain() {
+        mainQueue.run();
+    }
+
+    public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
+        return null;
+    }
+
+    public void waitForWakeup() throws InterruptedException {
+        while( globalQueuedRunnables.get()==0 ) {
+            synchronized(wakeupMutex) {
+                wakeupMutex.wait();
+            }
+        }
+    }
+    
+    void wakeup() {
+        if( globalQueuedRunnables.incrementAndGet() < dispatchers.length ) {
+            synchronized(wakeupMutex) {
+                wakeupMutex.notify();
+            }
+        }
+    }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Thu Dec  3 21:32:56 2009
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.concurrent.CountDownLatch;
+
+import static java.lang.String.*;
+import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
+
+/**
+ * Command line micro-benchmark that measures how many task executions per
+ * second a private serial queue and the DEFAULT global queue sustain.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatchSystemTest {
+
+    /**
+     * Benchmarks a freshly created serial queue, then the DEFAULT global
+     * queue, printing one result line for each.
+     */
+    public static void main(String[] args) throws InterruptedException {
+        DispatchQueue serialQueue = DispatchSystem.createQueue("test");
+        benchmark("private serial queue", serialQueue);
+
+        DispatchQueue globalQueue = DispatchSystem.getGlobalQueue(DEFAULT);
+        benchmark("global queue", globalQueue);
+    }
+
+    /**
+     * Runs an untimed warm-up pass followed by a timed pass on
+     * {@code queue} and prints the duration and execution rate.
+     *
+     * @param name  label printed in the result line
+     * @param queue the queue under test
+     */
+    private static void benchmark(String name, DispatchQueue queue) throws InterruptedException {
+        // Untimed pass so the JIT has compiled the hot paths before measuring.
+        benchmarkWork(queue, 100000);
+
+        final int measuredRuns = 1000*1000*20;
+        final long startedAt = System.nanoTime();
+        benchmarkWork(queue, measuredRuns);
+        final long elapsedNanos = System.nanoTime() - startedAt;
+
+        double durationMS = elapsedNanos/1000000d;
+        double rate = 1000d * measuredRuns / durationMS;
+
+        String report = String.format("name: %s, duration: %,.3f ms, rate: %,.2f executions/sec", name, durationMS, rate);
+        System.out.println(report);
+    }
+
+    /**
+     * Seeds {@code queue} with 1000 submissions of a task that counts down a
+     * latch and re-submits itself while the latch is above zero, then blocks
+     * until the latch reaches zero.
+     *
+     * @param queue      the queue the task is submitted to
+     * @param iterations the initial latch count
+     */
+    private static void benchmarkWork(final DispatchQueue queue, int iterations) throws InterruptedException {
+        final CountDownLatch remaining = new CountDownLatch(iterations);
+        Runnable selfRequeuing = new Runnable(){
+            public void run() {
+                remaining.countDown();
+                if( remaining.getCount()<=0 ) {
+                    return;
+                }
+                queue.dispatchAsync(this);
+            }
+        };
+        int seeded = 0;
+        while( seeded < 1000 ) {
+            queue.dispatchAsync(selfRequeuing);
+            seeded++;
+        }
+        remaining.await();
+    }
+}