You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/12 23:07:17 UTC
svn commit: r743900 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/dispatch/ test/java/org/apache/activemq/flow/
Author: chirino
Date: Thu Feb 12 22:07:16 2009
New Revision: 743900
URL: http://svn.apache.org/viewvc?rev=743900&view=rev
Log:
Applying colins patch https://issues.apache.org/activemq/browse/AMQ-2107
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java?rev=743900&r1=743899&r2=743900&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java Thu Feb 12 22:07:16 2009
@@ -16,82 +16,24 @@
*/
package org.apache.activemq.dispatch;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
+import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
public interface ExecutionLoadBalancer {
- /**
- * A Load Balanced Dispatch context can be moved between different
- * dispatchers.
- */
- public interface LoadBalancedDispatchContext extends DispatchContext {
- /**
- * A dispatcher must call this when it starts dispatch for this context
- */
- public void startingDispatch();
-
- /**
- * A dispatcher must call this when it has finished dispatching a
- * context
- */
- public void finishedDispatch();
-
- /**
- *
- */
- public void processForeignUpdates();
- }
-
- public interface PoolableDispatchContext extends DispatchContext {
-
- public void setLoadBalancedDispatchContext(LoadBalancedDispatchContext context);
-
- /**
- * Indicates that another thread has made an update to the dispatch
- * context.
- *
- */
- public void onForeignThreadUpdate();
-
- public PoolableDispatcher getDispatcher();
- }
-
- public interface PoolableDispatcher extends IDispatcher {
-
- /**
- * Indicates that another thread has made an update to the dispatch
- * context.
- *
- */
- public PoolableDispatchContext createPoolablDispatchContext(Dispatchable dispatchable, String name);
- }
-
- /**
- * This wraps the dispatch context into one that is load balanced by the
- * LoadBalancer
- *
- * @param context
- * The context to wrap.
- * @return
- */
- public LoadBalancedDispatchContext createLoadBalancedDispatchContext(PoolableDispatchContext context);
-
- /**
- * Adds a Dispatcher to the list of dispatchers managed by the load balancer
- *
- * @param dispatcher
- */
- public void addDispatcher(PoolableDispatcher dispatcher);
-
- /**
- * A Dispatcher must call this from it's dispatcher thread to indicate that
- * is has started it's dispatch has started.
- */
- public void onDispatcherStarted(PoolableDispatcher dispatcher);
-
- /**
- * A Dispatcher must call this from it's dispatcher thread when exiting it's
- * dispatch loop
- */
- public void onDispatcherStopped(PoolableDispatcher dispatcher);
+ public interface ExecutionTracker {
+ public void onDispatchRequest(PoolableDispatcher caller, PooledDispatchContext context);
+
+ public void close();
+ }
+
+ public void addDispatcher(PoolableDispatcher dispatcher);
+
+ public void removeDispatcher(PoolableDispatcher dispatcher);
+
+ public ExecutionTracker createExecutionTracker(PooledDispatchContext context);
+
+ public void start();
+
+ public void stop();
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java?rev=743900&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java Thu Feb 12 22:07:16 2009
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+
+public interface PooledDispatcher {
+
+ /**
+ * A {@link PooledDispatchContext}s can be moved between different
+ * dispatchers.
+ */
+ public interface PooledDispatchContext extends DispatchContext {
+ /**
+ * Called to transfer a {@link PooledDispatchContext} to a new
+ * Dispatcher.
+ */
+ public void assignToNewDispatcher(PoolableDispatcher newDispatcher);
+
+ /**
+ * A dispatcher must call this when it starts dispatch for this context
+ */
+ public void startingDispatch();
+
+ /**
+ * A dispatcher must call this when it has finished dispatching a
+ * context
+ */
+ public void finishedDispatch();
+
+ /**
+ * Called by the dispatch thread to let the pooled context set any info
+ * set by other threads.
+ */
+ public void processForeignUpdates();
+ }
+
+ public interface PoolableDispatchContext extends DispatchContext {
+
+ public void setPooledDispatchContext(PooledDispatchContext context);
+
+ /**
+ * Indicates that another thread has made an update to the dispatch
+ * context.
+ *
+ */
+ public void onForeignThreadUpdate();
+
+ public PoolableDispatcher getDispatcher();
+ }
+
+ /**
+ * A PoolableDispatcher is one that can be owned by an
+ * {@link PooledDispatcher}.
+ */
+ public interface PoolableDispatcher extends IDispatcher {
+
+ /**
+ * Indicates that another thread has made an update to the dispatch
+ * context.
+ *
+ */
+ public PoolableDispatchContext createPoolableDispatchContext(Dispatchable dispatchable, String name);
+ }
+
+ /**
+ * This wraps the dispatch context into one that is load balanced by the
+ * LoadBalancer
+ *
+ * @param context
+ * The context to wrap.
+ * @return
+ */
+ public PooledDispatchContext createPooledDispatchContext(PoolableDispatchContext context);
+
+ /**
+ * A Dispatcher must call this from it's dispatcher thread to indicate that
+ * is has started it's dispatch has started.
+ */
+ public void onDispatcherStarted(PoolableDispatcher dispatcher);
+
+ /**
+ * A Dispatcher must call this from it's dispatcher thread when exiting it's
+ * dispatch loop
+ */
+ public void onDispatcherStopped(PoolableDispatcher dispatcher);
+}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=743900&r1=743899&r2=743900&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Thu Feb 12 22:07:16 2009
@@ -23,431 +23,442 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.LoadBalancedDispatchContext;
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.PoolableDispatchContext;
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.PoolableDispatcher;
+import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatchContext;
+import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
+import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
import org.apache.activemq.queue.Mapper;
import org.apache.kahadb.util.LinkedNode;
import org.apache.kahadb.util.LinkedNodeList;
public class PriorityDispatcher implements Runnable, PoolableDispatcher {
- private Thread thread;
- private boolean running = false;
- private boolean threaded = false;
- private final int MAX_USER_PRIORITY;
-
- static final ThreadLocal<PriorityDispatcher> dispatcher = new ThreadLocal<PriorityDispatcher>();
-
- private final ExecutionLoadBalancer loadBalancer;
-
- // The local dispatch queue:
- private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
-
- // Dispatch queue for requests from other threads:
- private final LinkedNodeList<ForeignEvent> foreignQueue = new LinkedNodeList<ForeignEvent>();
-
- // Timed Execution List
- private final TimerHeap timerHeap = new TimerHeap();
-
- private final String name;
- private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
- private final Semaphore foreignPermits = new Semaphore(0);
-
- private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
- public Integer map(PriorityDispatchContext element) {
- return element.listPrio;
- }
- };
-
- public PriorityDispatcher(String name, int priorities, ExecutionLoadBalancer loadBalancer) {
- this.name = name;
- MAX_USER_PRIORITY = priorities;
- priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
- this.loadBalancer = loadBalancer;
- loadBalancer.addDispatcher(this);
- }
-
- private abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
- public abstract void execute();
-
- final void addToList() {
- synchronized (foreignQueue) {
- if (!this.isLinked()) {
- foreignQueue.addLast(this);
- if (!foreignAvailable.getAndSet(true)) {
- foreignPermits.release();
- }
- }
- }
- }
- }
-
- public boolean isThreaded() {
- return threaded;
- }
-
- public void setThreaded(boolean threaded) {
- this.threaded = threaded;
- }
-
- private class UpdateEvent extends ForeignEvent {
- private final PriorityDispatchContext pdc;
-
- UpdateEvent(PriorityDispatchContext pdc) {
- this.pdc = pdc;
- }
-
- // Can only be called by the owner of this dispatch context:
- public void execute() {
- pdc.lbContext.processForeignUpdates();
- }
- }
-
- class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PoolableDispatchContext {
- // The dispatchable target:
- final Dispatchable dispatchable;
- LoadBalancedDispatchContext lbContext;
- // The name of this context:
- final String name;
- // list prio can only be updated in the thread of of this dispatcher:
- int listPrio;
- // The update event is used to update fields in the dispatch context
- // from foreign threads:
- final UpdateEvent updateEvent = new UpdateEvent(this);
-
- // Marked by the caller when this is closed.
- boolean closed = false;
-
- private PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
- super();
- this.dispatchable = dispatchable;
- this.name = name;
- }
-
- // The load balancer will guarantee that this is on our thread:
- public final void requestDispatch() {
- if (!isLinked()) {
- priorityQueue.add(this, listPrio);
- }
- return;
- }
-
- // The load balancer guarantees that this is called on our thread:
- public final void updatePriority(int priority) {
- if (priority != listPrio) {
-
- listPrio = priority;
- // If there is a priority change relink the context
- // at the new priority:
- if (isLinked()) {
- unlink();
- priorityQueue.add(this, listPrio);
- }
- }
- return;
-
- }
-
- public void onForeignThreadUpdate() {
- updateEvent.addToList();
- }
-
- // Will only be called on this thread:
- public void close() {
- if (isLinked()) {
- unlink();
- }
- synchronized (foreignQueue) {
- if (updateEvent.isLinked()) {
- updateEvent.unlink();
- }
- }
-
- closed = true;
- }
-
- /**
- * This can only be called by the owning dispatch thread:
- *
- * @return False if the dispatchable has more work to do.
- */
- public final boolean dispatch() {
- return dispatchable.dispatch();
- }
-
- public String toString() {
- return name;
- }
-
- public Dispatchable getDispatchable() {
- return dispatchable;
- }
-
- public void setLoadBalancedDispatchContext(LoadBalancedDispatchContext context) {
- this.lbContext = context;
- }
-
- public String getName() {
- return name;
- }
-
- public PoolableDispatcher getDispatcher() {
- return PriorityDispatcher.this;
- }
- }
-
- public DispatchContext register(Dispatchable dispatchable, String name) {
- return loadBalancer.createLoadBalancedDispatchContext(createPoolablDispatchContext(dispatchable, name));
- }
-
- public PoolableDispatchContext createPoolablDispatchContext(Dispatchable dispatchable, String name) {
- return new PriorityDispatchContext(dispatchable, true, name);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#start()
- */
- public synchronized final void start() {
- if (thread == null) {
- running = true;
- thread = new Thread(this, name);
- thread.start();
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
- */
- public synchronized final void shutdown() throws InterruptedException {
- if (thread != null) {
- dispatch(new RunnableAdapter(new Runnable() {
-
- public void run() {
- running = false;
- }
-
- }), MAX_USER_PRIORITY + 1);
- thread.interrupt();
- thread.join();
- thread = null;
- }
- }
-
- public void run() {
-
- // Inform the dispatcher that we have started:
- loadBalancer.onDispatcherStarted(this);
- dispatcher.set(this);
- PriorityDispatchContext pdc;
- try {
- while (running) {
- pdc = priorityQueue.poll();
- // If no local work available wait for foreign work:
- if (pdc == null) {
- foreignPermits.acquire();
- } else {
- pdc.lbContext.startingDispatch();
-
- while (!pdc.dispatch()) {
- // If there is a higher priority dispatchable stop
- // processing this one:
- if (pdc.listPrio < priorityQueue.getHighestPriority()) {
- // May have gotten relinked by the caller:
- if (!pdc.isLinked()) {
- priorityQueue.add(pdc, pdc.listPrio);
- }
- break;
- }
- }
-
- pdc.lbContext.finishedDispatch();
-
- }
-
- // Execute delayed events:
- timerHeap.executeReadyEvents();
-
- // Check for foreign dispatch requests:
- if (foreignAvailable.get()) {
- synchronized (foreignQueue) {
- // Drain the foreign queue:
- while (true) {
- ForeignEvent fe = foreignQueue.getHead();
- // TODO should probably swap foreign queue here:
- if (fe == null) {
- foreignAvailable.set(false);
- foreignPermits.drainPermits();
- break;
- }
-
- fe.unlink();
- fe.execute();
- }
- }
- }
- }
- } catch (InterruptedException e) {
- return;
- } catch (Throwable thrown) {
- thrown.printStackTrace();
- } finally {
- loadBalancer.onDispatcherStopped(this);
- }
- }
-
- class ThreadSafeDispatchContext implements LoadBalancedDispatchContext {
- final PriorityDispatchContext delegate;
-
- ThreadSafeDispatchContext(PriorityDispatchContext context) {
- this.delegate = context;
- delegate.setLoadBalancedDispatchContext(this);
- }
-
- public void finishedDispatch() {
- // NOOP
-
- }
-
- public void startingDispatch() {
- // Noop
-
- }
-
- public void close() {
- // Noop this is always transient:
- }
-
- public void processForeignUpdates() {
- requestDispatch();
- }
-
- public Dispatchable getDispatchable() {
- return delegate.getDispatchable();
- }
-
- public void requestDispatch() {
- if (dispatcher.get() == PriorityDispatcher.this) {
- delegate.requestDispatch();
- } else {
- delegate.onForeignThreadUpdate();
- }
- }
-
- public void updatePriority(int priority) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public String getName() {
- return delegate.name;
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
- * .dispatch.Dispatcher.Dispatchable)
- */
- final void dispatch(Dispatchable dispatchable, int priority) {
- ThreadSafeDispatchContext context = new ThreadSafeDispatchContext(new PriorityDispatchContext(dispatchable, false, name));
- context.delegate.updatePriority(priority);
- context.requestDispatch();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
- */
- public Executor createPriorityExecutor(final int priority) {
-
- return new Executor() {
-
- public void execute(final Runnable runnable) {
- dispatch(new RunnableAdapter(runnable), priority);
- }
- };
- }
-
- public void execute(final Runnable runnable) {
- dispatch(new RunnableAdapter(runnable), 0);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
- * long, java.util.concurrent.TimeUnit)
- */
- public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
- if (dispatcher.get() == this) {
- timerHeap.add(runnable, delay, timeUnit);
- } else {
- new ForeignEvent() {
- public void execute() {
- timerHeap.add(runnable, delay, timeUnit);
- }
- }.addToList();
- }
- }
-
- public String toString() {
- return name;
- }
-
- private class TimerHeap {
-
- final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
-
- private void add(Runnable runnable, long delay, TimeUnit timeUnit) {
-
- long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS);
- long eTime = System.nanoTime() + nanoDelay;
- LinkedList<Runnable> list = new LinkedList<Runnable>();
- list.add(runnable);
-
- LinkedList<Runnable> old = timers.put(eTime, list);
- if (old != null) {
- list.addAll(old);
- }
- }
-
- private void executeReadyEvents() {
- LinkedList<Runnable> ready = null;
- if (timers.isEmpty()) {
- return;
- } else {
- long now = System.nanoTime();
- long first = timers.firstKey();
- if (first > now) {
- return;
- }
- ready = new LinkedList<Runnable>();
-
- while (first < now) {
- ready.addAll(timers.remove(first));
- if (timers.isEmpty()) {
- break;
- }
- first = timers.firstKey();
-
- }
- }
-
- for (Runnable runnable : ready) {
- try {
- runnable.run();
- } catch (Throwable thrown) {
- thrown.printStackTrace();
- }
- }
- }
- }
+ private Thread thread;
+ private boolean running = false;
+ private boolean threaded = false;
+ private final int MAX_USER_PRIORITY;
+
+ static final ThreadLocal<PriorityDispatcher> dispatcher = new ThreadLocal<PriorityDispatcher>();
+
+ private final PooledDispatcher pooledDispatcher;
+
+ // The local dispatch queue:
+ private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+
+ // Dispatch queue for requests from other threads:
+ private final LinkedNodeList<ForeignEvent>[] foreignQueue;
+ private static final int[] TOGGLE = new int[] { 1, 0 };
+ private int foreignToggle = 0;
+
+ // Timed Execution List
+ private final TimerHeap timerHeap = new TimerHeap();
+
+ private final String name;
+ private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
+ private final Semaphore foreignPermits = new Semaphore(0);
+
+ private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
+ public Integer map(PriorityDispatchContext element) {
+ return element.listPrio;
+ }
+ };
+
+ public PriorityDispatcher(String name, int priorities, PooledDispatcher pooledDispactcher) {
+ this.name = name;
+ MAX_USER_PRIORITY = priorities;
+ priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
+ foreignQueue = createForeignEventQueue();
+ for (int i = 0; i < 2; i++) {
+ foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
+ }
+ this.pooledDispatcher = pooledDispactcher;
+ }
+
+ @SuppressWarnings("unchecked")
+ private LinkedNodeList<ForeignEvent>[] createForeignEventQueue() {
+ return new LinkedNodeList[2];
+ }
+
+ private abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
+ public abstract void execute();
+
+ final void addToList() {
+ synchronized (foreignQueue) {
+ if (!this.isLinked()) {
+ foreignQueue[foreignToggle].addLast(this);
+ if (!foreignAvailable.getAndSet(true)) {
+ foreignPermits.release();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean isThreaded() {
+ return threaded;
+ }
+
+ public void setThreaded(boolean threaded) {
+ this.threaded = threaded;
+ }
+
+ private class UpdateEvent extends ForeignEvent {
+ private final PriorityDispatchContext pdc;
+
+ UpdateEvent(PriorityDispatchContext pdc) {
+ this.pdc = pdc;
+ }
+
+ // Can only be called by the owner of this dispatch context:
+ public void execute() {
+ pdc.poolContext.processForeignUpdates();
+ }
+ }
+
+ class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PoolableDispatchContext {
+ // The dispatchable target:
+ final Dispatchable dispatchable;
+ PooledDispatchContext poolContext;
+ // The name of this context:
+ final String name;
+ // list prio can only be updated in the thread of of this dispatcher:
+ int listPrio;
+ // The update events are used to update fields in the dispatch context
+ // from foreign threads:
+ final UpdateEvent updateEvent[] = new UpdateEvent[] { new UpdateEvent(this), new UpdateEvent(this) };
+
+ private PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
+ super();
+ this.dispatchable = dispatchable;
+ this.name = name;
+ }
+
+ // This can only be called on this thread
+ public final void requestDispatch() {
+ if (!isLinked()) {
+ priorityQueue.add(this, listPrio);
+ }
+ return;
+ }
+
+ // This can only be called on this thread
+ public final void updatePriority(int priority) {
+ if (priority != listPrio) {
+
+ listPrio = priority;
+ // If there is a priority change relink the context
+ // at the new priority:
+ if (isLinked()) {
+ unlink();
+ priorityQueue.add(this, listPrio);
+ }
+ }
+ return;
+
+ }
+
+ public void onForeignThreadUpdate() {
+ synchronized (foreignQueue) {
+ updateEvent[foreignToggle].addToList();
+ }
+ }
+
+ // This can only be called on this thread
+ public void close() {
+ if (isLinked()) {
+ unlink();
+ }
+ synchronized (foreignQueue) {
+ if (updateEvent[foreignToggle].isLinked()) {
+ updateEvent[foreignToggle].unlink();
+ }
+ }
+ }
+
+ /**
+ * This can only be called by the owning dispatch thread:
+ *
+ * @return False if the dispatchable has more work to do.
+ */
+ public final boolean dispatch() {
+ return dispatchable.dispatch();
+ }
+
+ public String toString() {
+ return name;
+ }
+
+ public Dispatchable getDispatchable() {
+ return dispatchable;
+ }
+
+ public void setPooledDispatchContext(PooledDispatchContext context) {
+ this.poolContext = context;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public PoolableDispatcher getDispatcher() {
+ return PriorityDispatcher.this;
+ }
+ }
+
+ public DispatchContext register(Dispatchable dispatchable, String name) {
+ return createPoolableDispatchContext(dispatchable, name);
+ }
+
+ public PoolableDispatchContext createPoolableDispatchContext(Dispatchable dispatchable, String name) {
+ return new PriorityDispatchContext(dispatchable, true, name);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#start()
+ */
+ public synchronized final void start() {
+ if (thread == null) {
+ running = true;
+ thread = new Thread(this, name);
+ thread.start();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+ */
+ public synchronized final void shutdown() throws InterruptedException {
+ if (thread != null) {
+ dispatch(new RunnableAdapter(new Runnable() {
+
+ public void run() {
+ running = false;
+ }
+
+ }), MAX_USER_PRIORITY + 1);
+ thread.interrupt();
+ thread.join();
+ thread = null;
+ }
+ }
+
+ public void run() {
+
+ // Inform the dispatcher that we have started:
+ pooledDispatcher.onDispatcherStarted(this);
+ dispatcher.set(this);
+ PriorityDispatchContext pdc;
+ try {
+ while (running) {
+ pdc = priorityQueue.poll();
+ // If no local work available wait for foreign work:
+ if (pdc == null) {
+ foreignPermits.acquire();
+ } else {
+ pdc.poolContext.startingDispatch();
+
+ while (!pdc.dispatch()) {
+ // If there is a higher priority dispatchable stop
+ // processing this one:
+ if (pdc.listPrio < priorityQueue.getHighestPriority()) {
+ // May have gotten relinked by the caller:
+ if (!pdc.isLinked()) {
+ priorityQueue.add(pdc, pdc.listPrio);
+ }
+ break;
+ }
+ }
+
+ pdc.poolContext.finishedDispatch();
+
+ }
+
+ // Execute delayed events:
+ timerHeap.executeReadyEvents();
+
+ // Check for foreign dispatch requests:
+ if (foreignAvailable.get()) {
+ LinkedNodeList<ForeignEvent> foreign;
+ synchronized (foreignQueue) {
+ // Swap foreign queues and drain permits;
+ foreign = foreignQueue[foreignToggle];
+ foreignToggle = TOGGLE[foreignToggle];
+ foreignAvailable.set(false);
+ foreignPermits.drainPermits();
+ }
+ while (true) {
+ ForeignEvent fe = foreign.getHead();
+ if (fe == null) {
+ break;
+ }
+
+ fe.unlink();
+ fe.execute();
+ }
+
+ }
+ }
+ } catch (InterruptedException e) {
+ return;
+ } catch (Throwable thrown) {
+ thrown.printStackTrace();
+ } finally {
+ pooledDispatcher.onDispatcherStopped(this);
+ }
+ }
+
+ class ThreadSafeDispatchContext implements PooledDispatchContext {
+ final PriorityDispatchContext delegate;
+
+ ThreadSafeDispatchContext(PriorityDispatchContext context) {
+ this.delegate = context;
+ delegate.setPooledDispatchContext(this);
+ }
+
+ public void finishedDispatch() {
+ // NOOP
+
+ }
+
+ public void startingDispatch() {
+ // Noop
+
+ }
+
+ public void close() {
+ // Noop this is always transient:
+ }
+
+ public void processForeignUpdates() {
+ requestDispatch();
+ }
+
+ public Dispatchable getDispatchable() {
+ return delegate.getDispatchable();
+ }
+
+ public void requestDispatch() {
+ if (dispatcher.get() == PriorityDispatcher.this) {
+ delegate.requestDispatch();
+ } else {
+ delegate.onForeignThreadUpdate();
+ }
+ }
+
+ public void updatePriority(int priority) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ public String getName() {
+ return delegate.name;
+ }
+
+ public void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
+ * .dispatch.Dispatcher.Dispatchable)
+ */
+ final void dispatch(Dispatchable dispatchable, int priority) {
+ ThreadSafeDispatchContext context = new ThreadSafeDispatchContext(new PriorityDispatchContext(dispatchable, false, name));
+ context.delegate.updatePriority(priority);
+ context.requestDispatch();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+ */
+ public Executor createPriorityExecutor(final int priority) {
+
+ return new Executor() {
+
+ public void execute(final Runnable runnable) {
+ dispatch(new RunnableAdapter(runnable), priority);
+ }
+ };
+ }
+
+ public void execute(final Runnable runnable) {
+ dispatch(new RunnableAdapter(runnable), 0);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+ * long, java.util.concurrent.TimeUnit)
+ */
+ public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
+ if (dispatcher.get() == this) {
+ timerHeap.add(runnable, delay, timeUnit);
+ } else {
+ new ForeignEvent() {
+ public void execute() {
+ timerHeap.add(runnable, delay, timeUnit);
+ }
+ }.addToList();
+ }
+ }
+
+ public String toString() {
+ return name;
+ }
+
+ private class TimerHeap {
+
+ final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
+
+ private void add(Runnable runnable, long delay, TimeUnit timeUnit) {
+
+ long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS);
+ long eTime = System.nanoTime() + nanoDelay;
+ LinkedList<Runnable> list = new LinkedList<Runnable>();
+ list.add(runnable);
+
+ LinkedList<Runnable> old = timers.put(eTime, list);
+ if (old != null) {
+ list.addAll(old);
+ }
+ }
+
+ private void executeReadyEvents() {
+ LinkedList<Runnable> ready = null;
+ if (timers.isEmpty()) {
+ return;
+ } else {
+ long now = System.nanoTime();
+ long first = timers.firstKey();
+ if (first > now) {
+ return;
+ }
+ ready = new LinkedList<Runnable>();
+
+ while (first < now) {
+ ready.addAll(timers.remove(first));
+ if (timers.isEmpty()) {
+ break;
+ }
+ first = timers.firstKey();
+
+ }
+ }
+
+ for (Runnable runnable : ready) {
+ try {
+ runnable.run();
+ } catch (Throwable thrown) {
+ thrown.printStackTrace();
+ }
+ }
+ }
+ }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java?rev=743900&r1=743899&r2=743900&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java Thu Feb 12 22:07:16 2009
@@ -21,107 +21,287 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-public class PriorityPooledDispatcher implements IDispatcher {
- private final String name;
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
- final AtomicBoolean started = new AtomicBoolean();
- final AtomicBoolean shutdown = new AtomicBoolean();
+public class PriorityPooledDispatcher implements IDispatcher, PooledDispatcher {
+ private final String name;
- ArrayList<PriorityDispatcher> dispatchers = new ArrayList<PriorityDispatcher>();
- private int roundRobinCounter = 0;
- private final int size;
-
- private final SimpleLoadBalancer executionGraphLoadBalancer;
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
- */
- public Executor createPriorityExecutor(final int priority) {
- return new Executor() {
- public void execute(final Runnable runnable) {
- chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0);
- }
- };
- }
-
- public PriorityPooledDispatcher(String name, int size, int priorities) {
- this.name = name;
- this.size = size;
- executionGraphLoadBalancer = new SimpleLoadBalancer(name);
- // Create all the workers.
- for (int i = 0; i < size; i++) {
- PriorityDispatcher dispatcher = new PriorityDispatcher(name + "-" + (i + 1), priorities, executionGraphLoadBalancer);
- dispatchers.add(dispatcher);
- }
- }
-
- public DispatchContext register(Dispatchable dispatchable, String name) {
- return chooseDispatcher().register(dispatchable, name);
- }
-
- /**
- * @see org.apache.activemq.dispatch.IDispatcher#start()
- */
- public synchronized final void start() {
- if (started.compareAndSet(false, true)) {
- // Create all the workers.
- for (int i = 0; i < size; i++) {
- dispatchers.get(i).start();
- }
- }
- try {
- executionGraphLoadBalancer.start();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
- */
- public synchronized final void shutdown() throws InterruptedException {
- shutdown.set(true);
- for (PriorityDispatcher dispatcher : dispatchers) {
- dispatcher.shutdown();
- }
- executionGraphLoadBalancer.shutdown();
- }
-
- private PriorityDispatcher chooseDispatcher() {
- PriorityDispatcher d = PriorityDispatcher.dispatcher.get();
- if (d == null) {
- synchronized (dispatchers) {
- if (++roundRobinCounter >= size) {
- roundRobinCounter = 0;
- }
- return dispatchers.get(roundRobinCounter);
- }
- } else {
- return d;
- }
- }
-
- public void execute(final Runnable runnable) {
- chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0);
- }
-
- // TODO Implement
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
- * long, java.util.concurrent.TimeUnit)
- */
- public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
- chooseDispatcher().schedule(runnable, delay, timeUnit);
- }
-
- public String toString() {
- return name;
- }
+ private static final ThreadLocal<PooledDispatchContext> dispatchContext = new ThreadLocal<PooledDispatchContext>();
+ private static final ThreadLocal<PoolableDispatcher> dispatcher = new ThreadLocal<PoolableDispatcher>();
+
+ private final ArrayList<PriorityDispatcher> dispatchers = new ArrayList<PriorityDispatcher>();
+
+ final AtomicBoolean started = new AtomicBoolean();
+ final AtomicBoolean shutdown = new AtomicBoolean();
+
+ private int roundRobinCounter = 0;
+ private final int size;
+ private final boolean DEBUG = false;
+
+ private final ExecutionLoadBalancer loadBalancer;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+ */
+ public Executor createPriorityExecutor(final int priority) {
+ return new Executor() {
+ public void execute(final Runnable runnable) {
+ chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+ }
+ };
+ }
+
+ public PriorityPooledDispatcher(String name, int size, int priorities) {
+ this.name = name;
+ this.size = size;
+ loadBalancer = new SimpleLoadBalancer();
+ // Create all the workers.
+ for (int i = 0; i < size; i++) {
+ PriorityDispatcher dispatcher = new PriorityDispatcher(name + "-" + (i + 1), priorities, this);
+ dispatchers.add(dispatcher);
+ }
+ }
+
+ public DispatchContext register(Dispatchable dispatchable, String name) {
+ return createPooledDispatchContext(chooseDispatcher().createPoolableDispatchContext(dispatchable, name));
+ }
+
+ /**
+ * @see org.apache.activemq.dispatch.IDispatcher#start()
+ */
+ public synchronized final void start() {
+ loadBalancer.start();
+ if (started.compareAndSet(false, true)) {
+ // Create all the workers.
+ for (int i = 0; i < size; i++) {
+ dispatchers.get(i).start();
+ }
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+ */
+ public synchronized final void shutdown() throws InterruptedException {
+ shutdown.set(true);
+ for (PriorityDispatcher dispatcher : dispatchers) {
+ dispatcher.shutdown();
+ }
+ loadBalancer.stop();
+ }
+
+ private PriorityDispatcher chooseDispatcher() {
+ PriorityDispatcher d = PriorityDispatcher.dispatcher.get();
+ if (d == null) {
+ synchronized (dispatchers) {
+ if (++roundRobinCounter >= size) {
+ roundRobinCounter = 0;
+ }
+ return dispatchers.get(roundRobinCounter);
+ }
+ } else {
+ return d;
+ }
+ }
+
+ public void execute(final Runnable runnable) {
+ chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+ * long, java.util.concurrent.TimeUnit)
+ */
+ public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
+ chooseDispatcher().schedule(runnable, delay, timeUnit);
+ }
+
+ public PooledDispatchContext createPooledDispatchContext(PoolableDispatchContext context) {
+ return new PriorityPooledDispatchContext(context);
+ }
+
+ /**
+ * A Dispatcher must call this to indicate that is has started it's dispatch
+ * loop.
+ */
+ public void onDispatcherStarted(PoolableDispatcher d) {
+ dispatcher.set(d);
+ loadBalancer.addDispatcher(d);
+ }
+
+ /**
+ * A Dispatcher must call this when exiting it's dispatch loop
+ */
+ public void onDispatcherStopped(PoolableDispatcher d) {
+ loadBalancer.removeDispatcher(d);
+ }
+
+ /**
+ * ExecutionGraphNode tracks dispatch information for a
+ * MappableDispatchContext.
+ *
+ */
+ public class PriorityPooledDispatchContext implements PooledDispatchContext {
+ private final ExecutionTracker tracker;
+
+ private PoolableDispatchContext context;
+ private PoolableDispatcher currentOwner;
+ private int priority;
+ private boolean dispatchRequested = false;
+ private PoolableDispatcher updateDispatcher = null;
+ private boolean closed = false;
+
+ PriorityPooledDispatchContext(PoolableDispatchContext context) {
+ this.context = context;
+ this.context.setPooledDispatchContext(this);
+ this.currentOwner = context.getDispatcher();
+ this.tracker = loadBalancer.createExecutionTracker(this);
+
+ }
+
+ public final void startingDispatch() {
+ dispatchContext.set(this);
+ }
+
+ public final void finishedDispatch() {
+ dispatchContext.set(null);
+ }
+
+ public final void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
+ synchronized (this) {
+
+ // If we're already set to this dispatcher
+ if (newDispatcher == currentOwner) {
+ if (updateDispatcher == null || updateDispatcher == newDispatcher) {
+ return;
+ }
+ }
+
+ updateDispatcher = newDispatcher;
+ if (DEBUG)
+ System.out.println(getName() + " updating to " + context.getDispatcher());
+ }
+ context.onForeignThreadUpdate();
+ }
+
+ public void requestDispatch() {
+
+ PoolableDispatcher callingDispatcher = dispatcher.get();
+
+ tracker.onDispatchRequest(callingDispatcher, dispatchContext.get());
+
+ // Otherwise this is coming off another thread, so we need to
+ // synchronize
+ // to protect against ownership changes:
+ synchronized (this) {
+ // If the owner of this context is the calling thread, then
+ // delegate to the dispatcher.
+ if (currentOwner == callingDispatcher) {
+
+ context.requestDispatch();
+ return;
+ }
+
+ dispatchRequested = true;
+ }
+ context.onForeignThreadUpdate();
+ }
+
+ public void updatePriority(int priority) {
+ if (this.priority == priority) {
+ return;
+ }
+ // Otherwise this is coming off another thread, so we need to
+ // synchronize to protect against ownership changes:
+ synchronized (this) {
+ this.priority = priority;
+
+ IDispatcher callingDispatcher = dispatcher.get();
+
+ // If the owner of this context is the calling thread, then
+ // delegate to the dispatcher.
+ if (currentOwner == callingDispatcher) {
+
+ context.updatePriority(priority);
+ return;
+ }
+ }
+ context.onForeignThreadUpdate();
+ }
+
+ public void processForeignUpdates() {
+ boolean ownerChange = false;
+ synchronized (this) {
+
+ if (closed) {
+ context.close();
+ return;
+ }
+
+ if (updateDispatcher != null) {
+ // Close the old context:
+ if (DEBUG) {
+ System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+ }
+ context.close();
+
+ currentOwner = updateDispatcher;
+ updateDispatcher = null;
+ context = currentOwner.createPoolableDispatchContext(context.getDispatchable(), context.getName());
+ dispatchRequested = true;
+ context.updatePriority(priority);
+ context.setPooledDispatchContext(this);
+ ownerChange = true;
+ } else {
+ context.updatePriority(priority);
+
+ if (dispatchRequested) {
+ context.requestDispatch();
+ dispatchRequested = false;
+ }
+ }
+ }
+
+ if (ownerChange) {
+ context.onForeignThreadUpdate();
+ }
+ }
+
+ public void close() {
+ tracker.close();
+ synchronized (this) {
+ IDispatcher callingDispatcher = dispatcher.get();
+
+ // If the owner of this context is the calling thread, then
+ // delegate to the dispatcher.
+ if (currentOwner == callingDispatcher) {
+ context.close();
+ return;
+ }
+ }
+ context.onForeignThreadUpdate();
+ }
+
+ public final String toString() {
+ return context.toString();
+ }
+
+ public Dispatchable getDispatchable() {
+ return context.getDispatchable();
+ }
+
+ public String getName() {
+ return context.getName();
+ }
+ }
+
+ public String toString() {
+ return name;
+ }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=743900&r1=743899&r2=743900&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Thu Feb 12 22:07:16 2009
@@ -16,277 +16,120 @@
*/
package org.apache.activemq.dispatch;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
+import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
-/**
- *
- */
public class SimpleLoadBalancer implements ExecutionLoadBalancer {
- private static final ThreadLocal<ExecutionGraphNode> dispatchContext = new ThreadLocal<ExecutionGraphNode>();
- private static final ThreadLocal<PoolableDispatcher> dispatcher = new ThreadLocal<PoolableDispatcher>();
-
- private final ArrayList<PoolableDispatcher> dispatchers = new ArrayList<PoolableDispatcher>();
+ private final boolean DEBUG = false;
- private final String name;
- private final boolean DEBUG = false;
+ SimpleLoadBalancer() {
+ }
- SimpleLoadBalancer(String name) {
- this.name = name;
- }
-
- public void start() throws InterruptedException {
- }
-
- public void shutdown() {
- }
-
- public LoadBalancedDispatchContext createLoadBalancedDispatchContext(PoolableDispatchContext context) {
- ExecutionGraphNode egn = new ExecutionGraphNode(context);
- return egn;
- }
-
- public synchronized final void addDispatcher(PoolableDispatcher dispatcher) {
- dispatchers.add(dispatcher);
- }
-
- /**
- * A Dispatcher must call this to indicate that is has started it's dispatch
- * loop.
- */
- public void onDispatcherStarted(PoolableDispatcher d) {
- dispatcher.set(d);
- }
-
- /**
- * A Dispatcher must call this when exiting it's dispatch loop
- */
- public void onDispatcherStopped(PoolableDispatcher d) {
-
- }
-
- private class ExecutionGraphEdge {
- final ExecutionGraphNode target;
- final ExecutionGraphNode source;
- int count;
-
- ExecutionGraphEdge(ExecutionGraphNode source, ExecutionGraphNode target) {
- this.target = target;
- this.source = source;
- }
-
- public String toString() {
- return "Connection from: " + source + " to " + target;
- }
- }
-
- /**
- * ExecutionGraphNode tracks dispatch information for a
- * MappableDispatchContext.
- *
- */
- public class ExecutionGraphNode implements LoadBalancedDispatchContext {
- protected PoolableDispatchContext context;
- private ExecutionGraphNode singleSource;
- private final HashMap<ExecutionGraphNode, ExecutionGraphEdge> sources = new HashMap<ExecutionGraphNode, ExecutionGraphEdge>();
- protected PoolableDispatcher currentOwner;
- private final AtomicInteger work = new AtomicInteger(0);
-
- private int priority;
- private boolean dispatchRequested = false;
- private PoolableDispatcher updateDispatcher = null;
-
- ExecutionGraphNode(PoolableDispatchContext context) {
- this.context = context;
- this.context.setLoadBalancedDispatchContext(this);
- this.currentOwner = context.getDispatcher();
- if (DEBUG) {
- System.out.println(getName() + " Assigned to " + context.getDispatcher());
- }
- }
-
- public final void startingDispatch() {
- dispatchContext.set(this);
- }
-
- public final void finishedDispatch() {
- dispatchContext.set(null);
- }
-
- /**
- * This method is called to track which dispatch contexts are requesting
- * dispatch for the target context represented by this node.
- *
- * This method is not threadsafe, the caller must ensure serialized
- * access to this method.
- *
- * @param callingDispatcher
- * The calling dispatcher.
- * @return True if this method resulted in the dispatch request being
- * assigned to another dispatcher.
- */
- public final boolean onDispatchRequest(final PoolableDispatcher callingDispatcher) {
-
- /*
- * if (callingDispatcher == currentOwner) { return false; }
- */
-
- ExecutionGraphNode callingContext = dispatchContext.get();
- if (callingContext != null) {
- // Make sure we are being called by another node:
- if (callingContext == null || callingContext == context) {
- return false;
- }
-
- // Optimize for single source case:
- if (singleSource != callingContext) {
- if (singleSource == null && sources.isEmpty()) {
- singleSource = callingContext;
- ExecutionGraphEdge edge = new ExecutionGraphEdge(callingContext, this);
- sources.put(callingContext, edge);
-
- // If this context only has a single source
- // immediately assign it to the
- // dispatcher of the source:
- boolean reassigned = false;
- synchronized (this) {
- if (callingDispatcher != currentOwner && updateDispatcher == null) {
- updateDispatcher = callingDispatcher;
- reassigned = true;
- if (DEBUG)
- System.out.println("Assigning: " + this + " to " + callingContext + "'s dispatcher: " + callingDispatcher);
-
- }
- }
- if (reassigned) {
- assignToNewDispatcher(callingDispatcher);
- }
- return true;
- } else {
-
- ExecutionGraphEdge stats = sources.get(callingContext);
- if (stats == null) {
- stats = new ExecutionGraphEdge(callingContext, this);
- sources.put(callingContext, stats);
- }
-
- if (singleSource != null) {
- singleSource = null;
- }
- }
- }
- work.incrementAndGet();
- }
- return false;
- }
-
- final void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
- synchronized (this) {
- if (newDispatcher != currentOwner) {
- updateDispatcher = newDispatcher;
- }
- }
- context.onForeignThreadUpdate();
- }
-
- public void requestDispatch() {
-
- PoolableDispatcher callingDispatcher = dispatcher.get();
-
- if (onDispatchRequest(callingDispatcher)) {
- return;
- }
-
- // Otherwise this is coming off another thread, so we need to
- // synchronize
- // to protect against ownership changes:
- synchronized (this) {
- // If the owner of this context is the calling thread, then
- // delegate to the dispatcher.
- if (currentOwner == callingDispatcher) {
-
- context.requestDispatch();
- return;
- }
-
- dispatchRequested = true;
- }
- context.onForeignThreadUpdate();
- }
-
- public void updatePriority(int priority) {
- if (this.priority == priority) {
- return;
- }
- // Otherwise this is coming off another thread, so we need to
- // synchronize
- // to protect against ownership changes:
- synchronized (this) {
- this.priority = priority;
-
- IDispatcher callingDispatcher = dispatcher.get();
-
- // If the owner of this context is the calling thread, then
- // delegate to the dispatcher.
- if (currentOwner == callingDispatcher) {
-
- context.updatePriority(priority);
- return;
- }
- }
- context.onForeignThreadUpdate();
- }
-
- public void processForeignUpdates() {
- boolean ownerChange = false;
- synchronized (this) {
- if (updateDispatcher != null) {
- // Close the old context:
- if (DEBUG) {
- System.out.println("Assigning " + getName() + " to " + updateDispatcher);
- }
- context.close();
-
- currentOwner = updateDispatcher;
- updateDispatcher = null;
- context = currentOwner.createPoolablDispatchContext(context.getDispatchable(), context.getName());
- dispatchRequested = true;
- context.updatePriority(priority);
- context.setLoadBalancedDispatchContext(this);
- ownerChange = true;
- } else {
- context.updatePriority(priority);
-
- if (dispatchRequested) {
- context.requestDispatch();
- dispatchRequested = false;
- }
- }
- }
-
- if (ownerChange) {
- context.onForeignThreadUpdate();
- }
- }
-
- public void close() {
- sources.clear();
- }
-
- public final String toString() {
- return context.toString();
- }
-
- public Dispatchable getDispatchable() {
- return context.getDispatchable();
- }
-
- public String getName() {
- return context.getName();
- }
- }
+ private class ExecutionStats {
+ final PooledDispatchContext target;
+ final PooledDispatchContext source;
+ int count;
+
+ ExecutionStats(PooledDispatchContext source, PooledDispatchContext target) {
+ this.target = target;
+ this.source = source;
+ }
+
+ public String toString() {
+ return "Connection from: " + source + " to " + target;
+ }
+ }
+
+ public void addDispatcher(PoolableDispatcher dispatcher) {
+
+ }
+
+ public void removeDispatcher(PoolableDispatcher dispatcher) {
+
+ }
+
+ public void start() {
+ }
+
+ public void stop() {
+ }
+
+ public ExecutionTracker createExecutionTracker(PooledDispatchContext context) {
+ return new SimpleExecutionTracker(context);
+ }
+
+ private class SimpleExecutionTracker implements ExecutionTracker {
+ private final HashMap<PooledDispatchContext, ExecutionStats> sources = new HashMap<PooledDispatchContext, ExecutionStats>();
+ private final PooledDispatchContext context;
+ private final AtomicInteger work = new AtomicInteger(0);
+
+ private PooledDispatchContext singleSource;
+ private PoolableDispatcher currentOwner;
+
+ SimpleExecutionTracker(PooledDispatchContext context) {
+ this.context = context;
+ }
+
+ /**
+ * This method is called to track which dispatch contexts are requesting
+ * dispatch for the target context represented by this node.
+ *
+ * This method is not threadsafe, the caller must ensure serialized
+ * access to this method.
+ *
+ * @param callngDispatcher
+ * The calling dispatcher.
+ * @param context
+ * the originating dispatch context
+ * @return True if this method resulted in the dispatch request being
+ * assigned to another dispatcher.
+ */
+ public void onDispatchRequest(PoolableDispatcher callingDispatcher, PooledDispatchContext callingContext) {
+
+ if (callingContext != null) {
+ // Make sure we are being called by another node:
+ if (callingContext == null || callingContext == context) {
+ return;
+ }
+
+ // Optimize for single source case:
+ if (singleSource != callingContext) {
+ if (singleSource == null && sources.isEmpty()) {
+ singleSource = callingContext;
+ ExecutionStats stats = new ExecutionStats(callingContext, context);
+ sources.put(callingContext, stats);
+
+ // If this context only has a single source
+ // assign it to that source to minimize contention:
+ if (callingDispatcher != currentOwner) {
+ currentOwner = callingDispatcher;
+ if (DEBUG)
+ System.out.println("Assigning: " + context + " to " + callingContext + "'s dispatcher: " + callingDispatcher);
+ context.assignToNewDispatcher(callingDispatcher);
+ }
+
+ } else {
+
+ ExecutionStats stats = sources.get(callingContext);
+ if (stats == null) {
+ stats = new ExecutionStats(callingContext, context);
+ sources.put(callingContext, stats);
+ }
+
+ if (singleSource != null) {
+ singleSource = null;
+ }
+ }
+ }
+ work.incrementAndGet();
+ }
+ }
+
+ public void close() {
+ }
+ }
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743900&r1=743899&r2=743900&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Feb 12 22:07:16 2009
@@ -47,7 +47,7 @@
boolean ptp = false;
// Set to use tcp IO
- boolean tcp = false;
+ boolean tcp = true;
// Can be set to BLOCKING, POLLING or ASYNC
public final static int DISPATCH_MODE = AbstractTestConnection.ASYNC;