You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/12/16 21:06:10 UTC
svn commit: r891410 - in
/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src:
main/java/org/apache/activemq/dispatch/
main/java/org/apache/activemq/dispatch/internal/
main/java/org/apache/activemq/dispatch/internal/advanced/ main/java/org/a...
Author: cmacnaug
Date: Wed Dec 16 20:06:09 2009
New Revision: 891410
URL: http://svn.apache.org/viewvc?rev=891410&view=rev
Log:
Some changes in support of an NIODispatchSource
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java
- copied, changed from r890921, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java
- copied, changed from r890887, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java Wed Dec 16 20:06:09 2009
@@ -21,7 +21,7 @@
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface DispatchSource extends DispatchObject {
-
+
public void cancel();
public boolean isCanceled();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java Wed Dec 16 20:06:09 2009
@@ -70,10 +70,6 @@
suspendCounter.incrementAndGet();
}
- public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
- throw new RuntimeException("TODO: implement me.");
- }
-
public void execute(Runnable command) {
dispatchAsync(command);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java Wed Dec 16 20:06:09 2009
@@ -5,7 +5,7 @@
public class BaseRetained {
- final protected AtomicInteger reatinCounter = new AtomicInteger(0);
+ final protected AtomicInteger retainCounter = new AtomicInteger(0);
final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>();
public void addShutdownWatcher(Runnable shutdownHandler) {
@@ -15,16 +15,21 @@
}
public void retain() {
- if( reatinCounter.getAndIncrement() == 0 ) {
+ if( retainCounter.getAndIncrement() == 0 ) {
startup();
}
}
public void release() {
- if( reatinCounter.decrementAndGet()==0 ) {
+ if( retainCounter.decrementAndGet()==0 ) {
shutdown();
}
}
+
+ protected boolean isShutdown()
+ {
+ return retainCounter.get() <= 0;
+ }
/**
* Subclasses should override if they want to do some startup processing.
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java Wed Dec 16 20:06:09 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.dispatch.internal.advanced;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.util.ArrayList;
import java.util.concurrent.Executor;
@@ -31,15 +32,15 @@
import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.DispatcherConfig;
import org.apache.activemq.dispatch.internal.BaseRetained;
+import org.apache.activemq.dispatch.internal.nio.NIODispatchSource;
import static org.apache.activemq.dispatch.DispatchPriority.*;
-
final public class AdvancedDispatcher extends BaseRetained implements Dispatcher {
public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
- final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
+ final SerialDispatchQueue mainQueue;
final GlobalDispatchQueue globalQueues[];
final AtomicLong globalQueuedRunnables = new AtomicLong();
@@ -54,6 +55,7 @@
public AdvancedDispatcher(DispatcherConfig config) {
this.size = config.getThreads();
this.numPriorities = 3;
+ this.mainQueue = new SerialDispatchQueue(this, "main");
globalQueues = new GlobalDispatchQueue[3];
for (int i = 0; i < 3; i++) {
globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i]);
@@ -64,20 +66,26 @@
/**
* @see org.apache.activemq.dispatch.internal.advanced.DispatcherThread#start()
*/
- protected void startup() {
+ protected void startup() {
loadBalancer.start();
for (int i = 0; i < size; i++) {
- DispatcherThread dispatacher = new DispatcherThread(this, ("dispatcher -" + (i + 1)), numPriorities);
- dispatchers.add(dispatacher);
- dispatacher.start();
+ DispatcherThread dispatacher;
+ try {
+ dispatacher = new DispatcherThread(this, ("dispatcher -" + (i + 1)), numPriorities);
+ dispatchers.add(dispatacher);
+ dispatacher.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
-
+
protected void shutdown() {
Runnable countDown = new Runnable() {
AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.size());
+
public void run() {
- if( shutdownCountDown.decrementAndGet()==0 ) {
+ if (shutdownCountDown.decrementAndGet() == 0) {
// Notify any registered shutdown watchers.
AdvancedDispatcher.super.shutdown();
}
@@ -119,8 +127,7 @@
DispatcherThread d = DispatcherThread.CURRENT.get();
if (d == null) {
synchronized (dispatchers) {
- if(dispatchers.isEmpty())
- {
+ if (dispatchers.isEmpty()) {
throw new RejectedExecutionException();
}
if (++roundRobinCounter >= size) {
@@ -133,14 +140,14 @@
}
}
-// public DispatchContext register(Runnable runnable, String name) {
-// return chooseDispatcher().register(runnable, name);
-// }
-
- public int getSize() {
- return size;
- }
-
+ // public DispatchContext register(Runnable runnable, String name) {
+ // return chooseDispatcher().register(runnable, name);
+ // }
+
+ public int getSize() {
+ return size;
+ }
+
public final Executor createPriorityExecutor(final int priority) {
return new Executor() {
public void execute(final Runnable runnable) {
@@ -158,7 +165,7 @@
public void execute(Runnable command) {
chooseDispatcher().dispatch(command, 0);
}
-
+
public void execute(Runnable command, int priority) {
chooseDispatcher().dispatch(command, priority);
}
@@ -170,11 +177,11 @@
public void schedule(final Runnable runnable, int priority, long delay, TimeUnit timeUnit) {
chooseDispatcher().schedule(runnable, priority, delay, timeUnit);
}
-
+
public DispatchQueue getMainQueue() {
return mainQueue;
}
-
+
public DispatchQueue getGlobalQueue() {
return getGlobalQueue(DEFAULT);
}
@@ -182,19 +189,31 @@
public DispatchQueue getGlobalQueue(DispatchPriority priority) {
return globalQueues[priority.ordinal()];
}
-
+
public DispatchQueue createSerialQueue(String label, DispatchOption... options) {
- SerialDispatchQueue rc = new SerialDispatchQueue(label, options);
+ SerialDispatchQueue rc = new SerialDispatchQueue(this, label, options);
rc.setTargetQueue(getGlobalQueue());
return rc;
}
-
+
public void dispatchMain() {
mainQueue.run();
}
public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
- return null;
+ NIODispatchSource source = new NIODispatchSource();
+ try {
+ source.setChannel(channel);
+ } catch (ClosedChannelException e) {
+ e.printStackTrace();
+ }
+ source.setMask(interestOps);
+ //Dispatch Source must be sticky so that it sticks to its thread's selector:
+ if (!(queue.getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD) || queue.getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD))) {
+ throw new IllegalStateException("Source dispatch queue must be sticky");
+ }
+ source.setTargetQueue(queue);
+ return source;
}
public DispatchQueue getCurrentQueue() {
@@ -203,10 +222,10 @@
public DispatchQueue getCurrentThreadQueue() {
DispatcherThread thread = DispatcherThread.CURRENT.get();
- if( thread==null ) {
+ if (thread == null) {
return null;
}
return thread.currentDispatchQueue;
- }
+ }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java Wed Dec 16 20:06:09 2009
@@ -1,5 +1,12 @@
package org.apache.activemq.dispatch.internal.advanced;
+import org.apache.activemq.dispatch.internal.simple.GlobalDispatchQueue;
+import org.apache.activemq.dispatch.internal.simple.SerialDispatchQueue;
+import org.apache.activemq.dispatch.internal.simple.ThreadDispatchQueue;
+
public interface AdvancedQueue {
+ SerialDispatchQueue isSerialDispatchQueue();
+ ThreadDispatchQueue isThreadDispatchQueue();
+ GlobalDispatchQueue isGlobalDispatchQueue();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Wed Dec 16 20:06:09 2009
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.dispatch.internal.advanced;
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.Executor;
@@ -24,6 +26,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.internal.nio.NIOSourceHandler;
import org.apache.activemq.util.Mapper;
import org.apache.activemq.util.PriorityLinkedList;
import org.apache.activemq.util.TimerHeap;
@@ -73,10 +78,13 @@
return element.listPrio;
}
};
+
+
+ private final NIOSourceHandler nioHandler;
- protected DispatcherThread(AdvancedDispatcher dispatcher, String name, int priorities) {
+ protected DispatcherThread(AdvancedDispatcher dispatcher, String name, int priorities) throws IOException {
this.name = name;
-
+ this.nioHandler = new NIOSourceHandler(this);
this.dispatchQueues = new ThreadDispatchQueue[3];
for (int i = 0; i < 3; i++) {
dispatchQueues[i] = new ThreadDispatchQueue(this, DispatchPriority.values()[i]);
@@ -133,6 +141,11 @@
if( onShutdown!=null ) {
onShutdown.run();
}
+ try {
+ nioHandler.shutdown();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
}, MAX_USER_PRIORITY + 1);
Thread rc = thread;
@@ -374,5 +387,4 @@
public String getName() {
return name;
}
-
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Wed Dec 16 20:06:09 2009
@@ -35,6 +35,7 @@
private final String label;
private final AdvancedDispatcher dispatcher;
private final DispatchPriority priority;
+ private Object context;
public GlobalDispatchQueue(AdvancedDispatcher dispatcher, DispatchPriority priority) {
this.dispatcher = dispatcher;
@@ -75,11 +76,11 @@
}
public <Context> Context getContext() {
- throw new UnsupportedOperationException();
+ return (Context) context;
}
public <Context> void setContext(Context context) {
- throw new UnsupportedOperationException();
+ this.context = context;
}
public void addShutdownWatcher(Runnable finalizer) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java Wed Dec 16 20:06:09 2009
@@ -1,13 +1,17 @@
package org.apache.activemq.dispatch.internal.advanced;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.dispatch.DispatchOption;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
public class SerialDispatchQueue extends AbstractSerialDispatchQueue {
- public SerialDispatchQueue(String label, DispatchOption...options) {
+ AdvancedDispatcher dispather;
+ public SerialDispatchQueue(AdvancedDispatcher dispather, String label, DispatchOption...options) {
super(label, options);
+ this.dispather = dispather;
// context = new DispatchContext(this, true, label);
}
@@ -28,5 +32,7 @@
}
}
-
+ public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+ dispather.schedule(runnable, delay, unit);
+ }
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java (from r890921, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java&r1=890921&r2=891410&rev=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java Wed Dec 16 20:06:09 2009
@@ -11,9 +11,11 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.internal.AbstractDispatchObject;
/**
* SelectableDispatchContext
@@ -24,144 +26,133 @@
* @author cmacnaug
* @version 1.0
*/
-class SelectableDispatchContext extends DispatchContext {
+public class NIODispatchSource extends AbstractDispatchObject implements DispatchSource {
public static final boolean DEBUG = false;
+ private final AtomicLong suspendCounter = new AtomicLong(1);
private SelectableChannel channel;
private SelectionKey key;
- private int updateInterests = -1;
+ private int interestOps = 0;
private int readyOps = 0;
+ private NIOSourceHandler sourceHandler;
+ private final EventHandler eventHandler = new EventHandler();
+ private Runnable cancelHandler;
- SelectableDispatchContext(DispatcherThread thread, Runnable runnable, String name) {
- super(thread, runnable, true, name);
-
+ public NIODispatchSource() {
+ super.retain();
}
-
/**
- * This can be called to set a channel on which the Dispatcher will
- * perform selection operations. The channel may be changed over time.
- *
- * This method may only be called from the provided {@link Dispatchable}
- * dispatch method, and is not thread safe.
+ * This can be called to set a channel on which the Dispatcher will perform
+ * selection operations. The channel may be changed over time, but only from
+ * the registered event handler.
*
- * @param channel The channel on which to select.
- * @throws ClosedChannelException If a closed chanel is provided
+ * @param channel
+ * The channel on which to select.
+ * @throws ClosedChannelException
+ * If a closed channel is provided
*/
public void setChannel(SelectableChannel channel) throws ClosedChannelException {
if (this.channel != channel) {
- if (isClosed()) {
+ if (super.isShutdown()) {
return;
}
- int interests = 0;
+ int interests = interestOps;
if (key != null) {
- interests = key.interestOps();
+ interests |= key.interestOps();
key.cancel();
key = null;
}
this.channel = channel;
if (channel != null) {
- updateInterestOps(interests);
+ setMask(interests);
}
}
}
/**
- * May be overriden by subclass to additional work on dispatcher switch
+ * Sets the handler for this source; this may only be called from the event
+ * handler
*
- * @param oldDispatcher
- * The old dispatcher
- * @param newDispatcher
- * The new Dispatcher
+ * @param newHandler
+ * The new handler for the source
*/
- protected void switchedDispatcher(DispatcherThread oldDispatcher, DispatcherThread newDispatcher) {
- if (DEBUG) {
- if (oldDispatcher == newDispatcher) {
- System.out.println(this + " switching to same dispatcher " + newDispatcher + Thread.currentThread());
- }
- }
- if (channel != null) {
- if (DEBUG)
- System.out.println(this + "Canceling key on dispatcher switch: " + oldDispatcher + newDispatcher);
+ protected void setHandler(NIOSourceHandler newHandler) {
- SelectionKey exising = channel.keyFor(((NIODispatcherThread)oldDispatcher).getSelector());
- if (exising != null) {
- updateInterests = exising.interestOps();
- exising.cancel();
- }
- }
- }
-
- public void processForeignUpdates() {
- synchronized (this) {
+ if (sourceHandler != newHandler) {
if (channel != null) {
- if (updateInterests > 0) {
- if (DEBUG)
- debug(this + "processing foreign update interests: " + updateInterests);
- updateInterestOps(updateInterests);
- updateInterests = -1;
+ if (DEBUG)
+ System.out.println(this + "Canceling key on source handler switch: " + sourceHandler + "-" + newHandler);
+
+ SelectionKey exising = channel.keyFor(sourceHandler.getSelector());
+ if (exising != null) {
+ interestOps = exising.interestOps();
+ exising.cancel();
}
}
+ } else if (DEBUG) {
+ if (this.sourceHandler == newHandler) {
+ System.out.println(this + " switching to new source handler " + sourceHandler + Thread.currentThread());
+ }
}
- super.processForeignUpdates();
}
/**
* This call updates the interest ops on which the dispatcher should select.
- * When an interest becomes ready, the dispatcher will call the {@link Dispatchable}'s
- * dispatch() method. At that time, {@link #readyOps()} can be called to see what
- * interests are now ready.
- *
- * This method may only be called from {@link Dispatchable#dispatch()} and is not
- * threadsafe. If the {@link Dispatchable} wishes to change it's interest op it
- * must call {@link #requestDispatch()} so that they can be changed from the dispatch()
- * method.
+ * When an interest becomes ready, the eventHandler for the source will be
+ * called. At that time the EventHandler can call {@link #getData()} to see
+ * which ops are ready.
+ *
+ * The masks may be changed over time, but it is only legal to do so from
+ * the supplied eventHandler.
*
- * @param interestOps The interest ops.
+ * @param interestOps
+ * The interest ops.
*/
- public void updateInterestOps(int ops) {
+ public void setMask(long ops) {
+
readyOps &= ~ops;
if (key != null && key.isValid()) {
- key.interestOps(key.interestOps() | ops);
+ key.interestOps(key.interestOps() | (int) ops);
} else {
- if (isClosed()) {
+
+ if (isShutdown() || suspendCounter.get() > 0) {
+ interestOps |= (int) ops;
return;
}
// Make sure that we don't already have an invalidated key for
// this selector. If we do then do a select to get rid of the
// key:
- SelectionKey existing = channel.keyFor( ((NIODispatcherThread)target).getSelector());
+ SelectionKey existing = channel.keyFor(sourceHandler.getSelector());
if (existing != null && !existing.isValid()) {
if (DEBUG)
- debug(this + " registering existing invalid key:" + target + Thread.currentThread());
+ debug(this + " registering existing invalid key:" + sourceHandler + Thread.currentThread());
try {
- ((NIODispatcherThread)target).getSelector().selectNow();
+ sourceHandler.getSelector().selectNow();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (DEBUG)
- System.out.println(this + " registering new key with interests: " + ops);
+ System.out.println(this + " registering new key with interests: " + interestOps);
try {
- key = channel.register( ((NIODispatcherThread)target).getSelector(), ops, this);
+ key = channel.register(sourceHandler.getSelector(), interestOps, this);
} catch (ClosedChannelException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ throw new IllegalStateException("Channel was closed", e);
}
}
}
/**
- * This call retrieves the operations that have become ready since the last call
- * to {@link #readyOps()}. Calling this method clears the ready ops.
+ * This call retrieves the operations that have become ready since the last
+ * call to {@link #getData()}. Calling this method clears the ready ops.
*
- * This method may only be called from {@link Dispatchable#dispatch()} and is not
- * threadsafe.
+ * It is only legal to call this method from the supplied eventHandler.
*
- * @return the readyOps.
+ * @return the readyOps.
*/
- public int readyOps() {
+ public long getData() {
return readyOps;
/*
* if (key == null || !key.isValid()) { return 0; } else { return
@@ -169,37 +160,123 @@
*/
}
- public boolean onSelect() {
+ final boolean onSelect() {
readyOps = key.readyOps();
key.interestOps(key.interestOps() & ~key.readyOps());
- synchronized (this) {
- if (!isLinked()) {
- target.execute(runnable, listPrio);
- }
+ if (suspendCounter.get() <= 0) {
+ eventHandler.addToQueue();
}
// System.out.println(this + "onSelect " + key.readyOps() + "/" +
- return true;
+ return key.interestOps() == 0;
}
- public void close(boolean sync) {
+ @Override
+ protected void shutdown() {
// actual close can only happen on the owning dispatch thread:
- if (target == DispatcherThread.CURRENT.get()) {
-
- if (key != null && key.isValid()) {
- // This will make sure that the key is removed
- // from the selector.
- key.cancel();
- try {
- ((NIODispatcherThread)target).getSelector().selectNow();
- } catch (IOException e) {
- if (DEBUG) {
- debug("Error in close", e);
- }
+ if (key != null && key.isValid()) {
+ // This will make sure that the key is removed
+ // from the selector.
+ key.cancel();
+ try {
+ sourceHandler.getSelector().selectNow();
+ } catch (IOException e) {
+ if (DEBUG) {
+ debug("Error in close", e);
}
}
+
+ if (cancelHandler != null) {
+ cancelHandler.run();
+ }
+ }
+ }
+
+ @Override
+ public void retain() {
+ throw new UnsupportedOperationException("Sources are retained until canceled");
+ }
+
+ @Override
+ public void release() {
+ throw new UnsupportedOperationException("Sources must be release via cancel");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.DispatchObject#resume()
+ */
+ public void resume() {
+ long c = suspendCounter.decrementAndGet();
+ if (c < 0) {
+ throw new IllegalStateException();
+ }
+ if (c == 0 && readyOps != 0) {
+ eventHandler.addToQueue();
}
- super.close(sync);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.DispatchObject#suspend()
+ */
+ public void suspend() {
+ suspendCounter.incrementAndGet();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.DispatchSource#cancel()
+ */
+ public void cancel() {
+ getTargetQueue().dispatchAsync(new Runnable() {
+ public void run() {
+ NIODispatchSource.super.release();
+ }
+ });
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.DispatchSource#getMask()
+ */
+ public long getMask() {
+ return interestOps;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.DispatchSource#isCanceled()
+ */
+ public boolean isCanceled() {
+ return isShutdown();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.dispatch.DispatchSource#setCancelHandler(java.lang
+ * .Runnable)
+ */
+ public void setCancelHandler(Runnable cancelHandler) {
+ this.cancelHandler = cancelHandler;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.dispatch.DispatchSource#setEventHandler(java.lang
+ * .Runnable)
+ */
+ public void setEventHandler(Runnable eventHandler) {
+ this.eventHandler.setUserHandler(eventHandler);
}
protected void debug(String str) {
@@ -214,4 +291,50 @@
thrown.printStackTrace();
}
}
+
+ private class EventHandler implements Runnable {
+
+ private Runnable userHandler;
+ private AtomicBoolean queued = new AtomicBoolean(false);
+ private AtomicBoolean running = new AtomicBoolean(false);
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ running.set(true);
+ queued.set(false);
+ try {
+ userHandler.run();
+ } finally {
+ running.set(false);
+ }
+ }
+
+ public void addToQueue()
+ {
+ if(!queued.compareAndSet(false, true))
+ {
+ getTargetQueue().dispatchAsync(this);
+ }
+ }
+
+ public boolean isRunning()
+ {
+ return running.get();
+ }
+
+ public boolean isQueued()
+ {
+ return queued.get();
+ }
+
+ public void setUserHandler(Runnable userHandler)
+ {
+ this.userHandler = userHandler;
+ }
+ }
+
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java (from r890887, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java&r1=890887&r2=891410&rev=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIOSourceHandler.java Wed Dec 16 20:06:09 2009
@@ -22,19 +22,22 @@
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatcher;
import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
-public class NIODispatcherThread extends DispatcherThread {
+public class NIOSourceHandler {
private final boolean DEBUG = false;
private final Selector selector;
+ private final DispatcherThread thread;
- protected NIODispatcherThread(AdvancedDispatcher dispatcher, String name, int priorities) throws IOException {
- super(dispatcher, name, priorities);
+ public NIOSourceHandler(DispatcherThread thread) throws IOException {
this.selector = Selector.open();
+ this.thread = thread;
+ }
+
+ // Returns the dispatcher thread that was passed to the constructor.
+ DispatcherThread getThread() {
+ return thread;
}
Selector getSelector() {
@@ -51,23 +54,6 @@
}
}
- public SelectableDispatchContext registerSelectable(Runnable dispatchable, String name) {
- return new SelectableDispatchContext(this, dispatchable, name);
- }
-
- /**
- * Subclasses may override this to do do additional dispatch work:
- *
- * @throws Exception
- */
- protected void dispatchHook() throws Exception {
- doSelect(true);
- }
-
- protected void waitForEvents() throws Exception {
- doSelect(false);
- }
-
/**
* Subclasses may override this to provide an alternative wakeup mechanism.
*/
@@ -75,51 +61,28 @@
selector.wakeup();
}
- private long lastSelect = System.nanoTime();
- private long frequency = 50000000;
-
- private void doSelect(boolean now) throws IOException {
+ /**
+ * Selects ready sources, potentially blocking. If wakeup is called during
+ * select the method will return.
+ *
+ * @param timeout
+ * A value of -1 causes the select to block until a source is
+ * ready; 0 (or any other non-positive value) does a non-blocking
+ * select. Otherwise the select will block up to timeout in
+ * milliseconds waiting for a source to become ready.
+ * @throws IOException
+ */
+ public void doSelect(long timeout) throws IOException {
- // Select what's ready now:
try {
- if (now) {
- if (selector.keys().isEmpty()) {
- return;
- }
- // selector.selectNow();
- // processSelected();
-
- long time = System.nanoTime();
- if (time - lastSelect > frequency) {
- selector.selectNow();
- lastSelect = time;
-
- int registered = selector.keys().size();
- int selected = selector.selectedKeys().size();
- if (selected == 0) {
- frequency += 1000000;
- if (DEBUG)
- debug(this + "Increased select frequency to " + frequency);
- } else if (selected > registered / 4) {
- frequency -= 1000000;
- if (DEBUG)
- debug(this + "Decreased select frequency to " + frequency);
- }
- processSelected();
-
- }
+ if (timeout == -1) {
+ selector.select();
+ } else if (timeout > 0) {
+ selector.select(timeout);
} else {
- long next = timerHeap.timeToNext(TimeUnit.MILLISECONDS);
- if (next == -1) {
- selector.select();
- } else if (next > 0) {
- selector.select(next);
- } else {
- selector.selectNow();
- }
- lastSelect = System.nanoTime();
- processSelected();
+ selector.selectNow();
}
+ processSelected();
} catch (CancelledKeyException ignore) {
// A key may have been canceled.
@@ -136,17 +99,17 @@
boolean done = false;
SelectionKey key = i.next();
if (key.isValid()) {
- SelectableDispatchContext context = (SelectableDispatchContext) key.attachment();
+ NIODispatchSource source = (NIODispatchSource) key.attachment();
done = true;
try {
- done = context.onSelect();
+ done = source.onSelect();
} catch (RuntimeException re) {
if (DEBUG)
- debug("Exception in " + context + " closing");
+ debug("Exception in " + source + " canceling");
// If there is a Runtime error close the context:
// TODO better error handling here:
- context.close(false);
+ source.cancel();
}
} else {
done = true;
@@ -160,6 +123,14 @@
}
}
+ /**
+  * Cancels the source attached to every key registered with the selector
+  * and then closes the selector.
+  *
+  * @throws IOException
+  *             if closing the selector fails.
+  */
+ public void shutdown() throws IOException {
+     try {
+         for (SelectionKey key : selector.keys()) {
+             NIODispatchSource source = (NIODispatchSource) key.attachment();
+             source.cancel();
+         }
+     } finally {
+         // Close the selector even when cancelling a source throws, so the
+         // selector is not leaked.
+         selector.close();
+     }
+ }
+ }
+
private final void debug(String str) {
System.out.println(this + ": " + str);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Wed Dec 16 20:06:09 2009
@@ -135,8 +135,7 @@
SimpleDispatcher.CURRENT_QUEUE.set(current);
}
}
-
- @Override
+
public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
dispatcher.timerThread.addRelative(runnable, this, delay, unit);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java Wed Dec 16 20:06:09 2009
@@ -1,5 +1,7 @@
package org.apache.activemq.actor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
import org.junit.Test;
@@ -7,18 +9,15 @@
public class ActorBenchmark {
-
- public static class PizzaService implements IPizzaService
- {
+ public static class PizzaService implements IPizzaService {
long counter;
-
+
@Message
- public void order(long count)
- {
+ public void order(long count) {
counter += count;
}
}
-
+
@Test
public void benchmarkCGLibProxy() throws Exception {
String name = "cglib proxy";
@@ -34,7 +33,7 @@
IPizzaService proxy = new PizzaServiceCustomProxy(service, createQueue());
benchmark(name, service, proxy);
}
-
+
@Test
public void benchmarkAsmProxy() throws Exception {
String name = "asm proxy";
@@ -48,34 +47,39 @@
public void dispatchAsync(Runnable runnable) {
runnable.run();
}
+
+ /**
+  * Delayed dispatch is not implemented by this benchmark queue stub.
+  *
+  * @throws UnsupportedOperationException
+  *             always.
+  */
+ public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+     // UnsupportedOperationException (a RuntimeException) matches the
+     // equivalent stub in AsmActorTest.
+     throw new UnsupportedOperationException("TODO: implement me.");
+ }
+
};
}
private void benchmark(String name, PizzaService service, IPizzaService proxy) throws Exception {
// warm it up..
- benchmark(proxy, 1000*1000);
- if( service.counter == 0)
+ benchmark(proxy, 1000 * 1000);
+ if (service.counter == 0)
throw new Exception();
-
- int iterations = 1000*1000*100;
-
+
+ int iterations = 1000 * 1000 * 100;
+
long start = System.nanoTime();
benchmark(proxy, iterations);
long end = System.nanoTime();
-
- if( service.counter == 0)
+
+ if (service.counter == 0)
throw new Exception();
- double durationMS = 1.0d*(end-start)/1000000d;
+ double durationMS = 1.0d * (end - start) / 1000000d;
double rate = 1000d * iterations / durationMS;
System.out.println(format("name: %s, duration: %,.3f ms, rate: %,.2f executions/sec", name, durationMS, rate));
}
-
private void benchmark(IPizzaService proxy, int iterations) {
- for( int i=0; i < iterations; i++ ) {
+ for (int i = 0; i < iterations; i++) {
proxy.order(1);
}
}
-
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java?rev=891410&r1=891409&r2=891410&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java Wed Dec 16 20:06:09 2009
@@ -1,5 +1,7 @@
package org.apache.activemq.actor;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
@@ -43,6 +45,10 @@
public void dispatchAsync(Runnable runnable) {
runnable.run();
}
+
+ // This queue stub does not implement delayed dispatch; every call throws.
+ public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
};
}