You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/21 19:45:33 UTC

svn commit: r892924 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src: main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/dispatch/internal/ main/java/org/apache/activemq/dispatch/internal/advanced/ main/java/org/a...

Author: chirino
Date: Mon Dec 21 18:45:33 2009
New Revision: 892924

URL: http://svn.apache.org/viewvc?rev=892924&view=rev
Log:
Simper nio based source.


Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java
      - copied, changed from r892348, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java
      - copied, changed from r892348, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java Mon Dec 21 18:45:33 2009
@@ -25,13 +25,7 @@
     public void cancel();
     public boolean isCanceled();
     
-    public long getData();
-    
-    public long getMask();
-    public void setMask(long mask);
-    
     public void setCancelHandler(Runnable cancelHandler);
     public void setEventHandler(Runnable eventHandler);
     
-    
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Mon Dec 21 18:45:33 2009
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.dispatch.internal;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.activemq.dispatch.DispatchObject;
 import org.apache.activemq.dispatch.DispatchQueue;
 
@@ -28,26 +26,27 @@
 abstract public class AbstractDispatchObject extends BaseSuspendable implements DispatchObject {
 
     protected volatile Object context;
+
     protected volatile DispatchQueue targetQueue;
 
     @SuppressWarnings("unchecked")
     public <Context> Context getContext() {
-       assertRetained();
+        assertRetained();
         return (Context) context;
     }
-    
+
     public <Context> void setContext(Context context) {
-       assertRetained();
+        assertRetained();
         this.context = context;
     }
 
     public void setTargetQueue(DispatchQueue targetQueue) {
-       assertRetained();
+        assertRetained();
         this.targetQueue = targetQueue;
     }
 
     public DispatchQueue getTargetQueue() {
-       assertRetained();
+        assertRetained();
         return this.targetQueue;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java Mon Dec 21 18:45:33 2009
@@ -26,14 +26,14 @@
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class BaseSuspendable extends BaseRetainable implements Suspendable {
-    
+
     protected final AtomicBoolean startup = new AtomicBoolean(true);
     protected final AtomicInteger suspended = new AtomicInteger();
 
     public void resume() {
-       assertRetained();
-        if( suspended.decrementAndGet() == 0 ) {
-            if( startup.compareAndSet(true, false) ) {
+        assertRetained();
+        if (suspended.decrementAndGet() == 0) {
+            if (startup.compareAndSet(true, false)) {
                 onStartup();
             } else {
                 onResume();
@@ -42,8 +42,8 @@
     }
 
     public void suspend() {
-       assertRetained();
-        if( suspended.getAndIncrement()==0 ) {
+        assertRetained();
+        if (suspended.getAndIncrement() == 0) {
             onSuspend();
         }
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java Mon Dec 21 18:45:33 2009
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.dispatch.internal.advanced;
 
-import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
 import java.util.ArrayList;
 import java.util.concurrent.Executor;
@@ -32,7 +31,7 @@
 import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.DispatcherConfig;
 import org.apache.activemq.dispatch.internal.BaseSuspendable;
-import org.apache.activemq.dispatch.internal.nio.NIODispatchSource;
+import org.apache.activemq.dispatch.internal.nio.NioDispatchSource;
 
 import static org.apache.activemq.dispatch.DispatchPriority.*;
 
@@ -212,17 +211,7 @@
     }
 
     public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
-        NIODispatchSource source = new NIODispatchSource();
-        try {
-            source.setChannel(channel);
-        } catch (ClosedChannelException e) {
-            e.printStackTrace();
-        }
-        source.setMask(interestOps);
-        //Dispatch Source must be sticky so that it sticks to it's thread's selector:
-        if (!(queue.getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD) || queue.getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD))) {
-            throw new IllegalStateException("Source dispatch queue must be sticky");
-        }
+        NioDispatchSource source = new NioDispatchSource(this, channel, interestOps);
         source.setTargetQueue(queue);
         return source;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Mon Dec 21 18:45:33 2009
@@ -28,7 +28,7 @@
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSource;
-import org.apache.activemq.dispatch.internal.nio.NIOSourceHandler;
+import org.apache.activemq.dispatch.internal.nio.NioSelector;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.PriorityLinkedList;
 import org.apache.activemq.util.TimerHeap;
@@ -80,11 +80,11 @@
     };
     
 
-    private final NIOSourceHandler nioHandler;
+    private final NioSelector nioHandler;
 
     protected DispatcherThread(AdvancedDispatcher dispatcher, String name, int priorities) throws IOException {
         this.name = name;
-        this.nioHandler = new NIOSourceHandler(this);
+        this.nioHandler = new NioSelector();
         this.dispatchQueues = new ThreadDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
             dispatchQueues[i] = new ThreadDispatchQueue(this, DispatchPriority.values()[i]);

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java (from r892348, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java&r1=892348&r2=892924&rev=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java Mon Dec 21 18:45:33 2009
@@ -12,10 +12,15 @@
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.dispatch.DispatchOption;
+import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSource;
-import org.apache.activemq.dispatch.internal.AbstractDispatchObject;
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.internal.BaseSuspendable;
+
+import static java.lang.String.*;
 
 /**
  * SelectableDispatchContext
@@ -26,314 +31,174 @@
  * @author cmacnaug
  * @version 1.0
  */
-public class NIODispatchSource extends AbstractDispatchObject implements DispatchSource {
-    public static final boolean DEBUG = false;
-    private final AtomicLong suspendCounter = new AtomicLong(1);
-    private SelectableChannel channel;
-    private SelectionKey key;
-    private int interestOps = 0;
-    private int readyOps = 0;
-    private NIOSourceHandler sourceHandler;
-    private final EventHandler eventHandler = new EventHandler();
-    private Runnable cancelHandler;
+final public class NioDispatchSource extends BaseSuspendable implements DispatchSource {
 
-    public NIODispatchSource() {
-    }
+    public static final boolean DEBUG = true;
 
-    /**
-     * This can be called to set a channel on which the Dispatcher will perform
-     * selection operations. The channel may be changed over time, but only from
-     * the registered event handler.
-     * 
-     * @param channel
-     *            The channel on which to select.
-     * @throws ClosedChannelException
-     *             If a closed channel is provided
-     */
-    public void setChannel(SelectableChannel channel) throws ClosedChannelException {
-        if (this.channel != channel) {
-            if (super.isShutdown()) {
-                return;
-            }
-            int interests = interestOps;
-            if (key != null) {
-                interests |= key.interestOps();
-                key.cancel();
-                key = null;
-            }
-            this.channel = channel;
-            if (channel != null) {
-                setMask(interests);
-            }
-        }
+    interface KeyActor {
+        public void register();
+        public void resume();
+        public void addInterest(int ops);
+        public void cancel();
+    }
+
+    private final SelectableChannel channel;
+    private final int interestOps;
+    private final DispatchQueue actorQueue;
+    private final KeyActor actor;
+    private final AtomicBoolean canceled = new AtomicBoolean();
+
+    private volatile DispatchQueue targetQueue;
+    private volatile Runnable cancelHandler;
+    private volatile Runnable eventHandler;
+    private volatile Object context;
+
+    public NioDispatchSource(Dispatcher dispatcher, SelectableChannel channel, int interestOps) {
+        if( interestOps == 0 ) {
+            throw new IllegalArgumentException("invalid interest ops");
+        }
+        this.channel = channel;
+        this.interestOps = interestOps;
+        this.suspended.incrementAndGet();
+        this.actorQueue = dispatcher.createSerialQueue(getClass().getName(), DispatchOption.STICK_TO_DISPATCH_THREAD);
+        this.actor = ActorProxy.create(KeyActor.class, new KeyActorImpl(), actorQueue);
     }
 
-    /**
-     * Set's the handler for this source, this may only be called from the event
-     * handler
-     * 
-     * @param newHandler
-     *            The new handler for the source
-     */
-    protected void setHandler(NIOSourceHandler newHandler) {
-
-        if (sourceHandler != newHandler) {
-            if (channel != null) {
-                if (DEBUG)
-                    System.out.println(this + "Canceling key on source handler switch: " + sourceHandler + "-" + newHandler);
-
-                SelectionKey exising = channel.keyFor(sourceHandler.getSelector());
-                if (exising != null) {
-                    interestOps = exising.interestOps();
-                    exising.cancel();
-                }
-            }
-        } else if (DEBUG) {
-            if (this.sourceHandler == newHandler) {
-                System.out.println(this + " switching to new source handler " + sourceHandler + Thread.currentThread());
-            }
-        }
+    @Override
+    protected void onStartup() {
+        actor.register();
     }
+    
 
     /**
-     * This call updates the interest ops on which the dispatcher should select.
-     * When an interest becomes ready, the eventHandler for the source will be
-     * called. At that time the EventHandler can call {@link #getData()} to see
-     * which ops are ready.
-     * 
-     * The masks may be changed over time, but it is only legal to do so from
-     * the supplied eventHandler.
-     * 
-     * @param interestOps
-     *            The interest ops.
+     * All operations on this object are serialized via a serial queue and actor proxy.
+     * Additional synchronization is not required. 
+     *  
+     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
      */
-    public void setMask(long ops) {
-
-        readyOps &= ~ops;
-        if (key != null && key.isValid()) {
-            key.interestOps(key.interestOps() | (int) ops);
-        } else {
-
-            if (isShutdown() || suspendCounter.get() > 0) {
-                interestOps |= (int) ops;
-                return;
-            }
+    final class KeyActorImpl implements KeyActor {
+        
+        private SelectionKey key;
+        private int readyOps;
 
-            // Make sure that we don't already have an invalidated key for
-            // this selector. If we do then do a select to get rid of the
-            // key:
-            SelectionKey existing = channel.keyFor(sourceHandler.getSelector());
-            if (existing != null && !existing.isValid()) {
-                if (DEBUG)
-                    debug(this + " registering existing invalid key:" + sourceHandler + Thread.currentThread());
-                try {
-                    sourceHandler.getSelector().selectNow();
-                } catch (IOException e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                }
-            }
-            if (DEBUG)
-                System.out.println(this + " registering new key with interests: " + interestOps);
+        public void register() {
+            NioSelector selector = NioSelector.CURRENT_SELECTOR.get();
             try {
-                key = channel.register(sourceHandler.getSelector(), interestOps, this);
+                key = channel.register(selector.getSelector(), interestOps);
+                key.attach(new Runnable() {
+                    public void run() {
+                        int ops = key.readyOps();
+                        debug("%s: selector found ready ops: %d", this, ops);
+                        readyOps |= ops;
+                        resume();
+                    }
+                });
             } catch (ClosedChannelException e) {
-                throw new IllegalStateException("Channel was closed", e);
+                debug(e, "could not register selector");
             }
         }
-    }
-
-    /**
-     * This call retrieves the operations that have become ready since the last
-     * call to {@link #readyOps()}. Calling this method clears the ready ops.
-     * 
-     * It is only legal to call this method from the supplied eventHandler.
-     * 
-     * @return the readyOps.
-     */
-    public long getData() {
-        return readyOps;
-        /*
-         * if (key == null || !key.isValid()) { return 0; } else { return
-         * key.readyOps(); }
-         */
-    }
-
-    final boolean onSelect() {
-        readyOps = key.readyOps();
-        key.interestOps(key.interestOps() & ~key.readyOps());
-        if (suspendCounter.get() <= 0) {
-            eventHandler.addToQueue();
+        
+        public void resume() {
+            if( readyOps!=0 && suspended.get() <= 0) {
+                final int dispatchedOps = readyOps;
+                readyOps = 0;
+                debug("%s: dispatching for ops: %d", this, dispatchedOps);
+                targetQueue.dispatchAsync(new Runnable() {
+                    public void run() {
+                        eventHandler.run();
+                        actor.addInterest(dispatchedOps);
+                    }
+                });
+            }
         }
-
-        // System.out.println(this + "onSelect " + key.readyOps() + "/" +
-        return key.interestOps() == 0;
-    }
-
-    @Override
-    protected void onShutdown() {
-        // actual close can only happen on the owning dispatch thread:
-        if (key != null && key.isValid()) {
-            // This will make sure that the key is removed
-            // from the selector.
-            key.cancel();
-            try {
-                sourceHandler.getSelector().selectNow();
-            } catch (IOException e) {
-                if (DEBUG) {
-                    debug("Error in close", e);
+        
+        public void addInterest(int ops) {
+            debug("%s: adding interest: %d", this, ops);
+            key.interestOps(key.interestOps()|ops);
+        }
+        
+        public void cancel() {
+            if (key != null && key.isValid()) {
+                debug("%s: canceling key: %s", this, key);
+                // This will make sure that the key is removed
+                // from the selector.
+                key.cancel();
+                try {
+                    NioSelector selector = NioSelector.CURRENT_SELECTOR.get();
+                    selector.getSelector().selectNow();
+                } catch (IOException e) {
+                    debug(e, "Error in close");
+                }
+                
+                if( cancelHandler!=null ) {
+                    cancelHandler.run();
                 }
-            }
-
-            if (cancelHandler != null) {
-                cancelHandler.run();
             }
         }
     }
 
     @Override
-    public void retain() {
-        throw new UnsupportedOperationException("Sources are retained until canceled");
+    protected void onResume() {
+        actor.resume();
     }
 
     @Override
-    public void release() {
-        throw new UnsupportedOperationException("Sources must be release via cancel");
+    protected void onShutdown() {
+        cancel();
+        this.actorQueue.release();
+        super.onShutdown();
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.DispatchObject#resume()
-     */
-    public void resume() {
-        long c = suspendCounter.decrementAndGet();
-        if (c < 0) {
-            throw new IllegalStateException();
-        }
-        if (c == 0 && readyOps != 0) {
-            eventHandler.addToQueue();
+    public void cancel() {
+        if( canceled.compareAndSet(false, true) ) {
+            actor.cancel();
         }
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.DispatchObject#suspend()
-     */
-    public void suspend() {
-        suspendCounter.incrementAndGet();
+    public boolean isCanceled() {
+        return canceled.get();
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.DispatchSource#cancel()
-     */
-    public void cancel() {
-        getTargetQueue().dispatchAsync(new Runnable() {
-            public void run() {
-                NIODispatchSource.super.release();
-            }
-        });
+    public void setCancelHandler(Runnable cancelHandler) {
+        this.cancelHandler = cancelHandler;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.DispatchSource#getMask()
-     */
-    public long getMask() {
-        return interestOps;
+    public void setEventHandler(Runnable eventHandler) {
+        this.eventHandler = eventHandler;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.DispatchSource#isCanceled()
-     */
-    public boolean isCanceled() {
-        return isShutdown();
+    @SuppressWarnings("unchecked")
+    public <Context> Context getContext() {
+        return (Context) context;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.dispatch.DispatchSource#setCancelHandler(java.lang
-     * .Runnable)
-     */
-    public void setCancelHandler(Runnable cancelHandler) {
-        this.cancelHandler = cancelHandler;
+    public <Context> void setContext(Context context) {
+        this.context = context;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.dispatch.DispatchSource#setEventHandler(java.lang
-     * .Runnable)
-     */
-    public void setEventHandler(Runnable eventHandler) {
-        this.eventHandler.setUserHandler(eventHandler);
+    public void setTargetQueue(DispatchQueue targetQueue) {
+        this.targetQueue = targetQueue;
     }
 
-    protected void debug(String str) {
-        System.out.println(str);
+    public DispatchQueue getTargetQueue() {
+        return this.targetQueue;
     }
 
-    protected void debug(String str, Throwable thrown) {
-        if (str != null) {
-            System.out.println(str);
-        }
-        if (thrown != null) {
-            thrown.printStackTrace();
+    protected void debug(String str, Object... args) {
+        if (DEBUG) {
+            System.out.println(format(str, args));
         }
     }
 
-    private class EventHandler implements Runnable {
-
-        private Runnable userHandler;
-        private AtomicBoolean queued = new AtomicBoolean(false);
-        private AtomicBoolean running = new AtomicBoolean(false);
-        
-        /*
-         * (non-Javadoc)
-         * 
-         * @see java.lang.Runnable#run()
-         */
-        public void run() {
-            running.set(true);
-            queued.set(false);
-            try {
-                userHandler.run();
-            } finally {
-                running.set(false);
+    protected void debug(Throwable thrown, String str, Object... args) {
+        if (DEBUG) {
+            if (str != null) {
+                debug(str, args);
             }
-        }
-        
-        public void addToQueue()
-        {
-            if(!queued.compareAndSet(false, true))
-            {
-                getTargetQueue().dispatchAsync(this);
+            if (thrown != null) {
+                thrown.printStackTrace();
             }
         }
-        
-        public boolean isRunning()
-        {
-            return running.get();
-        }
-        
-        public boolean isQueued()
-        {
-            return queued.get();
-        }
-        
-        public void setUserHandler(Runnable userHandler)
-        {
-            this.userHandler = userHandler;
-        }
     }
 
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java (from r892348, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java&r1=892348&r2=892924&rev=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java Mon Dec 21 18:45:33 2009
@@ -23,21 +23,15 @@
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
-
-public class NIOSourceHandler {
+public class NioSelector {
+    
+    public final static ThreadLocal<NioSelector> CURRENT_SELECTOR = new ThreadLocal<NioSelector>();
+    
     private final boolean DEBUG = false;
-
     private final Selector selector;
-    private final DispatcherThread thread;
 
-    public NIOSourceHandler(DispatcherThread thread) throws IOException {
+    public NioSelector() throws IOException {
         this.selector = Selector.open();
-        this.thread = thread;
-    }
-
-    DispatcherThread getThread() {
-        return thread;
     }
 
     Selector getSelector() {
@@ -96,28 +90,12 @@
         Set<SelectionKey> selectedKeys = selector.selectedKeys();
         if (!selectedKeys.isEmpty()) {
             for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
-                boolean done = false;
                 SelectionKey key = i.next();
-                if (key.isValid()) {
-                    NIODispatchSource source = (NIODispatchSource) key.attachment();
-
-                    done = true;
-                    try {
-                        done = source.onSelect();
-                    } catch (RuntimeException re) {
-                        if (DEBUG)
-                            debug("Exception in " + source + " canceling");
-                        // If there is a Runtime error close the context:
-                        // TODO better error handling here:
-                        source.cancel();
-                    }
-                } else {
-                    done = true;
-                }
-
-                // If no more interests remove:
-                if (done) {
-                    i.remove();
+                boolean valid = key.isValid();
+                i.remove();
+                if (valid) {
+                    key.interestOps(key.interestOps() & ~key.readyOps());
+                    ((Runnable) key.attachment()).run();
                 }
             }
         }
@@ -125,7 +103,7 @@
 
     public void shutdown() throws IOException {
         for (SelectionKey key : selector.keys()) {
-            NIODispatchSource source = (NIODispatchSource) key.attachment();
+            NioDispatchSource source = (NioDispatchSource) key.attachment();
             source.cancel();
         }
         selector.close();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Mon Dec 21 18:45:33 2009
@@ -17,10 +17,13 @@
 
 package org.apache.activemq.dispatch.internal.simple;
 
+import java.io.IOException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.dispatch.internal.nio.NioSelector;
+
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -33,6 +36,7 @@
     final AtomicLong threadQueuedRunnables = new AtomicLong();
     final IntegerCounter executionCounter = new IntegerCounter();
     ThreadDispatchQueue currentThreadQueue;
+    private NioSelector selector;
 
     public DispatcherThread(SimpleDispatcher dispatcher, int ordinal) {
         this.dispatcher = dispatcher;
@@ -51,8 +55,23 @@
         int processGlobalQueueCount = PRIORITIES;
 
         try {
+            this.selector = new NioSelector();
+            NioSelector.CURRENT_SELECTOR.set(selector);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+        
+        try {
+            
             start: for (;;) {
 
+                try {
+                    this.selector.doSelect(0);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+                
                 executionCounter.set(MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL);
 
                 // Process the local non-synchronized queues.
@@ -145,6 +164,13 @@
                 }
             }
         } catch (Shutdown e) {
+        } finally {
+            try {
+                selector.shutdown();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            NioSelector.CURRENT_SELECTOR.remove();
         }
     }
 
@@ -174,6 +200,11 @@
     private final AtomicBoolean inWaitingList = new AtomicBoolean(false);
 
     private void waitForWakeup() throws InterruptedException {
+        try {
+            this.selector.doSelect(0);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
         while (threadQueuedRunnables.get() == 0 && dispatcher.globalQueuedRunnables.get() == 0) {
             if (!wakeups.tryAcquire()) {
                 if (inWaitingList.compareAndSet(false, true)) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java Mon Dec 21 18:45:33 2009
@@ -29,6 +29,7 @@
 import org.apache.activemq.dispatch.DispatcherConfig;
 import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
 import org.apache.activemq.dispatch.internal.BaseSuspendable;
+import org.apache.activemq.dispatch.internal.nio.NioDispatchSource;
 
 import static org.apache.activemq.dispatch.DispatchPriority.*;
 
@@ -58,7 +59,7 @@
             globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i]);
         }
         dispatchers = new DispatcherThread[config.getThreads()];
-        super.suspend();
+        this.suspended.incrementAndGet();
     }
 
     public DispatchQueue getMainQueue() {
@@ -84,7 +85,9 @@
     }
 
     public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
-        return null;
+        NioDispatchSource source = new NioDispatchSource(this, channel, interestOps);
+        source.setTargetQueue(queue);
+        return source;
     }
 
     public boolean addWaitingDispatcher(DispatcherThread dispatcher) {
@@ -129,7 +132,6 @@
 
         Runnable countDown = new Runnable() {
             AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length);
-
             public void run() {
                 if (shutdownCountDown.decrementAndGet() == 0) {
                     // Notify any registered shutdown watchers.

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java?rev=892924&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java Mon Dec 21 18:45:33 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
+import org.apache.activemq.dispatch.internal.simple.SimpleDispatcher;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.*;
+import static junit.framework.Assert.*;
+
+public class NioDispatchSoruceTest {
+
+    @Test
+    public void test() throws IOException, InterruptedException {
+
+        // Create the nio server socket...
+        final ServerSocketChannel channel = ServerSocketChannel.open();
+        channel.configureBlocking(false);
+        channel.socket().bind(address("0.0.0.0", 0), 10);
+
+
+        // Get a dispatcher and queue..
+        SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherConfig());
+        dispatcher.resume();
+        DispatchQueue accepts = dispatcher.createSerialQueue("test");
+        
+        // Create a source attached to the server socket to deal with new connectins..
+        DispatchSource source = dispatcher.createSource(channel, SelectionKey.OP_ACCEPT, accepts);
+        // All we do is just release a countdown latch...
+        RunnableCountDownLatch accepted = new RunnableCountDownLatch(1) {
+            @Override
+            public void run() {
+                try {
+                    SocketChannel socket = channel.accept();
+                    socket.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+                super.run();
+            }
+        };
+        source.setEventHandler(accepted);
+
+        // Connect to the server in a new thread.
+        new Thread("connect") {
+            public void run() {
+                try {
+                    Socket socket = new Socket();
+                    socket.connect(channel.socket().getLocalSocketAddress());
+                    socket.close();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        }.start();
+        
+        // Events should not get delivered until the source is resumed.
+        assertFalse(accepted.await(1, SECONDS));
+        source.resume();
+        
+        // Count down latch should get released now.
+        assertTrue(accepted.await(1, SECONDS));
+        
+    }
+
+    static public InetSocketAddress address(String host, int port) throws UnknownHostException {
+        return new InetSocketAddress(ip(host), port);
+    }
+
+    static public InetAddress ip(String host) throws UnknownHostException {
+        return InetAddress.getByName(host);
+    }
+    
+}