You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/20 15:23:26 UTC
svn commit: r746251 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/dispatch/ test/java/org/apache/activemq/flow/
Author: chirino
Date: Fri Feb 20 14:23:26 2009
New Revision: 746251
URL: http://svn.apache.org/viewvc?rev=746251&view=rev
Log:
Applying Colin's patch https://issues.apache.org/activemq/browse/AMQ-2129 Thanks!
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java?rev=746251&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java Fri Feb 20 14:23:26 2009
@@ -0,0 +1,158 @@
+package org.apache.activemq.dispatch;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public abstract class AbstractPooledDispatcher<D extends IDispatcher> implements IDispatcher, PooledDispatcher<D> {
+
+ private final String name;
+
+ private final ThreadLocal<D> dispatcher = new ThreadLocal<D>();
+ private final ThreadLocal<PooledDispatchContext<D>> dispatcherContext = new ThreadLocal<PooledDispatchContext<D>>();
+ private final ArrayList<D> dispatchers = new ArrayList<D>();
+
+ final AtomicBoolean started = new AtomicBoolean();
+ final AtomicBoolean shutdown = new AtomicBoolean();
+
+ private int roundRobinCounter = 0;
+ private final int size;
+
+ protected ExecutionLoadBalancer<D> loadBalancer;
+
+ protected AbstractPooledDispatcher(String name, int size) {
+ this.name = name;
+ this.size = size;
+ loadBalancer = new SimpleLoadBalancer<D>();
+ }
+
+ /**
+ * Subclasses should implement this to return a new dispatcher.
+ *
+ * @param name
+ * The name to assign the dispatcher.
+ * @param pool
+ * The pool.
+ * @return The new dispathcer.
+ */
+ protected abstract D createDispatcher(String name, AbstractPooledDispatcher<D> pool) throws Exception;
+
+ /**
+ * @see org.apache.activemq.dispatch.IDispatcher#start()
+ */
+ public synchronized final void start() throws Exception {
+ loadBalancer.start();
+ if (started.compareAndSet(false, true)) {
+ // Create all the workers.
+ try {
+ for (int i = 0; i < size; i++) {
+ D dispatacher = createDispatcher(name + "-" + (i + 1), this);
+
+ dispatchers.add(dispatacher);
+ dispatacher.start();
+ }
+ } catch (Exception e) {
+ shutdown();
+ }
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+ */
+ public synchronized final void shutdown() throws InterruptedException {
+ shutdown.set(true);
+ boolean interrupted = false;
+ while (!dispatchers.isEmpty()) {
+ try {
+ dispatchers.get(dispatchers.size() - 1).shutdown();
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ continue;
+ }
+ dispatchers.remove(dispatchers.size() - 1);
+
+ }
+ // Re-interrupt:
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+
+ loadBalancer.stop();
+ }
+
+ public void setCurrentDispatchContext(PooledDispatchContext<D> context) {
+ dispatcherContext.set(context);
+ }
+
+ public PooledDispatchContext<D> getCurrentDispatchContext() {
+ return dispatcherContext.get();
+ }
+
+ /**
+ * Returns the currently executing dispatcher, or null if the current thread
+ * is not a dispatcher:
+ *
+ * @return The currently executing dispatcher
+ */
+ public D getCurrentDispatcher() {
+ return dispatcher.get();
+ }
+
+ /**
+ * A Dispatcher must call this to indicate that is has started it's dispatch
+ * loop.
+ */
+ public void onDispatcherStarted(D d) {
+ dispatcher.set(d);
+ loadBalancer.addDispatcher(d);
+ }
+
+ public ExecutionLoadBalancer<D> getLoadBalancer() {
+ return loadBalancer;
+ }
+
+ /**
+ * A Dispatcher must call this when exiting it's dispatch loop
+ */
+ public void onDispatcherStopped(D d) {
+ loadBalancer.removeDispatcher(d);
+ }
+
+ protected D chooseDispatcher() {
+ D d = dispatcher.get();
+ if (d == null) {
+ synchronized (dispatchers) {
+ if (++roundRobinCounter >= size) {
+ roundRobinCounter = 0;
+ }
+ return dispatchers.get(roundRobinCounter);
+ }
+ } else {
+ return d;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+ * long, java.util.concurrent.TimeUnit)
+ */
+ public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
+ chooseDispatcher().schedule(runnable, delay, timeUnit);
+ }
+
+ public DispatchContext register(Dispatchable dispatchable, String name) {
+ return chooseDispatcher().register(dispatchable, name);
+ }
+
+ public String toString() {
+ return name;
+ }
+
+}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java Fri Feb 20 14:23:26 2009
@@ -16,24 +16,23 @@
*/
package org.apache.activemq.dispatch;
-import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
-public interface ExecutionLoadBalancer {
+public interface ExecutionLoadBalancer<D extends IDispatcher> {
- public interface ExecutionTracker {
- public void onDispatchRequest(PoolableDispatcher caller, PooledDispatchContext context);
+ public interface ExecutionTracker<D extends IDispatcher> {
+ public void onDispatchRequest(D caller, PooledDispatchContext<D> context);
- public void close();
- }
+ public void close();
+ }
- public void addDispatcher(PoolableDispatcher dispatcher);
+ public void addDispatcher(D dispatcher);
- public void removeDispatcher(PoolableDispatcher dispatcher);
+ public void removeDispatcher(D dispatcher);
- public ExecutionTracker createExecutionTracker(PooledDispatchContext context);
+ public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context);
- public void start();
+ public void start();
- public void stop();
+ public void stop();
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Fri Feb 20 14:23:26 2009
@@ -79,10 +79,10 @@
public void close();
}
- class RunnableAdapter implements Dispatchable {
+ public class RunnableAdapter implements Dispatchable {
final Runnable runnable;
- RunnableAdapter(Runnable runnable) {
+ public RunnableAdapter(Runnable runnable) {
this.runnable = runnable;
}
@@ -117,7 +117,7 @@
/**
* Starts the dispatcher.
*/
- public void start();
+ public void start() throws Exception;
/**
* Shuts down the dispatcher, this may result in previous dispatch requests
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java Fri Feb 20 14:23:26 2009
@@ -16,86 +16,66 @@
*/
package org.apache.activemq.dispatch;
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
-public interface PooledDispatcher {
+public interface PooledDispatcher<D extends IDispatcher> {
- /**
- * A {@link PooledDispatchContext}s can be moved between different
- * dispatchers.
- */
- public interface PooledDispatchContext extends DispatchContext {
- /**
- * Called to transfer a {@link PooledDispatchContext} to a new
- * Dispatcher.
- */
- public void assignToNewDispatcher(PoolableDispatcher newDispatcher);
-
- /**
- * A dispatcher must call this when it starts dispatch for this context
- */
- public void startingDispatch();
-
- /**
- * A dispatcher must call this when it has finished dispatching a
- * context
- */
- public void finishedDispatch();
-
- /**
- * Called by the dispatch thread to let the pooled context set any info
- * set by other threads.
- */
- public void processForeignUpdates();
- }
-
- public interface PoolableDispatchContext extends DispatchContext {
-
- public void setPooledDispatchContext(PooledDispatchContext context);
-
- /**
- * Indicates that another thread has made an update to the dispatch
- * context.
- *
- */
- public void onForeignThreadUpdate();
-
- public PoolableDispatcher getDispatcher();
- }
-
- /**
- * A PoolableDispatcher is one that can be owned by an
- * {@link PooledDispatcher}.
- */
- public interface PoolableDispatcher extends IDispatcher {
-
- /**
- * Indicates that another thread has made an update to the dispatch
- * context.
- *
- */
- public PoolableDispatchContext createPoolableDispatchContext(Dispatchable dispatchable, String name);
- }
-
- /**
- * This wraps the dispatch context into one that is load balanced by the
- * LoadBalancer
- *
- * @param context
- * The context to wrap.
- * @return
- */
- public PooledDispatchContext createPooledDispatchContext(PoolableDispatchContext context);
-
- /**
- * A Dispatcher must call this from it's dispatcher thread to indicate that
- * is has started it's dispatch has started.
- */
- public void onDispatcherStarted(PoolableDispatcher dispatcher);
-
- /**
- * A Dispatcher must call this from it's dispatcher thread when exiting it's
- * dispatch loop
- */
- public void onDispatcherStopped(PoolableDispatcher dispatcher);
+ /**
+ * A {@link PooledDispatchContext}s can be moved between different
+ * dispatchers.
+ */
+ public interface PooledDispatchContext<D extends IDispatcher> extends DispatchContext {
+ /**
+ * Called to transfer a {@link PooledDispatchContext} to a new
+ * Dispatcher.
+ */
+ public void assignToNewDispatcher(D newDispatcher);
+
+ /**
+ * Gets the dispatcher to which this PooledDispatchContext currently
+ * belongs
+ *
+ * @return
+ */
+ public D getDispatcher();
+
+ /**
+ * Gets the execution tracker for the context.
+ *
+ * @return the execution tracker for the context:
+ */
+ public ExecutionTracker<D> getExecutionTracker();
+ }
+
+ /**
+ * A Dispatcher must call this from it's dispatcher thread to indicate that
+ * is has started it's dispatch has started.
+ */
+ public void onDispatcherStarted(D dispatcher);
+
+ /**
+ * A Dispatcher must call this from it's dispatcher thread when exiting it's
+ * dispatch loop
+ */
+ public void onDispatcherStopped(D dispatcher);
+
+ /**
+ * Returns the currently executing dispatcher, or null if the current thread
+ * is not a dispatcher:
+ *
+ * @return The currently executing dispatcher
+ */
+ public D getCurrentDispatcher();
+
+ public void setCurrentDispatchContext(PooledDispatchContext<D> context);
+
+ public PooledDispatchContext<D> getCurrentDispatchContext();
+
+ /**
+ * Returns the load balancer for this dispatch pool.
+ *
+ * @return
+ */
+ public ExecutionLoadBalancer<D> getLoadBalancer();
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Fri Feb 20 14:23:26 2009
@@ -16,449 +16,564 @@
*/
package org.apache.activemq.dispatch;
-import java.util.LinkedList;
-import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatchContext;
-import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
import org.apache.activemq.queue.Mapper;
import org.apache.kahadb.util.LinkedNode;
import org.apache.kahadb.util.LinkedNodeList;
-public class PriorityDispatcher implements Runnable, PoolableDispatcher {
+public class PriorityDispatcher<D extends PriorityDispatcher<D>> implements Runnable, IDispatcher {
- private Thread thread;
- private boolean running = false;
- private boolean threaded = false;
- private final int MAX_USER_PRIORITY;
-
- static final ThreadLocal<PriorityDispatcher> dispatcher = new ThreadLocal<PriorityDispatcher>();
-
- private final PooledDispatcher pooledDispatcher;
-
- // The local dispatch queue:
- private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
-
- // Dispatch queue for requests from other threads:
- private final LinkedNodeList<ForeignEvent>[] foreignQueue;
- private static final int[] TOGGLE = new int[] { 1, 0 };
- private int foreignToggle = 0;
-
- // Timed Execution List
- private final TimerHeap timerHeap = new TimerHeap();
-
- private final String name;
- private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
- private final Semaphore foreignPermits = new Semaphore(0);
-
- private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
- public Integer map(PriorityDispatchContext element) {
- return element.listPrio;
- }
- };
-
- public PriorityDispatcher(String name, int priorities, PooledDispatcher pooledDispactcher) {
- this.name = name;
- MAX_USER_PRIORITY = priorities;
- priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
- foreignQueue = createForeignEventQueue();
- for (int i = 0; i < 2; i++) {
- foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
- }
- this.pooledDispatcher = pooledDispactcher;
- }
-
- @SuppressWarnings("unchecked")
- private LinkedNodeList<ForeignEvent>[] createForeignEventQueue() {
- return new LinkedNodeList[2];
- }
-
- private abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
- public abstract void execute();
-
- final void addToList() {
- synchronized (foreignQueue) {
- if (!this.isLinked()) {
- foreignQueue[foreignToggle].addLast(this);
- if (!foreignAvailable.getAndSet(true)) {
- foreignPermits.release();
- }
- }
- }
- }
- }
-
- public boolean isThreaded() {
- return threaded;
- }
-
- public void setThreaded(boolean threaded) {
- this.threaded = threaded;
- }
-
- private class UpdateEvent extends ForeignEvent {
- private final PriorityDispatchContext pdc;
-
- UpdateEvent(PriorityDispatchContext pdc) {
- this.pdc = pdc;
- }
-
- // Can only be called by the owner of this dispatch context:
- public void execute() {
- pdc.poolContext.processForeignUpdates();
- }
- }
-
- class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PoolableDispatchContext {
- // The dispatchable target:
- final Dispatchable dispatchable;
- PooledDispatchContext poolContext;
- // The name of this context:
- final String name;
- // list prio can only be updated in the thread of of this dispatcher:
- int listPrio;
- // The update events are used to update fields in the dispatch context
- // from foreign threads:
- final UpdateEvent updateEvent[] = new UpdateEvent[] { new UpdateEvent(this), new UpdateEvent(this) };
-
- private PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
- super();
- this.dispatchable = dispatchable;
- this.name = name;
- }
-
- // This can only be called on this thread
- public final void requestDispatch() {
- if (!isLinked()) {
- priorityQueue.add(this, listPrio);
- }
- return;
- }
-
- // This can only be called on this thread
- public final void updatePriority(int priority) {
- if (priority != listPrio) {
-
- listPrio = priority;
- // If there is a priority change relink the context
- // at the new priority:
- if (isLinked()) {
- unlink();
- priorityQueue.add(this, listPrio);
- }
- }
- return;
-
- }
-
- public void onForeignThreadUpdate() {
- synchronized (foreignQueue) {
- updateEvent[foreignToggle].addToList();
- }
- }
-
- // This can only be called on this thread
- public void close() {
- if (isLinked()) {
- unlink();
- }
- synchronized (foreignQueue) {
- if (updateEvent[foreignToggle].isLinked()) {
- updateEvent[foreignToggle].unlink();
- }
- }
- }
-
- /**
- * This can only be called by the owning dispatch thread:
- *
- * @return False if the dispatchable has more work to do.
- */
- public final boolean dispatch() {
- return dispatchable.dispatch();
- }
-
- public String toString() {
- return name;
- }
-
- public Dispatchable getDispatchable() {
- return dispatchable;
- }
-
- public void setPooledDispatchContext(PooledDispatchContext context) {
- this.poolContext = context;
- }
-
- public String getName() {
- return name;
- }
-
- public PoolableDispatcher getDispatcher() {
- return PriorityDispatcher.this;
- }
- }
-
- public DispatchContext register(Dispatchable dispatchable, String name) {
- return createPoolableDispatchContext(dispatchable, name);
- }
-
- public PoolableDispatchContext createPoolableDispatchContext(Dispatchable dispatchable, String name) {
- return new PriorityDispatchContext(dispatchable, true, name);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#start()
- */
- public synchronized final void start() {
- if (thread == null) {
- running = true;
- thread = new Thread(this, name);
- thread.start();
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
- */
- public synchronized final void shutdown() throws InterruptedException {
- if (thread != null) {
- dispatch(new RunnableAdapter(new Runnable() {
-
- public void run() {
- running = false;
- }
-
- }), MAX_USER_PRIORITY + 1);
- thread.interrupt();
- thread.join();
- thread = null;
- }
- }
-
- public void run() {
-
- // Inform the dispatcher that we have started:
- pooledDispatcher.onDispatcherStarted(this);
- dispatcher.set(this);
- PriorityDispatchContext pdc;
- try {
- while (running) {
- pdc = priorityQueue.poll();
- // If no local work available wait for foreign work:
- if (pdc == null) {
- foreignPermits.acquire();
- } else {
- pdc.poolContext.startingDispatch();
-
- while (!pdc.dispatch()) {
- // If there is a higher priority dispatchable stop
- // processing this one:
- if (pdc.listPrio < priorityQueue.getHighestPriority()) {
- // May have gotten relinked by the caller:
- if (!pdc.isLinked()) {
- priorityQueue.add(pdc, pdc.listPrio);
- }
- break;
- }
- }
-
- pdc.poolContext.finishedDispatch();
-
- }
-
- // Execute delayed events:
- timerHeap.executeReadyEvents();
-
- // Check for foreign dispatch requests:
- if (foreignAvailable.get()) {
- LinkedNodeList<ForeignEvent> foreign;
- synchronized (foreignQueue) {
- // Swap foreign queues and drain permits;
- foreign = foreignQueue[foreignToggle];
- foreignToggle = TOGGLE[foreignToggle];
- foreignAvailable.set(false);
- foreignPermits.drainPermits();
- }
- while (true) {
- ForeignEvent fe = foreign.getHead();
- if (fe == null) {
- break;
- }
-
- fe.unlink();
- fe.execute();
- }
-
- }
- }
- } catch (InterruptedException e) {
- return;
- } catch (Throwable thrown) {
- thrown.printStackTrace();
- } finally {
- pooledDispatcher.onDispatcherStopped(this);
- }
- }
-
- class ThreadSafeDispatchContext implements PooledDispatchContext {
- final PriorityDispatchContext delegate;
-
- ThreadSafeDispatchContext(PriorityDispatchContext context) {
- this.delegate = context;
- delegate.setPooledDispatchContext(this);
- }
-
- public void finishedDispatch() {
- // NOOP
-
- }
-
- public void startingDispatch() {
- // Noop
-
- }
-
- public void close() {
- // Noop this is always transient:
- }
-
- public void processForeignUpdates() {
- requestDispatch();
- }
-
- public Dispatchable getDispatchable() {
- return delegate.getDispatchable();
- }
-
- public void requestDispatch() {
- if (dispatcher.get() == PriorityDispatcher.this) {
- delegate.requestDispatch();
- } else {
- delegate.onForeignThreadUpdate();
- }
- }
-
- public void updatePriority(int priority) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public String getName() {
- return delegate.name;
- }
-
- public void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
- * .dispatch.Dispatcher.Dispatchable)
- */
- final void dispatch(Dispatchable dispatchable, int priority) {
- ThreadSafeDispatchContext context = new ThreadSafeDispatchContext(new PriorityDispatchContext(dispatchable, false, name));
- context.delegate.updatePriority(priority);
- context.requestDispatch();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
- */
- public Executor createPriorityExecutor(final int priority) {
-
- return new Executor() {
-
- public void execute(final Runnable runnable) {
- dispatch(new RunnableAdapter(runnable), priority);
- }
- };
- }
-
- public void execute(final Runnable runnable) {
- dispatch(new RunnableAdapter(runnable), 0);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
- * long, java.util.concurrent.TimeUnit)
- */
- public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
- if (dispatcher.get() == this) {
- timerHeap.add(runnable, delay, timeUnit);
- } else {
- new ForeignEvent() {
- public void execute() {
- timerHeap.add(runnable, delay, timeUnit);
- }
- }.addToList();
- }
- }
-
- public String toString() {
- return name;
- }
-
- private class TimerHeap {
-
- final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
-
- private void add(Runnable runnable, long delay, TimeUnit timeUnit) {
-
- long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS);
- long eTime = System.nanoTime() + nanoDelay;
- LinkedList<Runnable> list = new LinkedList<Runnable>();
- list.add(runnable);
-
- LinkedList<Runnable> old = timers.put(eTime, list);
- if (old != null) {
- list.addAll(old);
- }
- }
-
- private void executeReadyEvents() {
- LinkedList<Runnable> ready = null;
- if (timers.isEmpty()) {
- return;
- } else {
- long now = System.nanoTime();
- long first = timers.firstKey();
- if (first > now) {
- return;
- }
- ready = new LinkedList<Runnable>();
-
- while (first < now) {
- ready.addAll(timers.remove(first));
- if (timers.isEmpty()) {
- break;
- }
- first = timers.firstKey();
-
- }
- }
-
- for (Runnable runnable : ready) {
- try {
- runnable.run();
- } catch (Throwable thrown) {
- thrown.printStackTrace();
- }
- }
- }
- }
+ private static final boolean DEBUG = false;
+ private Thread thread;
+ protected boolean running = false;
+ private boolean threaded = false;
+ private final int MAX_USER_PRIORITY;
+
+ // Set if this dispatcher is part of a dispatch pool:
+ protected final PooledDispatcher<D> pooledDispatcher;
+
+ // The local dispatch queue:
+ private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+
+ // Dispatch queue for requests from other threads:
+ private final LinkedNodeList<ForeignEvent>[] foreignQueue;
+ private static final int[] TOGGLE = new int[] { 1, 0 };
+ private int foreignToggle = 0;
+
+ // Timed Execution List
+ protected final TimerHeap timerHeap = new TimerHeap();
+
+ private final String name;
+ private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
+ private final Semaphore foreignPermits = new Semaphore(0);
+
+ private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
+ public Integer map(PriorityDispatchContext element) {
+ return element.listPrio;
+ }
+ };
+
+ protected PriorityDispatcher(String name, int priorities, PooledDispatcher<D> pooledDispactcher) {
+ this.name = name;
+ MAX_USER_PRIORITY = priorities;
+ priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
+ foreignQueue = createForeignEventQueue();
+ for (int i = 0; i < 2; i++) {
+ foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
+ }
+ this.pooledDispatcher = pooledDispactcher;
+ }
+
+ public static final IDispatcher createPriorityDispatcher(String name, int numPriorities) {
+ return new PriorityDispatcher(name, numPriorities, null);
+ }
+
+ public static final IDispatcher createPriorityDispatchPool(String name, final int numPriorities, int size) {
+ return new AbstractPooledDispatcher<PriorityDispatcher>(name, size) {
+
+ @Override
+ protected final PriorityDispatcher createDispatcher(String name, AbstractPooledDispatcher<PriorityDispatcher> pool) throws Exception {
+ // TODO Auto-generated method stub
+ return new PriorityDispatcher(name, numPriorities, this);
+ }
+
+ public final Executor createPriorityExecutor(final int priority) {
+ return new Executor() {
+ public void execute(final Runnable runnable) {
+ chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+ }
+ };
+ }
+ };
+ }
+
+ @SuppressWarnings("unchecked")
+ private LinkedNodeList<ForeignEvent>[] createForeignEventQueue() {
+ return new LinkedNodeList[2];
+ }
+
+ protected abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
+ public abstract void execute();
+
+ final void addToList() {
+ synchronized (foreignQueue) {
+ if (!this.isLinked()) {
+ foreignQueue[foreignToggle].addLast(this);
+ if (!foreignAvailable.getAndSet(true)) {
+ wakeup();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean isThreaded() {
+ return threaded;
+ }
+
+ public void setThreaded(boolean threaded) {
+ this.threaded = threaded;
+ }
+
+ private class UpdateEvent extends ForeignEvent {
+ private final PriorityDispatchContext pdc;
+
+ UpdateEvent(PriorityDispatchContext pdc) {
+ this.pdc = pdc;
+ }
+
+ // Can only be called by the owner of this dispatch context:
+ public void execute() {
+ pdc.processForeignUpdates();
+ }
+ }
+
+ public DispatchContext register(Dispatchable dispatchable, String name) {
+ return new PriorityDispatchContext(dispatchable, true, name);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#start()
+ */
+ public synchronized final void start() {
+ if (thread == null) {
+ running = true;
+ thread = new Thread(this, name);
+ thread.start();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+ */
+ public synchronized final void shutdown() throws InterruptedException {
+ if (thread != null) {
+ dispatch(new RunnableAdapter(new Runnable() {
+
+ public void run() {
+ running = false;
+ }
+
+ }), MAX_USER_PRIORITY + 1);
+ // thread.interrupt();
+ thread.join();
+ thread = null;
+ }
+ }
+
+ public void run() {
+
+ // Inform the dispatcher that we have started:
+ pooledDispatcher.onDispatcherStarted((D) this);
+ PriorityDispatchContext pdc;
+ try {
+ while (running) {
+ pdc = priorityQueue.poll();
+ // If no local work available wait for foreign work:
+ if (pdc == null) {
+ waitForEvents();
+ } else {
+ if (pdc.tracker != null) {
+ pooledDispatcher.setCurrentDispatchContext(pdc);
+ }
+
+ while (!pdc.dispatch()) {
+ // If there is a higher priority dispatchable stop
+ // processing this one:
+ if (pdc.listPrio < priorityQueue.getHighestPriority()) {
+ // May have gotten relinked by the caller:
+ if (!pdc.isLinked()) {
+ priorityQueue.add(pdc, pdc.listPrio);
+ }
+ break;
+ }
+ }
+
+ pooledDispatcher.setCurrentDispatchContext(null);
+ }
+
+ // Execute delayed events:
+ timerHeap.executeReadyTimers();
+
+ // Allow subclasses to do additional work:
+ dispatchHook();
+
+ // Check for foreign dispatch requests:
+ if (foreignAvailable.get()) {
+ LinkedNodeList<ForeignEvent> foreign;
+ synchronized (foreignQueue) {
+ // Swap foreign queues and drain permits;
+ foreign = foreignQueue[foreignToggle];
+ foreignToggle = TOGGLE[foreignToggle];
+ foreignAvailable.set(false);
+ foreignPermits.drainPermits();
+ }
+ while (true) {
+ ForeignEvent fe = foreign.getHead();
+ if (fe == null) {
+ break;
+ }
+
+ fe.unlink();
+ fe.execute();
+ }
+
+ }
+ }
+ } catch (InterruptedException e) {
+ return;
+ } catch (Throwable thrown) {
+ thrown.printStackTrace();
+ } finally {
+ pooledDispatcher.onDispatcherStopped((D) this);
+ }
+ }
+
+ /**
+ * Subclasses may override this to do do additional dispatch work:
+ */
+ protected void dispatchHook() throws Exception {
+
+ }
+
+ /**
+ * Subclasses may override this to implement another mechanism for wakeup.
+ *
+ * @throws Exception
+ */
+ protected void waitForEvents() throws Exception {
+ foreignPermits.acquire();
+ }
+
+ /**
+ * Subclasses may override this to provide an alternative wakeup mechanism.
+ */
+ protected void wakeup() {
+ foreignPermits.release();
+ }
+
+ protected final void onForeignUdate(PriorityDispatchContext context) {
+ synchronized (foreignQueue) {
+
+ ForeignEvent fe = context.updateEvent[foreignToggle];
+ if (!fe.isLinked()) {
+ foreignQueue[foreignToggle].addLast(fe);
+ if (!foreignAvailable.getAndSet(true)) {
+ wakeup();
+ }
+ }
+ }
+ }
+
+ protected final boolean removeDispatchContext(PriorityDispatchContext context) {
+ synchronized (foreignQueue) {
+
+ if (context.updateEvent[0].isLinked()) {
+ context.updateEvent[0].unlink();
+ }
+ if (context.updateEvent[1].isLinked()) {
+ context.updateEvent[1].unlink();
+ }
+ if (context.isLinked()) {
+ context.unlink();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
+ * .dispatch.Dispatcher.Dispatchable)
+ */
+ public final void dispatch(Dispatchable dispatchable, int priority) {
+ PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+ context.updatePriority(priority);
+ context.requestDispatch();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+ */
+ public Executor createPriorityExecutor(final int priority) {
+
+ return new Executor() {
+
+ public void execute(final Runnable runnable) {
+ dispatch(new RunnableAdapter(runnable), priority);
+ }
+ };
+ }
+
+ public void execute(final Runnable runnable) {
+ dispatch(new RunnableAdapter(runnable), 0);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+ * long, java.util.concurrent.TimeUnit)
+ */
+ public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
+ if (getCurrentDispatcher() == this) {
+ timerHeap.add(runnable, delay, timeUnit);
+ } else {
+ new ForeignEvent() {
+ public void execute() {
+ timerHeap.add(runnable, delay, timeUnit);
+ }
+ }.addToList();
+ }
+ }
+
+ public String toString() {
+ return name;
+ }
+
+ private final D getCurrentDispatcher() {
+ return pooledDispatcher.getCurrentDispatcher();
+ }
+
+ private final PooledDispatchContext<D> getCurrentDispatchContext() {
+ return pooledDispatcher.getCurrentDispatchContext();
+ }
+
+ /**
+ *
+ */
+ protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PooledDispatchContext<D> {
+ // The dispatchable target:
+ final Dispatchable dispatchable;
+ // The name of this context:
+ final String name;
+ // list prio can only be updated in the thread of of the owning
+ // dispatcher
+ protected int listPrio;
+
+ // The update events are used to update fields in the dispatch context
+ // from foreign threads:
+ final UpdateEvent updateEvent[];
+
+ private final ExecutionTracker<D> tracker;
+ private D currentOwner;
+ private D updateDispatcher = null;
+
+ private int priority;
+ private boolean dispatchRequested = false;
+ private boolean closed = false;
+
+ protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
+ this.dispatchable = dispatchable;
+ this.name = name;
+ this.currentOwner = (D) PriorityDispatcher.this;
+ if (persistent) {
+ this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext<D>) this);
+ } else {
+ this.tracker = null;
+ }
+ updateEvent = createUpdateEvent();
+ updateEvent[0] = new UpdateEvent(this);
+ updateEvent[1] = new UpdateEvent(this);
+ }
+
+ @SuppressWarnings("unchecked")
+ private final PriorityDispatcher<D>.UpdateEvent[] createUpdateEvent() {
+ return new PriorityDispatcher.UpdateEvent[2];
+ }
+
+ /**
+ * Gets the execution tracker for the context.
+ *
+ * @return the execution tracker for the context:
+ */
+ public ExecutionTracker<D> getExecutionTracker() {
+ return tracker;
+ }
+
+ /**
+ * This can only be called by the owning dispatch thread:
+ *
+ * @return False if the dispatchable has more work to do.
+ */
+ public final boolean dispatch() {
+ return dispatchable.dispatch();
+ }
+
+ public final void assignToNewDispatcher(D newDispatcher) {
+ synchronized (this) {
+
+ // If we're already set to this dispatcher
+ if (newDispatcher == currentOwner) {
+ if (updateDispatcher == null || updateDispatcher == newDispatcher) {
+ return;
+ }
+ }
+
+ updateDispatcher = newDispatcher;
+ if (DEBUG)
+ System.out.println(getName() + " updating to " + updateDispatcher);
+ }
+ currentOwner.onForeignUdate(this);
+ }
+
+ public void requestDispatch() {
+
+ D callingDispatcher = getCurrentDispatcher();
+ if (tracker != null)
+ tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
+
+ // Otherwise this is coming off another thread, so we need to
+ // synchronize
+ // to protect against ownership changes:
+ synchronized (this) {
+ // If the owner of this context is the calling thread, then
+ // delegate to the dispatcher.
+ if (currentOwner == callingDispatcher) {
+
+ if (!isLinked()) {
+ currentOwner.priorityQueue.add(this, listPrio);
+ }
+ return;
+ }
+
+ dispatchRequested = true;
+ }
+ // FIXME Thread safety!
+ currentOwner.onForeignUdate(this);
+ }
+
+ public void updatePriority(int priority) {
+ if (this.priority == priority) {
+ return;
+ }
+ D callingDispatcher = getCurrentDispatcher();
+
+ // Otherwise this is coming off another thread, so we need to
+ // synchronize to protect against ownership changes:
+ synchronized (this) {
+ this.priority = priority;
+
+ // If this is called by the owning dispatcher, then we go ahead
+ // and update:
+ if (currentOwner == callingDispatcher) {
+
+ if (priority != listPrio) {
+
+ listPrio = priority;
+ // If there is a priority change relink the context
+ // at the new priority:
+ if (isLinked()) {
+ unlink();
+ currentOwner.priorityQueue.add(this, listPrio);
+ }
+ }
+ return;
+ }
+ }
+ // FIXME Thread safety!
+ currentOwner.onForeignUdate(this);
+ }
+
+ public void processForeignUpdates() {
+ boolean ownerChange = false;
+ synchronized (this) {
+
+ if (closed) {
+ close();
+ return;
+ }
+
+ if (updateDispatcher != null) {
+ if (DEBUG) {
+ System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+ }
+ if (currentOwner.removeDispatchContext(this)) {
+ dispatchRequested = true;
+ }
+ currentOwner = updateDispatcher;
+ updateDispatcher = null;
+ ownerChange = true;
+ } else {
+ updatePriority(priority);
+
+ if (dispatchRequested) {
+ dispatchRequested = false;
+ requestDispatch();
+ }
+ }
+ }
+
+ if (ownerChange) {
+ currentOwner.onForeignUdate(this);
+ }
+ }
+
+ /**
+ * May be overriden by subclass to additional work on dispatcher switch
+ *
+ * @param oldDispatcher
+ * The old dispatcher
+ * @param newDispatcher
+ * The new Dispatcher
+ */
+ protected void switchedDispatcher(D oldDispatcher, D newDispatcher) {
+
+ }
+
+ public void close() {
+ tracker.close();
+ D callingDispatcher = getCurrentDispatcher();
+ synchronized (this) {
+ closed = true;
+
+ // If the owner of this context is the calling thread, then
+ // delegate to the dispatcher.
+ if (currentOwner == callingDispatcher) {
+ if (isLinked()) {
+ unlink();
+ }
+ // FIXME Deadlock potential!
+ synchronized (foreignQueue) {
+ if (updateEvent[foreignToggle].isLinked()) {
+ updateEvent[foreignToggle].unlink();
+ }
+ }
+ }
+ }
+ currentOwner.onForeignUdate(this);
+ }
+
+ public final String toString() {
+ return getName();
+ }
+
+ public Dispatchable getDispatchable() {
+ return dispatchable;
+ }
+
+ public D getDispatcher() {
+ return currentOwner;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java Fri Feb 20 14:23:26 2009
@@ -1,307 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.dispatch;
-
-import java.util.ArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
-
-public class PriorityPooledDispatcher implements IDispatcher, PooledDispatcher {
- private final String name;
-
- private static final ThreadLocal<PooledDispatchContext> dispatchContext = new ThreadLocal<PooledDispatchContext>();
- private static final ThreadLocal<PoolableDispatcher> dispatcher = new ThreadLocal<PoolableDispatcher>();
-
- private final ArrayList<PriorityDispatcher> dispatchers = new ArrayList<PriorityDispatcher>();
-
- final AtomicBoolean started = new AtomicBoolean();
- final AtomicBoolean shutdown = new AtomicBoolean();
-
- private int roundRobinCounter = 0;
- private final int size;
- private final boolean DEBUG = false;
-
- private final ExecutionLoadBalancer loadBalancer;
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
- */
- public Executor createPriorityExecutor(final int priority) {
- return new Executor() {
- public void execute(final Runnable runnable) {
- chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
- }
- };
- }
-
- public PriorityPooledDispatcher(String name, int size, int priorities) {
- this.name = name;
- this.size = size;
- loadBalancer = new SimpleLoadBalancer();
- // Create all the workers.
- for (int i = 0; i < size; i++) {
- PriorityDispatcher dispatcher = new PriorityDispatcher(name + "-" + (i + 1), priorities, this);
- dispatchers.add(dispatcher);
- }
- }
-
- public DispatchContext register(Dispatchable dispatchable, String name) {
- return createPooledDispatchContext(chooseDispatcher().createPoolableDispatchContext(dispatchable, name));
- }
-
- /**
- * @see org.apache.activemq.dispatch.IDispatcher#start()
- */
- public synchronized final void start() {
- loadBalancer.start();
- if (started.compareAndSet(false, true)) {
- // Create all the workers.
- for (int i = 0; i < size; i++) {
- dispatchers.get(i).start();
- }
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
- */
- public synchronized final void shutdown() throws InterruptedException {
- shutdown.set(true);
- for (PriorityDispatcher dispatcher : dispatchers) {
- dispatcher.shutdown();
- }
- loadBalancer.stop();
- }
-
- private PriorityDispatcher chooseDispatcher() {
- PriorityDispatcher d = PriorityDispatcher.dispatcher.get();
- if (d == null) {
- synchronized (dispatchers) {
- if (++roundRobinCounter >= size) {
- roundRobinCounter = 0;
- }
- return dispatchers.get(roundRobinCounter);
- }
- } else {
- return d;
- }
- }
-
- public void execute(final Runnable runnable) {
- chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
- * long, java.util.concurrent.TimeUnit)
- */
- public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
- chooseDispatcher().schedule(runnable, delay, timeUnit);
- }
-
- public PooledDispatchContext createPooledDispatchContext(PoolableDispatchContext context) {
- return new PriorityPooledDispatchContext(context);
- }
-
- /**
- * A Dispatcher must call this to indicate that is has started it's dispatch
- * loop.
- */
- public void onDispatcherStarted(PoolableDispatcher d) {
- dispatcher.set(d);
- loadBalancer.addDispatcher(d);
- }
-
- /**
- * A Dispatcher must call this when exiting it's dispatch loop
- */
- public void onDispatcherStopped(PoolableDispatcher d) {
- loadBalancer.removeDispatcher(d);
- }
-
- /**
- * ExecutionGraphNode tracks dispatch information for a
- * MappableDispatchContext.
- *
- */
- public class PriorityPooledDispatchContext implements PooledDispatchContext {
- private final ExecutionTracker tracker;
-
- private PoolableDispatchContext context;
- private PoolableDispatcher currentOwner;
- private int priority;
- private boolean dispatchRequested = false;
- private PoolableDispatcher updateDispatcher = null;
- private boolean closed = false;
-
- PriorityPooledDispatchContext(PoolableDispatchContext context) {
- this.context = context;
- this.context.setPooledDispatchContext(this);
- this.currentOwner = context.getDispatcher();
- this.tracker = loadBalancer.createExecutionTracker(this);
-
- }
-
- public final void startingDispatch() {
- dispatchContext.set(this);
- }
-
- public final void finishedDispatch() {
- dispatchContext.set(null);
- }
-
- public final void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
- synchronized (this) {
-
- // If we're already set to this dispatcher
- if (newDispatcher == currentOwner) {
- if (updateDispatcher == null || updateDispatcher == newDispatcher) {
- return;
- }
- }
-
- updateDispatcher = newDispatcher;
- if (DEBUG)
- System.out.println(getName() + " updating to " + context.getDispatcher());
- }
- context.onForeignThreadUpdate();
- }
-
- public void requestDispatch() {
-
- PoolableDispatcher callingDispatcher = dispatcher.get();
-
- tracker.onDispatchRequest(callingDispatcher, dispatchContext.get());
-
- // Otherwise this is coming off another thread, so we need to
- // synchronize
- // to protect against ownership changes:
- synchronized (this) {
- // If the owner of this context is the calling thread, then
- // delegate to the dispatcher.
- if (currentOwner == callingDispatcher) {
-
- context.requestDispatch();
- return;
- }
-
- dispatchRequested = true;
- }
- context.onForeignThreadUpdate();
- }
-
- public void updatePriority(int priority) {
- if (this.priority == priority) {
- return;
- }
- // Otherwise this is coming off another thread, so we need to
- // synchronize to protect against ownership changes:
- synchronized (this) {
- this.priority = priority;
-
- IDispatcher callingDispatcher = dispatcher.get();
-
- // If the owner of this context is the calling thread, then
- // delegate to the dispatcher.
- if (currentOwner == callingDispatcher) {
-
- context.updatePriority(priority);
- return;
- }
- }
- context.onForeignThreadUpdate();
- }
-
- public void processForeignUpdates() {
- boolean ownerChange = false;
- synchronized (this) {
-
- if (closed) {
- context.close();
- return;
- }
-
- if (updateDispatcher != null) {
- // Close the old context:
- if (DEBUG) {
- System.out.println("Assigning " + getName() + " to " + updateDispatcher);
- }
- context.close();
-
- currentOwner = updateDispatcher;
- updateDispatcher = null;
- context = currentOwner.createPoolableDispatchContext(context.getDispatchable(), context.getName());
- dispatchRequested = true;
- context.updatePriority(priority);
- context.setPooledDispatchContext(this);
- ownerChange = true;
- } else {
- context.updatePriority(priority);
-
- if (dispatchRequested) {
- context.requestDispatch();
- dispatchRequested = false;
- }
- }
- }
-
- if (ownerChange) {
- context.onForeignThreadUpdate();
- }
- }
-
- public void close() {
- tracker.close();
- synchronized (this) {
- IDispatcher callingDispatcher = dispatcher.get();
-
- // If the owner of this context is the calling thread, then
- // delegate to the dispatcher.
- if (currentOwner == callingDispatcher) {
- context.close();
- return;
- }
- }
- context.onForeignThreadUpdate();
- }
-
- public final String toString() {
- return context.toString();
- }
-
- public Dispatchable getDispatchable() {
- return context.getDispatchable();
- }
-
- public String getName() {
- return context.getName();
- }
- }
-
- public String toString() {
- return name;
- }
-}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Fri Feb 20 14:23:26 2009
@@ -19,117 +19,119 @@
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
-public class SimpleLoadBalancer implements ExecutionLoadBalancer {
+public class SimpleLoadBalancer<D extends IDispatcher> implements ExecutionLoadBalancer<D> {
- private final boolean DEBUG = false;
+ private final boolean DEBUG = false;
- SimpleLoadBalancer() {
- }
+ public SimpleLoadBalancer() {
+ }
- private class ExecutionStats {
- final PooledDispatchContext target;
- final PooledDispatchContext source;
- int count;
-
- ExecutionStats(PooledDispatchContext source, PooledDispatchContext target) {
- this.target = target;
- this.source = source;
- }
-
- public String toString() {
- return "Connection from: " + source + " to " + target;
- }
- }
-
- public void addDispatcher(PoolableDispatcher dispatcher) {
-
- }
-
- public void removeDispatcher(PoolableDispatcher dispatcher) {
-
- }
-
- public void start() {
- }
-
- public void stop() {
- }
-
- public ExecutionTracker createExecutionTracker(PooledDispatchContext context) {
- return new SimpleExecutionTracker(context);
- }
-
- private class SimpleExecutionTracker implements ExecutionTracker {
- private final HashMap<PooledDispatchContext, ExecutionStats> sources = new HashMap<PooledDispatchContext, ExecutionStats>();
- private final PooledDispatchContext context;
- private final AtomicInteger work = new AtomicInteger(0);
-
- private PooledDispatchContext singleSource;
- private PoolableDispatcher currentOwner;
-
- SimpleExecutionTracker(PooledDispatchContext context) {
- this.context = context;
- }
-
- /**
- * This method is called to track which dispatch contexts are requesting
- * dispatch for the target context represented by this node.
- *
- * This method is not threadsafe, the caller must ensure serialized
- * access to this method.
- *
- * @param callngDispatcher
- * The calling dispatcher.
- * @param context
- * the originating dispatch context
- * @return True if this method resulted in the dispatch request being
- * assigned to another dispatcher.
- */
- public void onDispatchRequest(PoolableDispatcher callingDispatcher, PooledDispatchContext callingContext) {
-
- if (callingContext != null) {
- // Make sure we are being called by another node:
- if (callingContext == null || callingContext == context) {
- return;
- }
-
- // Optimize for single source case:
- if (singleSource != callingContext) {
- if (singleSource == null && sources.isEmpty()) {
- singleSource = callingContext;
- ExecutionStats stats = new ExecutionStats(callingContext, context);
- sources.put(callingContext, stats);
-
- // If this context only has a single source
- // assign it to that source to minimize contention:
- if (callingDispatcher != currentOwner) {
- currentOwner = callingDispatcher;
- if (DEBUG)
- System.out.println("Assigning: " + context + " to " + callingContext + "'s dispatcher: " + callingDispatcher);
- context.assignToNewDispatcher(callingDispatcher);
- }
-
- } else {
-
- ExecutionStats stats = sources.get(callingContext);
- if (stats == null) {
- stats = new ExecutionStats(callingContext, context);
- sources.put(callingContext, stats);
- }
-
- if (singleSource != null) {
- singleSource = null;
- }
- }
- }
- work.incrementAndGet();
- }
- }
-
- public void close() {
- }
- }
+ @SuppressWarnings("hiding")
+ private class ExecutionStats<D extends IDispatcher> {
+ final PooledDispatchContext<D> target;
+ final PooledDispatchContext<D> source;
+ int count;
+
+ ExecutionStats(PooledDispatchContext<D> source, PooledDispatchContext<D> target) {
+ this.target = target;
+ this.source = source;
+ }
+
+ public String toString() {
+ return "Connection from: " + source + " to " + target;
+ }
+ }
+
+ public void addDispatcher(D dispatcher) {
+
+ }
+
+ public void removeDispatcher(D dispatcher) {
+
+ }
+
+ public void start() {
+ }
+
+ public void stop() {
+ }
+
+ public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context) {
+ return new SimpleExecutionTracker<D>(context);
+ }
+
+ private class SimpleExecutionTracker<D extends IDispatcher> implements ExecutionTracker<D> {
+ private final HashMap<PooledDispatchContext<D>, ExecutionStats<D>> sources = new HashMap<PooledDispatchContext<D>, ExecutionStats<D>>();
+ private final PooledDispatchContext<D> context;
+ private final AtomicInteger work = new AtomicInteger(0);
+
+ private PooledDispatchContext<D> singleSource;
+ private IDispatcher currentOwner;
+
+ SimpleExecutionTracker(PooledDispatchContext<D> context) {
+ this.context = context;
+ currentOwner = context.getDispatcher();
+ }
+
+ /**
+ * This method is called to track which dispatch contexts are requesting
+ * dispatch for the target context represented by this node.
+ *
+ * This method is not threadsafe, the caller must ensure serialized
+ * access to this method.
+ *
+ * @param callngDispatcher
+ * The calling dispatcher.
+ * @param context
+ * the originating dispatch context
+ * @return True if this method resulted in the dispatch request being
+ * assigned to another dispatcher.
+ */
+ public void onDispatchRequest(D callingDispatcher, PooledDispatchContext<D> callingContext) {
+
+ if (callingContext != null) {
+ // Make sure we are being called by another node:
+ if (callingContext == null || callingContext == context) {
+ return;
+ }
+
+ // Optimize for single source case:
+ if (singleSource != callingContext) {
+ if (singleSource == null && sources.isEmpty()) {
+ singleSource = callingContext;
+ ExecutionStats<D> stats = new ExecutionStats<D>(callingContext, context);
+ sources.put(callingContext, stats);
+
+ // If this context only has a single source
+ // assign it to that source to minimize contention:
+ if (callingDispatcher != currentOwner) {
+ if (DEBUG)
+ System.out.println("Assigning: " + context + " to " + callingContext + "'s dispatcher: " + callingDispatcher + " From: " + currentOwner);
+
+ currentOwner = callingDispatcher;
+ context.assignToNewDispatcher(callingDispatcher);
+ }
+
+ } else {
+
+ ExecutionStats<D> stats = sources.get(callingContext);
+ if (stats == null) {
+ stats = new ExecutionStats<D>(callingContext, context);
+ sources.put(callingContext, stats);
+ }
+
+ if (singleSource != null) {
+ singleSource = null;
+ }
+ }
+ }
+ work.incrementAndGet();
+ }
+ }
+
+ public void close() {
+ }
+ }
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java?rev=746251&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java Fri Feb 20 14:23:26 2009
@@ -0,0 +1,71 @@
+package org.apache.activemq.dispatch;
+
+import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+public class TimerHeap {
+ final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
+
+ public final void add(Runnable runnable, long delay, TimeUnit timeUnit) {
+
+ long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS);
+ long eTime = System.nanoTime() + nanoDelay;
+ LinkedList<Runnable> list = new LinkedList<Runnable>();
+ list.add(runnable);
+
+ LinkedList<Runnable> old = timers.put(eTime, list);
+ if (old != null) {
+ list.addAll(old);
+ }
+ }
+
+ /**
+ * Returns the time of the next scheduled event.
+ * @return -1 if there are no events, otherwise the time that the next timer should fire.
+ */
+ public final long timeToNext() {
+ if(timers.isEmpty())
+ {
+ return -1;
+ }
+ else
+ {
+ return Math.max(0, timers.firstKey() - System.nanoTime());
+ }
+ }
+
+ /**
+ * Executes ready timers.
+ */
+ public final void executeReadyTimers() {
+ LinkedList<Runnable> ready = null;
+ if (timers.isEmpty()) {
+ return;
+ } else {
+ long now = System.nanoTime();
+ long first = timers.firstKey();
+ if (first > now) {
+ return;
+ }
+ ready = new LinkedList<Runnable>();
+
+ while (first < now) {
+ ready.addAll(timers.remove(first));
+ if (timers.isEmpty()) {
+ break;
+ }
+ first = timers.firstKey();
+
+ }
+ }
+
+ for (Runnable runnable : ready) {
+ try {
+ runnable.run();
+ } catch (Throwable thrown) {
+ thrown.printStackTrace();
+ }
+ }
+ }
+}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Fri Feb 20 14:23:26 2009
@@ -37,14 +37,14 @@
public boolean match(Message message);
}
- final Router router= new Router();
-
+ final Router router = new Router();
+
final ArrayList<RemoteConnection> connections = new ArrayList<RemoteConnection>();
final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
final ArrayList<BrokerConnection> brokerConnections = new ArrayList<BrokerConnection>();
final HashMap<Destination, MockQueue> queues = new HashMap<Destination, MockQueue>();
-
+
private TransportServer transportServer;
private String uri;
private String name;
@@ -63,7 +63,6 @@
}
}
-
public void addQueue(MockQueue queue) {
router.bind(queue, queue.getDestination());
queues.put(queue.getDestination(), queue);
@@ -111,17 +110,15 @@
}
final void startServices() throws Exception {
-
+
+ dispatcher.start();
+
transportServer = TransportFactory.bind(new URI(uri));
transportServer.setAcceptListener(this);
- if(transportServer instanceof DispatchableTransportServer)
- {
- ((DispatchableTransportServer)transportServer).setDispatcher(dispatcher);
+ if (transportServer instanceof DispatchableTransportServer) {
+ ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher);
}
transportServer.start();
-
-
- dispatcher.start();
for (MockQueue queue : queues.values()) {
queue.start();
@@ -130,7 +127,7 @@
for (RemoteConsumer connection : consumers) {
connection.start();
}
-
+
for (RemoteProducer connection : producers) {
connection.start();
}
@@ -155,7 +152,7 @@
}
public void onAcceptError(Exception error) {
- System.out.println("Accept error: "+error);
+ System.out.println("Accept error: " + error);
error.printStackTrace();
}
@@ -186,5 +183,5 @@
public boolean isStopping() {
return stopping.get();
}
-
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Fri Feb 20 14:23:26 2009
@@ -24,7 +24,7 @@
import junit.framework.TestCase;
import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.dispatch.PriorityPooledDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.Commands.Destination.DestinationBean;
import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
@@ -91,6 +91,8 @@
@Override
protected void setUp() throws Exception {
+ dispatcher = PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize);
+
if( tcp ) {
sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
@@ -105,6 +107,21 @@
}
}
+ public void test_1_1_0() throws Exception {
+ producerCount = 1;
+ destCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
public void test_1_1_1() throws Exception {
producerCount = 1;
destCount = 1;
@@ -340,9 +357,7 @@
private void createConnections() throws IOException, URISyntaxException {
- dispatcher = new PriorityPooledDispatcher("BrokerDispatcher", asyncThreadPoolSize, Message.MAX_PRIORITY);
FlowController.setFlowExecutor(dispatcher.createPriorityExecutor(Message.MAX_PRIORITY));
-
if (multibroker) {
sendBroker = createBroker("SendBroker", sendBrokerURI);
rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=746251&r1=746250&r2=746251&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Fri Feb 20 14:23:26 2009
@@ -50,7 +50,7 @@
if(transport instanceof DispatchableTransport)
{
DispatchableTransport dt = ((DispatchableTransport)transport);
- dt.setName(name);
+ dt.setName(name + "-client-transport");
dt.setDispatcher(getDispatcher());
}
super.setTransport(transport);
@@ -59,7 +59,7 @@
transport.start();
// Let the remote side know our name.
transport.oneway(name);
- dispatchContext = getDispatcher().register(this, name + "-producer");
+ dispatchContext = getDispatcher().register(this, name + "-client");
dispatchContext.requestDispatch();
}