You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/12/16 21:06:10 UTC

svn commit: r891410 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src: main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/dispatch/internal/ main/java/org/apache/activemq/dispatch/internal/advanced/ main/java/org/a...

Author: cmacnaug
Date: Wed Dec 16 20:06:09 2009
New Revision: 891410

URL: http://svn.apache.org/viewvc?rev=891410&view=rev
Log:
Some changes in support of an NIODispatchSource

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java
      - copied, changed from r890921, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java
      - copied, changed from r890887, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java Wed Dec 16 20:06:09 2009
@@ -21,7 +21,7 @@
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public interface DispatchSource extends DispatchObject {
-
+    
     public void cancel();
     public boolean isCanceled();
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java Wed Dec 16 20:06:09 2009
@@ -70,10 +70,6 @@
         suspendCounter.incrementAndGet();
     }
 
-    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
-        throw new RuntimeException("TODO: implement me.");
-    }
-
     public void execute(Runnable command) {
         dispatchAsync(command);
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java Wed Dec 16 20:06:09 2009
@@ -5,7 +5,7 @@
 
 public class BaseRetained {
     
-    final protected AtomicInteger reatinCounter = new AtomicInteger(0);
+    final protected AtomicInteger retainCounter = new AtomicInteger(0);
     final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>();
 
     public void addShutdownWatcher(Runnable shutdownHandler) {
@@ -15,16 +15,21 @@
     }
     
     public void retain() {
-        if( reatinCounter.getAndIncrement() == 0 ) {
+        if( retainCounter.getAndIncrement() == 0 ) {
             startup();
         }
     }
 
     public void release() {
-        if( reatinCounter.decrementAndGet()==0 ) {
+        if( retainCounter.decrementAndGet()==0 ) {
             shutdown();
         }
     }
+    
+    protected boolean isShutdown()
+    {
+        return retainCounter.get() <= 0;
+    }
 
     /**
      * Subclasses should override if they want to do some startup processing. 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java Wed Dec 16 20:06:09 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.dispatch.internal.advanced;
 
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
 import java.util.ArrayList;
 import java.util.concurrent.Executor;
@@ -31,15 +32,15 @@
 import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.DispatcherConfig;
 import org.apache.activemq.dispatch.internal.BaseRetained;
+import org.apache.activemq.dispatch.internal.nio.NIODispatchSource;
 
 import static org.apache.activemq.dispatch.DispatchPriority.*;
 
-
 final public class AdvancedDispatcher extends BaseRetained implements Dispatcher {
 
     public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
 
-    final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
+    final SerialDispatchQueue mainQueue;
     final GlobalDispatchQueue globalQueues[];
     final AtomicLong globalQueuedRunnables = new AtomicLong();
 
@@ -54,6 +55,7 @@
     public AdvancedDispatcher(DispatcherConfig config) {
         this.size = config.getThreads();
         this.numPriorities = 3;
+        this.mainQueue = new SerialDispatchQueue(this, "main");
         globalQueues = new GlobalDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
             globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i]);
@@ -64,20 +66,26 @@
     /**
      * @see org.apache.activemq.dispatch.internal.advanced.DispatcherThread#start()
      */
-    protected void startup()  {
+    protected void startup() {
         loadBalancer.start();
         for (int i = 0; i < size; i++) {
-            DispatcherThread dispatacher = new DispatcherThread(this, ("dispatcher -" + (i + 1)), numPriorities);
-            dispatchers.add(dispatacher);
-            dispatacher.start();
+            DispatcherThread dispatacher;
+            try {
+                dispatacher = new DispatcherThread(this, ("dispatcher -" + (i + 1)), numPriorities);
+                dispatchers.add(dispatacher);
+                dispatacher.start();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
         }
     }
-    
+
     protected void shutdown() {
         Runnable countDown = new Runnable() {
             AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.size());
+
             public void run() {
-                if( shutdownCountDown.decrementAndGet()==0 ) {
+                if (shutdownCountDown.decrementAndGet() == 0) {
                     // Notify any registered shutdown watchers.
                     AdvancedDispatcher.super.shutdown();
                 }
@@ -119,8 +127,7 @@
         DispatcherThread d = DispatcherThread.CURRENT.get();
         if (d == null) {
             synchronized (dispatchers) {
-                if(dispatchers.isEmpty())
-                {
+                if (dispatchers.isEmpty()) {
                     throw new RejectedExecutionException();
                 }
                 if (++roundRobinCounter >= size) {
@@ -133,14 +140,14 @@
         }
     }
 
-//    public DispatchContext register(Runnable runnable, String name) {
-//        return chooseDispatcher().register(runnable, name);
-//    }
-
-	public int getSize() {
-		return size;
-	}
-	
+    // public DispatchContext register(Runnable runnable, String name) {
+    // return chooseDispatcher().register(runnable, name);
+    // }
+
+    public int getSize() {
+        return size;
+    }
+
     public final Executor createPriorityExecutor(final int priority) {
         return new Executor() {
             public void execute(final Runnable runnable) {
@@ -158,7 +165,7 @@
     public void execute(Runnable command) {
         chooseDispatcher().dispatch(command, 0);
     }
-    
+
     public void execute(Runnable command, int priority) {
         chooseDispatcher().dispatch(command, priority);
     }
@@ -170,11 +177,11 @@
     public void schedule(final Runnable runnable, int priority, long delay, TimeUnit timeUnit) {
         chooseDispatcher().schedule(runnable, priority, delay, timeUnit);
     }
-    
+
     public DispatchQueue getMainQueue() {
         return mainQueue;
     }
-    
+
     public DispatchQueue getGlobalQueue() {
         return getGlobalQueue(DEFAULT);
     }
@@ -182,19 +189,31 @@
     public DispatchQueue getGlobalQueue(DispatchPriority priority) {
         return globalQueues[priority.ordinal()];
     }
-    
+
     public DispatchQueue createSerialQueue(String label, DispatchOption... options) {
-        SerialDispatchQueue rc = new SerialDispatchQueue(label, options);
+        SerialDispatchQueue rc = new SerialDispatchQueue(this, label, options);
         rc.setTargetQueue(getGlobalQueue());
         return rc;
     }
-    
+
     public void dispatchMain() {
         mainQueue.run();
     }
 
     public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
-        return null;
+        NIODispatchSource source = new NIODispatchSource();
+        try {
+            source.setChannel(channel);
+        } catch (ClosedChannelException e) {
+            e.printStackTrace();
+        }
+        source.setMask(interestOps);
+        //Dispatch Source must be sticky so that it sticks to it's thread's selector:
+        if (!(queue.getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD) || queue.getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD))) {
+            throw new IllegalStateException("Source dispatch queue must be sticky");
+        }
+        source.setTargetQueue(queue);
+        return source;
     }
 
     public DispatchQueue getCurrentQueue() {
@@ -203,10 +222,10 @@
 
     public DispatchQueue getCurrentThreadQueue() {
         DispatcherThread thread = DispatcherThread.CURRENT.get();
-        if( thread==null ) {
+        if (thread == null) {
             return null;
         }
         return thread.currentDispatchQueue;
-    }    
+    }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java Wed Dec 16 20:06:09 2009
@@ -1,5 +1,12 @@
 package org.apache.activemq.dispatch.internal.advanced;
 
+import org.apache.activemq.dispatch.internal.simple.GlobalDispatchQueue;
+import org.apache.activemq.dispatch.internal.simple.SerialDispatchQueue;
+import org.apache.activemq.dispatch.internal.simple.ThreadDispatchQueue;
+
 public interface AdvancedQueue {
 
+    SerialDispatchQueue isSerialDispatchQueue();
+    ThreadDispatchQueue isThreadDispatchQueue();
+    GlobalDispatchQueue isGlobalDispatchQueue();
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Wed Dec 16 20:06:09 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.dispatch.internal.advanced;
 
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.concurrent.Executor;
@@ -24,6 +26,9 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.internal.nio.NIOSourceHandler;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.PriorityLinkedList;
 import org.apache.activemq.util.TimerHeap;
@@ -73,10 +78,13 @@
             return element.listPrio;
         }
     };
+    
+
+    private final NIOSourceHandler nioHandler;
 
-    protected DispatcherThread(AdvancedDispatcher dispatcher, String name, int priorities) {
+    protected DispatcherThread(AdvancedDispatcher dispatcher, String name, int priorities) throws IOException {
         this.name = name;
-        
+        this.nioHandler = new NIOSourceHandler(this);
         this.dispatchQueues = new ThreadDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
             dispatchQueues[i] = new ThreadDispatchQueue(this, DispatchPriority.values()[i]);
@@ -133,6 +141,11 @@
                         if( onShutdown!=null ) {
                             onShutdown.run();
                         }
+                        try {
+                            nioHandler.shutdown();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
                     }
                 }, MAX_USER_PRIORITY + 1);
                 Thread rc = thread;
@@ -374,5 +387,4 @@
     public String getName() {
         return name;
     }
-
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Wed Dec 16 20:06:09 2009
@@ -35,6 +35,7 @@
     private final String label;
     private final AdvancedDispatcher dispatcher;
     private final DispatchPriority priority;
+    private Object context;
     
     public GlobalDispatchQueue(AdvancedDispatcher dispatcher, DispatchPriority priority) {
         this.dispatcher = dispatcher;
@@ -75,11 +76,11 @@
     }
 
     public <Context> Context getContext() {
-        throw new UnsupportedOperationException();
+        return (Context) context;
     }
 
     public <Context> void setContext(Context context) {
-        throw new UnsupportedOperationException();
+        this.context = context;
     }
 
     public void addShutdownWatcher(Runnable finalizer) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java Wed Dec 16 20:06:09 2009
@@ -1,13 +1,17 @@
 package org.apache.activemq.dispatch.internal.advanced;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.dispatch.DispatchOption;
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
 
 public class SerialDispatchQueue extends AbstractSerialDispatchQueue {
 
-    public SerialDispatchQueue(String label, DispatchOption...options) {
+    AdvancedDispatcher dispather;
+    public SerialDispatchQueue(AdvancedDispatcher dispather, String label, DispatchOption...options) {
         super(label, options);
+        this.dispather = dispather;
 //        context = new DispatchContext(this, true, label);
 }
     
@@ -28,5 +32,7 @@
         }
     }
     
-
+    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+        dispather.schedule(runnable, delay, unit);
+    }
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java (from r890921, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java&r1=890921&r2=891410&rev=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java Wed Dec 16 20:06:09 2009
@@ -11,9 +11,11 @@
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.internal.AbstractDispatchObject;
 
 /**
  * SelectableDispatchContext
@@ -24,144 +26,133 @@
  * @author cmacnaug
  * @version 1.0
  */
-class SelectableDispatchContext extends DispatchContext {
+public class NIODispatchSource extends AbstractDispatchObject implements DispatchSource {
     public static final boolean DEBUG = false;
+    private final AtomicLong suspendCounter = new AtomicLong(1);
     private SelectableChannel channel;
     private SelectionKey key;
-    private int updateInterests = -1;
+    private int interestOps = 0;
     private int readyOps = 0;
+    private NIOSourceHandler sourceHandler;
+    private final EventHandler eventHandler = new EventHandler();
+    private Runnable cancelHandler;
 
-    SelectableDispatchContext(DispatcherThread thread, Runnable runnable, String name) {
-        super(thread, runnable, true, name);
-
+    public NIODispatchSource() {
+        super.retain();
     }
 
-    
     /**
-     * This can be called to set a channel on which the Dispatcher will 
-     * perform selection operations. The channel may be changed over time. 
-     * 
-     * This method may only be called from the provided {@link Dispatchable}
-     * dispatch method, and is not thread safe.
+     * This can be called to set a channel on which the Dispatcher will perform
+     * selection operations. The channel may be changed over time, but only from
+     * the registered event handler.
      * 
-     * @param channel The channel on which to select.
-     * @throws ClosedChannelException If a closed chanel is provided
+     * @param channel
+     *            The channel on which to select.
+     * @throws ClosedChannelException
+     *             If a closed channel is provided
      */
     public void setChannel(SelectableChannel channel) throws ClosedChannelException {
         if (this.channel != channel) {
-            if (isClosed()) {
+            if (super.isShutdown()) {
                 return;
             }
-            int interests = 0;
+            int interests = interestOps;
             if (key != null) {
-                interests = key.interestOps();
+                interests |= key.interestOps();
                 key.cancel();
                 key = null;
             }
             this.channel = channel;
             if (channel != null) {
-                updateInterestOps(interests);
+                setMask(interests);
             }
         }
     }
 
     /**
-     * May be overriden by subclass to additional work on dispatcher switch
+     * Set's the handler for this source, this may only be called from the event
+     * handler
      * 
-     * @param oldDispatcher
-     *            The old dispatcher
-     * @param newDispatcher
-     *            The new Dispatcher
+     * @param newHandler
+     *            The new handler for the source
      */
-    protected void switchedDispatcher(DispatcherThread oldDispatcher, DispatcherThread newDispatcher) {
-        if (DEBUG) {
-            if (oldDispatcher == newDispatcher) {
-                System.out.println(this + " switching to same dispatcher " + newDispatcher + Thread.currentThread());
-            }
-        }
-        if (channel != null) {
-            if (DEBUG)
-                System.out.println(this + "Canceling key on dispatcher switch: " + oldDispatcher + newDispatcher);
+    protected void setHandler(NIOSourceHandler newHandler) {
 
-            SelectionKey exising = channel.keyFor(((NIODispatcherThread)oldDispatcher).getSelector());
-            if (exising != null) {
-                updateInterests = exising.interestOps();
-                exising.cancel();
-            }
-        }
-    }
-
-    public void processForeignUpdates() {
-        synchronized (this) {
+        if (sourceHandler != newHandler) {
             if (channel != null) {
-                if (updateInterests > 0) {
-                    if (DEBUG)
-                        debug(this + "processing foreign update interests: " + updateInterests);
-                    updateInterestOps(updateInterests);
-                    updateInterests = -1;
+                if (DEBUG)
+                    System.out.println(this + "Canceling key on source handler switch: " + sourceHandler + "-" + newHandler);
+
+                SelectionKey exising = channel.keyFor(sourceHandler.getSelector());
+                if (exising != null) {
+                    interestOps = exising.interestOps();
+                    exising.cancel();
                 }
             }
+        } else if (DEBUG) {
+            if (this.sourceHandler == newHandler) {
+                System.out.println(this + " switching to new source handler " + sourceHandler + Thread.currentThread());
+            }
         }
-        super.processForeignUpdates();
     }
 
     /**
      * This call updates the interest ops on which the dispatcher should select.
-     * When an interest becomes ready, the dispatcher will call the {@link Dispatchable}'s 
-     * dispatch() method. At that time, {@link #readyOps()} can be called to see what
-     * interests are now ready. 
-     * 
-     * This method may only be called from {@link Dispatchable#dispatch()} and is not
-     * threadsafe. If the {@link Dispatchable} wishes to change it's interest op it
-     * must call {@link #requestDispatch()} so that they can be changed from the dispatch()
-     * method.
+     * When an interest becomes ready, the eventHandler for the source will be
+     * called. At that time the EventHandler can call {@link #getData()} to see
+     * which ops are ready.
+     * 
+     * The masks may be changed over time, but it is only legal to do so from
+     * the supplied eventHandler.
      * 
-     * @param interestOps The interest ops. 
+     * @param interestOps
+     *            The interest ops.
      */
-    public void updateInterestOps(int ops) {
+    public void setMask(long ops) {
+
         readyOps &= ~ops;
         if (key != null && key.isValid()) {
-            key.interestOps(key.interestOps() | ops);
+            key.interestOps(key.interestOps() | (int) ops);
         } else {
-            if (isClosed()) {
+
+            if (isShutdown() || suspendCounter.get() > 0) {
+                interestOps |= (int) ops;
                 return;
             }
 
             // Make sure that we don't already have an invalidated key for
             // this selector. If we do then do a select to get rid of the
             // key:
-            SelectionKey existing = channel.keyFor( ((NIODispatcherThread)target).getSelector());
+            SelectionKey existing = channel.keyFor(sourceHandler.getSelector());
             if (existing != null && !existing.isValid()) {
                 if (DEBUG)
-                    debug(this + " registering existing invalid key:" + target + Thread.currentThread());
+                    debug(this + " registering existing invalid key:" + sourceHandler + Thread.currentThread());
                 try {
-                    ((NIODispatcherThread)target).getSelector().selectNow();
+                    sourceHandler.getSelector().selectNow();
                 } catch (IOException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                 }
             }
             if (DEBUG)
-                System.out.println(this + " registering new key with interests: " + ops);
+                System.out.println(this + " registering new key with interests: " + interestOps);
             try {
-                key = channel.register( ((NIODispatcherThread)target).getSelector(), ops, this);
+                key = channel.register(sourceHandler.getSelector(), interestOps, this);
             } catch (ClosedChannelException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
+                throw new IllegalStateException("Channel was closed", e);
             }
         }
     }
 
     /**
-     * This call retrieves the operations that have become ready since the last call
-     * to {@link #readyOps()}. Calling this method clears the ready ops. 
+     * This call retrieves the operations that have become ready since the last
+     * call to {@link #readyOps()}. Calling this method clears the ready ops.
      * 
-     * This method may only be called from {@link Dispatchable#dispatch()} and is not
-     * threadsafe. 
+     * It is only legal to call this method from the supplied eventHandler.
      * 
-     * @return the readyOps. 
+     * @return the readyOps.
      */
-    public int readyOps() {
+    public long getData() {
         return readyOps;
         /*
          * if (key == null || !key.isValid()) { return 0; } else { return
@@ -169,37 +160,123 @@
          */
     }
 
-    public boolean onSelect() {
+    final boolean onSelect() {
         readyOps = key.readyOps();
         key.interestOps(key.interestOps() & ~key.readyOps());
-        synchronized (this) {
-            if (!isLinked()) {
-                target.execute(runnable, listPrio);
-            }
+        if (suspendCounter.get() <= 0) {
+            eventHandler.addToQueue();
         }
 
         // System.out.println(this + "onSelect " + key.readyOps() + "/" +
-        return true;
+        return key.interestOps() == 0;
     }
 
-    public void close(boolean sync) {
+    @Override
+    protected void shutdown() {
         // actual close can only happen on the owning dispatch thread:
-        if (target == DispatcherThread.CURRENT.get()) {
-
-            if (key != null && key.isValid()) {
-                // This will make sure that the key is removed
-                // from the selector.
-                key.cancel();
-                try {
-                    ((NIODispatcherThread)target).getSelector().selectNow();
-                } catch (IOException e) {
-                    if (DEBUG) {
-                        debug("Error in close", e);
-                    }
+        if (key != null && key.isValid()) {
+            // This will make sure that the key is removed
+            // from the selector.
+            key.cancel();
+            try {
+                sourceHandler.getSelector().selectNow();
+            } catch (IOException e) {
+                if (DEBUG) {
+                    debug("Error in close", e);
                 }
             }
+
+            if (cancelHandler != null) {
+                cancelHandler.run();
+            }
+        }
+    }
+
+    @Override
+    public void retain() {
+        throw new UnsupportedOperationException("Sources are retained until canceled");
+    }
+
+    @Override
+    public void release() {
+        throw new UnsupportedOperationException("Sources must be release via cancel");
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.DispatchObject#resume()
+     */
+    public void resume() {
+        long c = suspendCounter.decrementAndGet();
+        if (c < 0) {
+            throw new IllegalStateException();
+        }
+        if (c == 0 && readyOps != 0) {
+            eventHandler.addToQueue();
         }
-        super.close(sync);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.DispatchObject#suspend()
+     */
+    public void suspend() {
+        suspendCounter.incrementAndGet();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.DispatchSource#cancel()
+     */
+    public void cancel() {
+        getTargetQueue().dispatchAsync(new Runnable() {
+            public void run() {
+                NIODispatchSource.super.release();
+            }
+        });
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.DispatchSource#getMask()
+     */
+    public long getMask() {
+        return interestOps;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.DispatchSource#isCanceled()
+     */
+    public boolean isCanceled() {
+        return isShutdown();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.dispatch.DispatchSource#setCancelHandler(java.lang
+     * .Runnable)
+     */
+    public void setCancelHandler(Runnable cancelHandler) {
+        this.cancelHandler = cancelHandler;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.dispatch.DispatchSource#setEventHandler(java.lang
+     * .Runnable)
+     */
+    public void setEventHandler(Runnable eventHandler) {
+        this.eventHandler.setUserHandler(eventHandler);
     }
 
     protected void debug(String str) {
@@ -214,4 +291,50 @@
             thrown.printStackTrace();
         }
     }
+
+    private class EventHandler implements Runnable {
+
+        private Runnable userHandler;
+        private AtomicBoolean queued = new AtomicBoolean(false);
+        private AtomicBoolean running = new AtomicBoolean(false);
+        
+        /*
+         * (non-Javadoc)
+         * 
+         * @see java.lang.Runnable#run()
+         */
+        public void run() {
+            running.set(true);
+            queued.set(false);
+            try {
+                userHandler.run();
+            } finally {
+                running.set(false);
+            }
+        }
+        
+        public void addToQueue()
+        {
+            if(!queued.compareAndSet(false, true))
+            {
+                getTargetQueue().dispatchAsync(this);
+            }
+        }
+        
+        public boolean isRunning()
+        {
+            return running.get();
+        }
+        
+        public boolean isQueued()
+        {
+            return queued.get();
+        }
+        
+        public void setUserHandler(Runnable userHandler)
+        {
+            this.userHandler = userHandler;
+        }
+    }
+
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java (from r890887, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java&r1=890887&r2=891410&rev=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java Wed Dec 16 20:06:09 2009
@@ -22,19 +22,22 @@
 import java.nio.channels.Selector;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatcher;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 
-public class NIODispatcherThread extends DispatcherThread {
+public class NIOSourceHandler {
     private final boolean DEBUG = false;
 
     private final Selector selector;
+    private final DispatcherThread thread;
 
-    protected NIODispatcherThread(AdvancedDispatcher dispatcher, String name, int priorities) throws IOException {
-        super(dispatcher, name, priorities);
+    public NIOSourceHandler(DispatcherThread thread) throws IOException {
         this.selector = Selector.open();
+        this.thread = thread;
+    }
+
+    DispatcherThread getThread() {
+        return thread;
     }
 
     Selector getSelector() {
@@ -51,23 +54,6 @@
         }
     }
 
-    public SelectableDispatchContext registerSelectable(Runnable dispatchable, String name) {
-        return new SelectableDispatchContext(this, dispatchable, name);
-    }
-
-    /**
-     * Subclasses may override this to do do additional dispatch work:
-     * 
-     * @throws Exception
-     */
-    protected void dispatchHook() throws Exception {
-        doSelect(true);
-    }
-
-    protected void waitForEvents() throws Exception {
-        doSelect(false);
-    }
-
     /**
      * Subclasses may override this to provide an alternative wakeup mechanism.
      */
@@ -75,51 +61,28 @@
         selector.wakeup();
     }
 
-    private long lastSelect = System.nanoTime();
-    private long frequency = 50000000;
-
-    private void doSelect(boolean now) throws IOException {
+    /**
+     * Selects ready sources, potentially blocking. If wakeup is called during
+     * select the method will return.
+     * 
+     * @param timeout
+     *            A negative value cause the select to block until a source is
+     *            ready, 0 will do a non blocking select. Otherwise the select
+     *            will block up to timeout in milliseconds waiting for a source
+     *            to become ready.
+     * @throws IOException
+     */
+    public void doSelect(long timeout) throws IOException {
 
-        // Select what's ready now:
         try {
-            if (now) {
-                if (selector.keys().isEmpty()) {
-                    return;
-                }
-                // selector.selectNow();
-                // processSelected();
-
-                long time = System.nanoTime();
-                if (time - lastSelect > frequency) {
-                    selector.selectNow();
-                    lastSelect = time;
-
-                    int registered = selector.keys().size();
-                    int selected = selector.selectedKeys().size();
-                    if (selected == 0) {
-                        frequency += 1000000;
-                        if (DEBUG)
-                            debug(this + "Increased select frequency to " + frequency);
-                    } else if (selected > registered / 4) {
-                        frequency -= 1000000;
-                        if (DEBUG)
-                            debug(this + "Decreased select frequency to " + frequency);
-                    }
-                    processSelected();
-
-                }
+            if (timeout == -1) {
+                selector.select();
+            } else if (timeout > 0) {
+                selector.select(timeout);
             } else {
-                long next = timerHeap.timeToNext(TimeUnit.MILLISECONDS);
-                if (next == -1) {
-                    selector.select();
-                } else if (next > 0) {
-                    selector.select(next);
-                } else {
-                    selector.selectNow();
-                }
-                lastSelect = System.nanoTime();
-                processSelected();
+                selector.selectNow();
             }
+            processSelected();
 
         } catch (CancelledKeyException ignore) {
             // A key may have been canceled.
@@ -136,17 +99,17 @@
                 boolean done = false;
                 SelectionKey key = i.next();
                 if (key.isValid()) {
-                    SelectableDispatchContext context = (SelectableDispatchContext) key.attachment();
+                    NIODispatchSource source = (NIODispatchSource) key.attachment();
 
                     done = true;
                     try {
-                        done = context.onSelect();
+                        done = source.onSelect();
                     } catch (RuntimeException re) {
                         if (DEBUG)
-                            debug("Exception in " + context + " closing");
+                            debug("Exception in " + source + " canceling");
                         // If there is a Runtime error close the context:
                         // TODO better error handling here:
-                        context.close(false);
+                        source.cancel();
                     }
                 } else {
                     done = true;
@@ -160,6 +123,14 @@
         }
     }
 
+    public void shutdown() throws IOException {
+        for (SelectionKey key : selector.keys()) {
+            NIODispatchSource source = (NIODispatchSource) key.attachment();
+            source.cancel();
+        }
+        selector.close();
+    }
+
     private final void debug(String str) {
         System.out.println(this + ": " + str);
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Wed Dec 16 20:06:09 2009
@@ -135,8 +135,7 @@
             SimpleDispatcher.CURRENT_QUEUE.set(current);
         }
     }
-
-    @Override
+    
     public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
         dispatcher.timerThread.addRelative(runnable, this, delay, unit);
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java Wed Dec 16 20:06:09 2009
@@ -1,5 +1,7 @@
 package org.apache.activemq.actor;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
 import org.junit.Test;
 
@@ -7,18 +9,15 @@
 
 public class ActorBenchmark {
 
-    
-    public static class PizzaService implements IPizzaService
-    {
+    public static class PizzaService implements IPizzaService {
         long counter;
-        
+
         @Message
-        public void order(long count)
-        {
+        public void order(long count) {
             counter += count;
         }
     }
-    
+
     @Test
     public void benchmarkCGLibProxy() throws Exception {
         String name = "cglib proxy";
@@ -34,7 +33,7 @@
         IPizzaService proxy = new PizzaServiceCustomProxy(service, createQueue());
         benchmark(name, service, proxy);
     }
-    
+
     @Test
     public void benchmarkAsmProxy() throws Exception {
         String name = "asm proxy";
@@ -48,34 +47,39 @@
             public void dispatchAsync(Runnable runnable) {
                 runnable.run();
             }
+
+            public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+                throw new RuntimeException("TODO: implement me.");
+                
+            }
+
         };
     }
 
     private void benchmark(String name, PizzaService service, IPizzaService proxy) throws Exception {
         // warm it up..
-        benchmark(proxy, 1000*1000);
-        if( service.counter == 0)
+        benchmark(proxy, 1000 * 1000);
+        if (service.counter == 0)
             throw new Exception();
-        
-        int iterations = 1000*1000*100;
-        
+
+        int iterations = 1000 * 1000 * 100;
+
         long start = System.nanoTime();
         benchmark(proxy, iterations);
         long end = System.nanoTime();
-        
-        if( service.counter == 0)
+
+        if (service.counter == 0)
             throw new Exception();
 
-        double durationMS = 1.0d*(end-start)/1000000d;
+        double durationMS = 1.0d * (end - start) / 1000000d;
         double rate = 1000d * iterations / durationMS;
         System.out.println(format("name: %s, duration: %,.3f ms, rate: %,.2f executions/sec", name, durationMS, rate));
     }
 
-
     private void benchmark(IPizzaService proxy, int iterations) {
-        for( int i=0; i < iterations; i++ ) {
+        for (int i = 0; i < iterations; i++) {
             proxy.order(1);
         }
     }
-    
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java Wed Dec 16 20:06:09 2009
@@ -1,5 +1,7 @@
 package org.apache.activemq.actor;
 
+import java.util.concurrent.TimeUnit;
+
 import junit.framework.Assert;
 
 import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
@@ -43,6 +45,10 @@
             public void dispatchAsync(Runnable runnable) {
                 runnable.run();
             }
+
+            public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+                throw new UnsupportedOperationException("Not implemented");
+            }
         };
     }