You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/21 19:45:33 UTC
svn commit: r892924 - in
/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src:
main/java/org/apache/activemq/dispatch/
main/java/org/apache/activemq/dispatch/internal/
main/java/org/apache/activemq/dispatch/internal/advanced/ main/java/org/a...
Author: chirino
Date: Mon Dec 21 18:45:33 2009
New Revision: 892924
URL: http://svn.apache.org/viewvc?rev=892924&view=rev
Log:
Simpler NIO-based source.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java
- copied, changed from r892348, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java
- copied, changed from r892348, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java Mon Dec 21 18:45:33 2009
@@ -25,13 +25,7 @@
public void cancel();
public boolean isCanceled();
- public long getData();
-
- public long getMask();
- public void setMask(long mask);
-
public void setCancelHandler(Runnable cancelHandler);
public void setEventHandler(Runnable eventHandler);
-
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Mon Dec 21 18:45:33 2009
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.dispatch.internal;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.activemq.dispatch.DispatchObject;
import org.apache.activemq.dispatch.DispatchQueue;
@@ -28,26 +26,27 @@
abstract public class AbstractDispatchObject extends BaseSuspendable implements DispatchObject {
protected volatile Object context;
+
protected volatile DispatchQueue targetQueue;
@SuppressWarnings("unchecked")
public <Context> Context getContext() {
- assertRetained();
+ assertRetained();
return (Context) context;
}
-
+
public <Context> void setContext(Context context) {
- assertRetained();
+ assertRetained();
this.context = context;
}
public void setTargetQueue(DispatchQueue targetQueue) {
- assertRetained();
+ assertRetained();
this.targetQueue = targetQueue;
}
public DispatchQueue getTargetQueue() {
- assertRetained();
+ assertRetained();
return this.targetQueue;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java Mon Dec 21 18:45:33 2009
@@ -26,14 +26,14 @@
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class BaseSuspendable extends BaseRetainable implements Suspendable {
-
+
protected final AtomicBoolean startup = new AtomicBoolean(true);
protected final AtomicInteger suspended = new AtomicInteger();
public void resume() {
- assertRetained();
- if( suspended.decrementAndGet() == 0 ) {
- if( startup.compareAndSet(true, false) ) {
+ assertRetained();
+ if (suspended.decrementAndGet() == 0) {
+ if (startup.compareAndSet(true, false)) {
onStartup();
} else {
onResume();
@@ -42,8 +42,8 @@
}
public void suspend() {
- assertRetained();
- if( suspended.getAndIncrement()==0 ) {
+ assertRetained();
+ if (suspended.getAndIncrement() == 0) {
onSuspend();
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java Mon Dec 21 18:45:33 2009
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.dispatch.internal.advanced;
-import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.util.ArrayList;
import java.util.concurrent.Executor;
@@ -32,7 +31,7 @@
import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.DispatcherConfig;
import org.apache.activemq.dispatch.internal.BaseSuspendable;
-import org.apache.activemq.dispatch.internal.nio.NIODispatchSource;
+import org.apache.activemq.dispatch.internal.nio.NioDispatchSource;
import static org.apache.activemq.dispatch.DispatchPriority.*;
@@ -212,17 +211,7 @@
}
public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
- NIODispatchSource source = new NIODispatchSource();
- try {
- source.setChannel(channel);
- } catch (ClosedChannelException e) {
- e.printStackTrace();
- }
- source.setMask(interestOps);
- //Dispatch Source must be sticky so that it sticks to it's thread's selector:
- if (!(queue.getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD) || queue.getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD))) {
- throw new IllegalStateException("Source dispatch queue must be sticky");
- }
+ NioDispatchSource source = new NioDispatchSource(this, channel, interestOps);
source.setTargetQueue(queue);
return source;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Mon Dec 21 18:45:33 2009
@@ -28,7 +28,7 @@
import org.apache.activemq.dispatch.DispatchPriority;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchSource;
-import org.apache.activemq.dispatch.internal.nio.NIOSourceHandler;
+import org.apache.activemq.dispatch.internal.nio.NioSelector;
import org.apache.activemq.util.Mapper;
import org.apache.activemq.util.PriorityLinkedList;
import org.apache.activemq.util.TimerHeap;
@@ -80,11 +80,11 @@
};
- private final NIOSourceHandler nioHandler;
+ private final NioSelector nioHandler;
protected DispatcherThread(AdvancedDispatcher dispatcher, String name, int priorities) throws IOException {
this.name = name;
- this.nioHandler = new NIOSourceHandler(this);
+ this.nioHandler = new NioSelector();
this.dispatchQueues = new ThreadDispatchQueue[3];
for (int i = 0; i < 3; i++) {
dispatchQueues[i] = new ThreadDispatchQueue(this, DispatchPriority.values()[i]);
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java (from r892348, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java&r1=892348&r2=892924&rev=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java Mon Dec 21 18:45:33 2009
@@ -12,10 +12,15 @@
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.dispatch.DispatchOption;
+import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchSource;
-import org.apache.activemq.dispatch.internal.AbstractDispatchObject;
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.internal.BaseSuspendable;
+
+import static java.lang.String.*;
/**
* SelectableDispatchContext
@@ -26,314 +31,174 @@
* @author cmacnaug
* @version 1.0
*/
-public class NIODispatchSource extends AbstractDispatchObject implements DispatchSource {
- public static final boolean DEBUG = false;
- private final AtomicLong suspendCounter = new AtomicLong(1);
- private SelectableChannel channel;
- private SelectionKey key;
- private int interestOps = 0;
- private int readyOps = 0;
- private NIOSourceHandler sourceHandler;
- private final EventHandler eventHandler = new EventHandler();
- private Runnable cancelHandler;
+final public class NioDispatchSource extends BaseSuspendable implements DispatchSource {
- public NIODispatchSource() {
- }
+ public static final boolean DEBUG = true;
- /**
- * This can be called to set a channel on which the Dispatcher will perform
- * selection operations. The channel may be changed over time, but only from
- * the registered event handler.
- *
- * @param channel
- * The channel on which to select.
- * @throws ClosedChannelException
- * If a closed channel is provided
- */
- public void setChannel(SelectableChannel channel) throws ClosedChannelException {
- if (this.channel != channel) {
- if (super.isShutdown()) {
- return;
- }
- int interests = interestOps;
- if (key != null) {
- interests |= key.interestOps();
- key.cancel();
- key = null;
- }
- this.channel = channel;
- if (channel != null) {
- setMask(interests);
- }
- }
+ interface KeyActor {
+ public void register();
+ public void resume();
+ public void addInterest(int ops);
+ public void cancel();
+ }
+
+ private final SelectableChannel channel;
+ private final int interestOps;
+ private final DispatchQueue actorQueue;
+ private final KeyActor actor;
+ private final AtomicBoolean canceled = new AtomicBoolean();
+
+ private volatile DispatchQueue targetQueue;
+ private volatile Runnable cancelHandler;
+ private volatile Runnable eventHandler;
+ private volatile Object context;
+
+ public NioDispatchSource(Dispatcher dispatcher, SelectableChannel channel, int interestOps) {
+ if( interestOps == 0 ) {
+ throw new IllegalArgumentException("invalid interest ops");
+ }
+ this.channel = channel;
+ this.interestOps = interestOps;
+ this.suspended.incrementAndGet();
+ this.actorQueue = dispatcher.createSerialQueue(getClass().getName(), DispatchOption.STICK_TO_DISPATCH_THREAD);
+ this.actor = ActorProxy.create(KeyActor.class, new KeyActorImpl(), actorQueue);
}
- /**
- * Set's the handler for this source, this may only be called from the event
- * handler
- *
- * @param newHandler
- * The new handler for the source
- */
- protected void setHandler(NIOSourceHandler newHandler) {
-
- if (sourceHandler != newHandler) {
- if (channel != null) {
- if (DEBUG)
- System.out.println(this + "Canceling key on source handler switch: " + sourceHandler + "-" + newHandler);
-
- SelectionKey exising = channel.keyFor(sourceHandler.getSelector());
- if (exising != null) {
- interestOps = exising.interestOps();
- exising.cancel();
- }
- }
- } else if (DEBUG) {
- if (this.sourceHandler == newHandler) {
- System.out.println(this + " switching to new source handler " + sourceHandler + Thread.currentThread());
- }
- }
+ @Override
+ protected void onStartup() {
+ actor.register();
}
+
/**
- * This call updates the interest ops on which the dispatcher should select.
- * When an interest becomes ready, the eventHandler for the source will be
- * called. At that time the EventHandler can call {@link #getData()} to see
- * which ops are ready.
- *
- * The masks may be changed over time, but it is only legal to do so from
- * the supplied eventHandler.
- *
- * @param interestOps
- * The interest ops.
+ * All operations on this object are serialized via a serial queue and actor proxy.
+ * Additional synchronization is not required.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
- public void setMask(long ops) {
-
- readyOps &= ~ops;
- if (key != null && key.isValid()) {
- key.interestOps(key.interestOps() | (int) ops);
- } else {
-
- if (isShutdown() || suspendCounter.get() > 0) {
- interestOps |= (int) ops;
- return;
- }
+ final class KeyActorImpl implements KeyActor {
+
+ private SelectionKey key;
+ private int readyOps;
- // Make sure that we don't already have an invalidated key for
- // this selector. If we do then do a select to get rid of the
- // key:
- SelectionKey existing = channel.keyFor(sourceHandler.getSelector());
- if (existing != null && !existing.isValid()) {
- if (DEBUG)
- debug(this + " registering existing invalid key:" + sourceHandler + Thread.currentThread());
- try {
- sourceHandler.getSelector().selectNow();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- if (DEBUG)
- System.out.println(this + " registering new key with interests: " + interestOps);
+ public void register() {
+ NioSelector selector = NioSelector.CURRENT_SELECTOR.get();
try {
- key = channel.register(sourceHandler.getSelector(), interestOps, this);
+ key = channel.register(selector.getSelector(), interestOps);
+ key.attach(new Runnable() {
+ public void run() {
+ int ops = key.readyOps();
+ debug("%s: selector found ready ops: %d", this, ops);
+ readyOps |= ops;
+ resume();
+ }
+ });
} catch (ClosedChannelException e) {
- throw new IllegalStateException("Channel was closed", e);
+ debug(e, "could not register selector");
}
}
- }
-
- /**
- * This call retrieves the operations that have become ready since the last
- * call to {@link #readyOps()}. Calling this method clears the ready ops.
- *
- * It is only legal to call this method from the supplied eventHandler.
- *
- * @return the readyOps.
- */
- public long getData() {
- return readyOps;
- /*
- * if (key == null || !key.isValid()) { return 0; } else { return
- * key.readyOps(); }
- */
- }
-
- final boolean onSelect() {
- readyOps = key.readyOps();
- key.interestOps(key.interestOps() & ~key.readyOps());
- if (suspendCounter.get() <= 0) {
- eventHandler.addToQueue();
+
+ public void resume() {
+ if( readyOps!=0 && suspended.get() <= 0) {
+ final int dispatchedOps = readyOps;
+ readyOps = 0;
+ debug("%s: dispatching for ops: %d", this, dispatchedOps);
+ targetQueue.dispatchAsync(new Runnable() {
+ public void run() {
+ eventHandler.run();
+ actor.addInterest(dispatchedOps);
+ }
+ });
+ }
}
-
- // System.out.println(this + "onSelect " + key.readyOps() + "/" +
- return key.interestOps() == 0;
- }
-
- @Override
- protected void onShutdown() {
- // actual close can only happen on the owning dispatch thread:
- if (key != null && key.isValid()) {
- // This will make sure that the key is removed
- // from the selector.
- key.cancel();
- try {
- sourceHandler.getSelector().selectNow();
- } catch (IOException e) {
- if (DEBUG) {
- debug("Error in close", e);
+
+ public void addInterest(int ops) {
+ debug("%s: adding interest: %d", this, ops);
+ key.interestOps(key.interestOps()|ops);
+ }
+
+ public void cancel() {
+ if (key != null && key.isValid()) {
+ debug("%s: canceling key: %s", this, key);
+ // This will make sure that the key is removed
+ // from the selector.
+ key.cancel();
+ try {
+ NioSelector selector = NioSelector.CURRENT_SELECTOR.get();
+ selector.getSelector().selectNow();
+ } catch (IOException e) {
+ debug(e, "Error in close");
+ }
+
+ if( cancelHandler!=null ) {
+ cancelHandler.run();
}
- }
-
- if (cancelHandler != null) {
- cancelHandler.run();
}
}
}
@Override
- public void retain() {
- throw new UnsupportedOperationException("Sources are retained until canceled");
+ protected void onResume() {
+ actor.resume();
}
@Override
- public void release() {
- throw new UnsupportedOperationException("Sources must be release via cancel");
+ protected void onShutdown() {
+ cancel();
+ this.actorQueue.release();
+ super.onShutdown();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.DispatchObject#resume()
- */
- public void resume() {
- long c = suspendCounter.decrementAndGet();
- if (c < 0) {
- throw new IllegalStateException();
- }
- if (c == 0 && readyOps != 0) {
- eventHandler.addToQueue();
+ public void cancel() {
+ if( canceled.compareAndSet(false, true) ) {
+ actor.cancel();
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.DispatchObject#suspend()
- */
- public void suspend() {
- suspendCounter.incrementAndGet();
+ public boolean isCanceled() {
+ return canceled.get();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.DispatchSource#cancel()
- */
- public void cancel() {
- getTargetQueue().dispatchAsync(new Runnable() {
- public void run() {
- NIODispatchSource.super.release();
- }
- });
+ public void setCancelHandler(Runnable cancelHandler) {
+ this.cancelHandler = cancelHandler;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.DispatchSource#getMask()
- */
- public long getMask() {
- return interestOps;
+ public void setEventHandler(Runnable eventHandler) {
+ this.eventHandler = eventHandler;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.DispatchSource#isCanceled()
- */
- public boolean isCanceled() {
- return isShutdown();
+ @SuppressWarnings("unchecked")
+ public <Context> Context getContext() {
+ return (Context) context;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.dispatch.DispatchSource#setCancelHandler(java.lang
- * .Runnable)
- */
- public void setCancelHandler(Runnable cancelHandler) {
- this.cancelHandler = cancelHandler;
+ public <Context> void setContext(Context context) {
+ this.context = context;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.dispatch.DispatchSource#setEventHandler(java.lang
- * .Runnable)
- */
- public void setEventHandler(Runnable eventHandler) {
- this.eventHandler.setUserHandler(eventHandler);
+ public void setTargetQueue(DispatchQueue targetQueue) {
+ this.targetQueue = targetQueue;
}
- protected void debug(String str) {
- System.out.println(str);
+ public DispatchQueue getTargetQueue() {
+ return this.targetQueue;
}
- protected void debug(String str, Throwable thrown) {
- if (str != null) {
- System.out.println(str);
- }
- if (thrown != null) {
- thrown.printStackTrace();
+ protected void debug(String str, Object... args) {
+ if (DEBUG) {
+ System.out.println(format(str, args));
}
}
- private class EventHandler implements Runnable {
-
- private Runnable userHandler;
- private AtomicBoolean queued = new AtomicBoolean(false);
- private AtomicBoolean running = new AtomicBoolean(false);
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Runnable#run()
- */
- public void run() {
- running.set(true);
- queued.set(false);
- try {
- userHandler.run();
- } finally {
- running.set(false);
+ protected void debug(Throwable thrown, String str, Object... args) {
+ if (DEBUG) {
+ if (str != null) {
+ debug(str, args);
}
- }
-
- public void addToQueue()
- {
- if(!queued.compareAndSet(false, true))
- {
- getTargetQueue().dispatchAsync(this);
+ if (thrown != null) {
+ thrown.printStackTrace();
}
}
-
- public boolean isRunning()
- {
- return running.get();
- }
-
- public boolean isQueued()
- {
- return queued.get();
- }
-
- public void setUserHandler(Runnable userHandler)
- {
- this.userHandler = userHandler;
- }
}
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java (from r892348, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java&r1=892348&r2=892924&rev=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java Mon Dec 21 18:45:33 2009
@@ -23,21 +23,15 @@
import java.util.Iterator;
import java.util.Set;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
-
-public class NIOSourceHandler {
+public class NioSelector {
+
+ public final static ThreadLocal<NioSelector> CURRENT_SELECTOR = new ThreadLocal<NioSelector>();
+
private final boolean DEBUG = false;
-
private final Selector selector;
- private final DispatcherThread thread;
- public NIOSourceHandler(DispatcherThread thread) throws IOException {
+ public NioSelector() throws IOException {
this.selector = Selector.open();
- this.thread = thread;
- }
-
- DispatcherThread getThread() {
- return thread;
}
Selector getSelector() {
@@ -96,28 +90,12 @@
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (!selectedKeys.isEmpty()) {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
- boolean done = false;
SelectionKey key = i.next();
- if (key.isValid()) {
- NIODispatchSource source = (NIODispatchSource) key.attachment();
-
- done = true;
- try {
- done = source.onSelect();
- } catch (RuntimeException re) {
- if (DEBUG)
- debug("Exception in " + source + " canceling");
- // If there is a Runtime error close the context:
- // TODO better error handling here:
- source.cancel();
- }
- } else {
- done = true;
- }
-
- // If no more interests remove:
- if (done) {
- i.remove();
+ boolean valid = key.isValid();
+ i.remove();
+ if (valid) {
+ key.interestOps(key.interestOps() & ~key.readyOps());
+ ((Runnable) key.attachment()).run();
}
}
}
@@ -125,7 +103,7 @@
public void shutdown() throws IOException {
for (SelectionKey key : selector.keys()) {
- NIODispatchSource source = (NIODispatchSource) key.attachment();
+ NioDispatchSource source = (NioDispatchSource) key.attachment();
source.cancel();
}
selector.close();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Mon Dec 21 18:45:33 2009
@@ -17,10 +17,13 @@
package org.apache.activemq.dispatch.internal.simple;
+import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.dispatch.internal.nio.NioSelector;
+
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -33,6 +36,7 @@
final AtomicLong threadQueuedRunnables = new AtomicLong();
final IntegerCounter executionCounter = new IntegerCounter();
ThreadDispatchQueue currentThreadQueue;
+ private NioSelector selector;
public DispatcherThread(SimpleDispatcher dispatcher, int ordinal) {
this.dispatcher = dispatcher;
@@ -51,8 +55,23 @@
int processGlobalQueueCount = PRIORITIES;
try {
+ this.selector = new NioSelector();
+ NioSelector.CURRENT_SELECTOR.set(selector);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ try {
+
start: for (;;) {
+ try {
+ this.selector.doSelect(0);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
executionCounter.set(MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL);
// Process the local non-synchronized queues.
@@ -145,6 +164,13 @@
}
}
} catch (Shutdown e) {
+ } finally {
+ try {
+ selector.shutdown();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ NioSelector.CURRENT_SELECTOR.remove();
}
}
@@ -174,6 +200,11 @@
private final AtomicBoolean inWaitingList = new AtomicBoolean(false);
private void waitForWakeup() throws InterruptedException {
+ try {
+ this.selector.doSelect(0);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
while (threadQueuedRunnables.get() == 0 && dispatcher.globalQueuedRunnables.get() == 0) {
if (!wakeups.tryAcquire()) {
if (inWaitingList.compareAndSet(false, true)) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=892924&r1=892923&r2=892924&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java Mon Dec 21 18:45:33 2009
@@ -29,6 +29,7 @@
import org.apache.activemq.dispatch.DispatcherConfig;
import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
import org.apache.activemq.dispatch.internal.BaseSuspendable;
+import org.apache.activemq.dispatch.internal.nio.NioDispatchSource;
import static org.apache.activemq.dispatch.DispatchPriority.*;
@@ -58,7 +59,7 @@
globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i]);
}
dispatchers = new DispatcherThread[config.getThreads()];
- super.suspend();
+ this.suspended.incrementAndGet();
}
public DispatchQueue getMainQueue() {
@@ -84,7 +85,9 @@
}
public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
- return null;
+ NioDispatchSource source = new NioDispatchSource(this, channel, interestOps);
+ source.setTargetQueue(queue);
+ return source;
}
public boolean addWaitingDispatcher(DispatcherThread dispatcher) {
@@ -129,7 +132,6 @@
Runnable countDown = new Runnable() {
AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length);
-
public void run() {
if (shutdownCountDown.decrementAndGet() == 0) {
// Notify any registered shutdown watchers.
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java?rev=892924&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java Mon Dec 21 18:45:33 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
+import org.apache.activemq.dispatch.internal.simple.SimpleDispatcher;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.*;
+import static junit.framework.Assert.*;
+
+public class NioDispatchSoruceTest {
+
+ @Test
+ public void test() throws IOException, InterruptedException {
+
+ // Create the nio server socket...
+ final ServerSocketChannel channel = ServerSocketChannel.open();
+ channel.configureBlocking(false);
+ channel.socket().bind(address("0.0.0.0", 0), 10);
+
+
+ // Get a dispatcher and queue..
+ SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherConfig());
+ dispatcher.resume();
+ DispatchQueue accepts = dispatcher.createSerialQueue("test");
+
+ // Create a source attached to the server socket to deal with new connectins..
+ DispatchSource source = dispatcher.createSource(channel, SelectionKey.OP_ACCEPT, accepts);
+ // All we do is just release a countdown latch...
+ RunnableCountDownLatch accepted = new RunnableCountDownLatch(1) {
+ @Override
+ public void run() {
+ try {
+ SocketChannel socket = channel.accept();
+ socket.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ super.run();
+ }
+ };
+ source.setEventHandler(accepted);
+
+ // Connect to the server in a new thread.
+ new Thread("connect") {
+ public void run() {
+ try {
+ Socket socket = new Socket();
+ socket.connect(channel.socket().getLocalSocketAddress());
+ socket.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+
+ // Events should not get delivered until the source is resumed.
+ assertFalse(accepted.await(1, SECONDS));
+ source.resume();
+
+ // Count down latch should get released now.
+ assertTrue(accepted.await(1, SECONDS));
+
+ }
+
+ static public InetSocketAddress address(String host, int port) throws UnknownHostException {
+ return new InetSocketAddress(ip(host), port);
+ }
+
+ static public InetAddress ip(String host) throws UnknownHostException {
+ return InetAddress.getByName(host);
+ }
+
+}