You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/12 23:07:17 UTC

svn commit: r743900 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/dispatch/ test/java/org/apache/activemq/flow/

Author: chirino
Date: Thu Feb 12 22:07:16 2009
New Revision: 743900

URL: http://svn.apache.org/viewvc?rev=743900&view=rev
Log:
Applying colins patch https://issues.apache.org/activemq/browse/AMQ-2107


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java?rev=743900&r1=743899&r2=743900&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java Thu Feb 12 22:07:16 2009
@@ -16,82 +16,24 @@
  */
 package org.apache.activemq.dispatch;
 
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
+import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
 
 public interface ExecutionLoadBalancer {
 
-    /**
-     * A Load Balanced Dispatch context can be moved between different
-     * dispatchers.
-     */
-    public interface LoadBalancedDispatchContext extends DispatchContext {
-        /**
-         * A dispatcher must call this when it starts dispatch for this context
-         */
-        public void startingDispatch();
-
-        /**
-         * A dispatcher must call this when it has finished dispatching a
-         * context
-         */
-        public void finishedDispatch();
-
-        /**
-		 * 
-		 */
-        public void processForeignUpdates();
-    }
-
-    public interface PoolableDispatchContext extends DispatchContext {
-
-        public void setLoadBalancedDispatchContext(LoadBalancedDispatchContext context);
-
-        /**
-         * Indicates that another thread has made an update to the dispatch
-         * context.
-         * 
-         */
-        public void onForeignThreadUpdate();
-
-        public PoolableDispatcher getDispatcher();
-    }
-
-    public interface PoolableDispatcher extends IDispatcher {
-
-        /**
-         * Indicates that another thread has made an update to the dispatch
-         * context.
-         * 
-         */
-        public PoolableDispatchContext createPoolablDispatchContext(Dispatchable dispatchable, String name);
-    }
-
-    /**
-     * This wraps the dispatch context into one that is load balanced by the
-     * LoadBalancer
-     * 
-     * @param context
-     *            The context to wrap.
-     * @return
-     */
-    public LoadBalancedDispatchContext createLoadBalancedDispatchContext(PoolableDispatchContext context);
-
-    /**
-     * Adds a Dispatcher to the list of dispatchers managed by the load balancer
-     * 
-     * @param dispatcher
-     */
-    public void addDispatcher(PoolableDispatcher dispatcher);
-
-    /**
-     * A Dispatcher must call this from it's dispatcher thread to indicate that
-     * is has started it's dispatch has started.
-     */
-    public void onDispatcherStarted(PoolableDispatcher dispatcher);
-
-    /**
-     * A Dispatcher must call this from it's dispatcher thread when exiting it's
-     * dispatch loop
-     */
-    public void onDispatcherStopped(PoolableDispatcher dispatcher);
+	public interface ExecutionTracker {
+		public void onDispatchRequest(PoolableDispatcher caller, PooledDispatchContext context);
+
+		public void close();
+	}
+
+	public void addDispatcher(PoolableDispatcher dispatcher);
+
+	public void removeDispatcher(PoolableDispatcher dispatcher);
+
+	public ExecutionTracker createExecutionTracker(PooledDispatchContext context);
+
+	public void start();
+
+	public void stop();
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java?rev=743900&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java Thu Feb 12 22:07:16 2009
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+
+public interface PooledDispatcher {
+
+	/**
+	 * A {@link PooledDispatchContext}s can be moved between different
+	 * dispatchers.
+	 */
+	public interface PooledDispatchContext extends DispatchContext {
+		/**
+		 * Called to transfer a {@link PooledDispatchContext} to a new
+		 * Dispatcher.
+		 */
+		public void assignToNewDispatcher(PoolableDispatcher newDispatcher);
+
+		/**
+		 * A dispatcher must call this when it starts dispatch for this context
+		 */
+		public void startingDispatch();
+
+		/**
+		 * A dispatcher must call this when it has finished dispatching a
+		 * context
+		 */
+		public void finishedDispatch();
+
+		/**
+		 * Called by the dispatch thread to let the pooled context set any info
+		 * set by other threads.
+		 */
+		public void processForeignUpdates();
+	}
+
+	public interface PoolableDispatchContext extends DispatchContext {
+
+		public void setPooledDispatchContext(PooledDispatchContext context);
+
+		/**
+		 * Indicates that another thread has made an update to the dispatch
+		 * context.
+		 * 
+		 */
+		public void onForeignThreadUpdate();
+
+		public PoolableDispatcher getDispatcher();
+	}
+
+	/**
+	 * A PoolableDispatcher is one that can be owned by an
+	 * {@link PooledDispatcher}.
+	 */
+	public interface PoolableDispatcher extends IDispatcher {
+
+		/**
+		 * Indicates that another thread has made an update to the dispatch
+		 * context.
+		 * 
+		 */
+		public PoolableDispatchContext createPoolableDispatchContext(Dispatchable dispatchable, String name);
+	}
+
+	/**
+	 * This wraps the dispatch context into one that is load balanced by the
+	 * LoadBalancer
+	 * 
+	 * @param context
+	 *            The context to wrap.
+	 * @return
+	 */
+	public PooledDispatchContext createPooledDispatchContext(PoolableDispatchContext context);
+
+	/**
+	 * A Dispatcher must call this from it's dispatcher thread to indicate that
+	 * is has started it's dispatch has started.
+	 */
+	public void onDispatcherStarted(PoolableDispatcher dispatcher);
+
+	/**
+	 * A Dispatcher must call this from it's dispatcher thread when exiting it's
+	 * dispatch loop
+	 */
+	public void onDispatcherStopped(PoolableDispatcher dispatcher);
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=743900&r1=743899&r2=743900&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Thu Feb 12 22:07:16 2009
@@ -23,431 +23,442 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.LoadBalancedDispatchContext;
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.PoolableDispatchContext;
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.PoolableDispatcher;
+import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatchContext;
+import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
+import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
 import org.apache.activemq.queue.Mapper;
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
 public class PriorityDispatcher implements Runnable, PoolableDispatcher {
 
-    private Thread thread;
-    private boolean running = false;
-    private boolean threaded = false;
-    private final int MAX_USER_PRIORITY;
-
-    static final ThreadLocal<PriorityDispatcher> dispatcher = new ThreadLocal<PriorityDispatcher>();
-
-    private final ExecutionLoadBalancer loadBalancer;
-
-    // The local dispatch queue:
-    private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
-
-    // Dispatch queue for requests from other threads:
-    private final LinkedNodeList<ForeignEvent> foreignQueue = new LinkedNodeList<ForeignEvent>();
-
-    // Timed Execution List
-    private final TimerHeap timerHeap = new TimerHeap();
-
-    private final String name;
-    private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
-    private final Semaphore foreignPermits = new Semaphore(0);
-
-    private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
-        public Integer map(PriorityDispatchContext element) {
-            return element.listPrio;
-        }
-    };
-
-    public PriorityDispatcher(String name, int priorities, ExecutionLoadBalancer loadBalancer) {
-        this.name = name;
-        MAX_USER_PRIORITY = priorities;
-        priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
-        this.loadBalancer = loadBalancer;
-        loadBalancer.addDispatcher(this);
-    }
-
-    private abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
-        public abstract void execute();
-
-        final void addToList() {
-            synchronized (foreignQueue) {
-                if (!this.isLinked()) {
-                    foreignQueue.addLast(this);
-                    if (!foreignAvailable.getAndSet(true)) {
-                        foreignPermits.release();
-                    }
-                }
-            }
-        }
-    }
-
-    public boolean isThreaded() {
-        return threaded;
-    }
-
-    public void setThreaded(boolean threaded) {
-        this.threaded = threaded;
-    }
-
-    private class UpdateEvent extends ForeignEvent {
-        private final PriorityDispatchContext pdc;
-
-        UpdateEvent(PriorityDispatchContext pdc) {
-            this.pdc = pdc;
-        }
-
-        // Can only be called by the owner of this dispatch context:
-        public void execute() {
-            pdc.lbContext.processForeignUpdates();
-        }
-    }
-
-    class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PoolableDispatchContext {
-        // The dispatchable target:
-        final Dispatchable dispatchable;
-        LoadBalancedDispatchContext lbContext;
-        // The name of this context:
-        final String name;
-        // list prio can only be updated in the thread of of this dispatcher:
-        int listPrio;
-        // The update event is used to update fields in the dispatch context
-        // from foreign threads:
-        final UpdateEvent updateEvent = new UpdateEvent(this);
-
-        // Marked by the caller when this is closed.
-        boolean closed = false;
-
-        private PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
-            super();
-            this.dispatchable = dispatchable;
-            this.name = name;
-        }
-
-        // The load balancer will guarantee that this is on our thread:
-        public final void requestDispatch() {
-            if (!isLinked()) {
-                priorityQueue.add(this, listPrio);
-            }
-            return;
-        }
-
-        // The load balancer guarantees that this is called on our thread:
-        public final void updatePriority(int priority) {
-            if (priority != listPrio) {
-
-                listPrio = priority;
-                // If there is a priority change relink the context
-                // at the new priority:
-                if (isLinked()) {
-                    unlink();
-                    priorityQueue.add(this, listPrio);
-                }
-            }
-            return;
-
-        }
-
-        public void onForeignThreadUpdate() {
-            updateEvent.addToList();
-        }
-
-        // Will only be called on this thread:
-        public void close() {
-            if (isLinked()) {
-                unlink();
-            }
-            synchronized (foreignQueue) {
-                if (updateEvent.isLinked()) {
-                    updateEvent.unlink();
-                }
-            }
-
-            closed = true;
-        }
-
-        /**
-         * This can only be called by the owning dispatch thread:
-         * 
-         * @return False if the dispatchable has more work to do.
-         */
-        public final boolean dispatch() {
-            return dispatchable.dispatch();
-        }
-
-        public String toString() {
-            return name;
-        }
-
-        public Dispatchable getDispatchable() {
-            return dispatchable;
-        }
-
-        public void setLoadBalancedDispatchContext(LoadBalancedDispatchContext context) {
-            this.lbContext = context;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public PoolableDispatcher getDispatcher() {
-            return PriorityDispatcher.this;
-        }
-    }
-
-    public DispatchContext register(Dispatchable dispatchable, String name) {
-        return loadBalancer.createLoadBalancedDispatchContext(createPoolablDispatchContext(dispatchable, name));
-    }
-
-    public PoolableDispatchContext createPoolablDispatchContext(Dispatchable dispatchable, String name) {
-        return new PriorityDispatchContext(dispatchable, true, name);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.IDispatcher#start()
-     */
-    public synchronized final void start() {
-        if (thread == null) {
-            running = true;
-            thread = new Thread(this, name);
-            thread.start();
-        }
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
-     */
-    public synchronized final void shutdown() throws InterruptedException {
-        if (thread != null) {
-            dispatch(new RunnableAdapter(new Runnable() {
-
-                public void run() {
-                    running = false;
-                }
-
-            }), MAX_USER_PRIORITY + 1);
-            thread.interrupt();
-            thread.join();
-            thread = null;
-        }
-    }
-
-    public void run() {
-
-        // Inform the dispatcher that we have started:
-        loadBalancer.onDispatcherStarted(this);
-        dispatcher.set(this);
-        PriorityDispatchContext pdc;
-        try {
-            while (running) {
-                pdc = priorityQueue.poll();
-                // If no local work available wait for foreign work:
-                if (pdc == null) {
-                    foreignPermits.acquire();
-                } else {
-                    pdc.lbContext.startingDispatch();
-
-                    while (!pdc.dispatch()) {
-                        // If there is a higher priority dispatchable stop
-                        // processing this one:
-                        if (pdc.listPrio < priorityQueue.getHighestPriority()) {
-                            // May have gotten relinked by the caller:
-                            if (!pdc.isLinked()) {
-                                priorityQueue.add(pdc, pdc.listPrio);
-                            }
-                            break;
-                        }
-                    }
-
-                    pdc.lbContext.finishedDispatch();
-
-                }
-
-                // Execute delayed events:
-                timerHeap.executeReadyEvents();
-
-                // Check for foreign dispatch requests:
-                if (foreignAvailable.get()) {
-                    synchronized (foreignQueue) {
-                        // Drain the foreign queue:
-                        while (true) {
-                            ForeignEvent fe = foreignQueue.getHead();
-                            // TODO should probably swap foreign queue here:
-                            if (fe == null) {
-                                foreignAvailable.set(false);
-                                foreignPermits.drainPermits();
-                                break;
-                            }
-
-                            fe.unlink();
-                            fe.execute();
-                        }
-                    }
-                }
-            }
-        } catch (InterruptedException e) {
-            return;
-        } catch (Throwable thrown) {
-            thrown.printStackTrace();
-        } finally {
-            loadBalancer.onDispatcherStopped(this);
-        }
-    }
-
-    class ThreadSafeDispatchContext implements LoadBalancedDispatchContext {
-        final PriorityDispatchContext delegate;
-
-        ThreadSafeDispatchContext(PriorityDispatchContext context) {
-            this.delegate = context;
-            delegate.setLoadBalancedDispatchContext(this);
-        }
-
-        public void finishedDispatch() {
-            // NOOP
-
-        }
-
-        public void startingDispatch() {
-            // Noop
-
-        }
-
-        public void close() {
-            // Noop this is always transient:
-        }
-
-        public void processForeignUpdates() {
-            requestDispatch();
-        }
-
-        public Dispatchable getDispatchable() {
-            return delegate.getDispatchable();
-        }
-
-        public void requestDispatch() {
-            if (dispatcher.get() == PriorityDispatcher.this) {
-                delegate.requestDispatch();
-            } else {
-                delegate.onForeignThreadUpdate();
-            }
-        }
-
-        public void updatePriority(int priority) {
-            throw new UnsupportedOperationException("Not implemented");
-        }
-
-        public String getName() {
-            return delegate.name;
-        }
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
-     * .dispatch.Dispatcher.Dispatchable)
-     */
-    final void dispatch(Dispatchable dispatchable, int priority) {
-        ThreadSafeDispatchContext context = new ThreadSafeDispatchContext(new PriorityDispatchContext(dispatchable, false, name));
-        context.delegate.updatePriority(priority);
-        context.requestDispatch();
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
-     */
-    public Executor createPriorityExecutor(final int priority) {
-
-        return new Executor() {
-
-            public void execute(final Runnable runnable) {
-                dispatch(new RunnableAdapter(runnable), priority);
-            }
-        };
-    }
-
-    public void execute(final Runnable runnable) {
-        dispatch(new RunnableAdapter(runnable), 0);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
-     * long, java.util.concurrent.TimeUnit)
-     */
-    public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
-        if (dispatcher.get() == this) {
-            timerHeap.add(runnable, delay, timeUnit);
-        } else {
-            new ForeignEvent() {
-                public void execute() {
-                    timerHeap.add(runnable, delay, timeUnit);
-                }
-            }.addToList();
-        }
-    }
-
-    public String toString() {
-        return name;
-    }
-
-    private class TimerHeap {
-
-        final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
-
-        private void add(Runnable runnable, long delay, TimeUnit timeUnit) {
-
-            long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS);
-            long eTime = System.nanoTime() + nanoDelay;
-            LinkedList<Runnable> list = new LinkedList<Runnable>();
-            list.add(runnable);
-
-            LinkedList<Runnable> old = timers.put(eTime, list);
-            if (old != null) {
-                list.addAll(old);
-            }
-        }
-
-        private void executeReadyEvents() {
-            LinkedList<Runnable> ready = null;
-            if (timers.isEmpty()) {
-                return;
-            } else {
-                long now = System.nanoTime();
-                long first = timers.firstKey();
-                if (first > now) {
-                    return;
-                }
-                ready = new LinkedList<Runnable>();
-
-                while (first < now) {
-                    ready.addAll(timers.remove(first));
-                    if (timers.isEmpty()) {
-                        break;
-                    }
-                    first = timers.firstKey();
-
-                }
-            }
-
-            for (Runnable runnable : ready) {
-                try {
-                    runnable.run();
-                } catch (Throwable thrown) {
-                    thrown.printStackTrace();
-                }
-            }
-        }
-    }
+	private Thread thread;
+	private boolean running = false;
+	private boolean threaded = false;
+	private final int MAX_USER_PRIORITY;
+
+	static final ThreadLocal<PriorityDispatcher> dispatcher = new ThreadLocal<PriorityDispatcher>();
+
+	private final PooledDispatcher pooledDispatcher;
+
+	// The local dispatch queue:
+	private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+
+	// Dispatch queue for requests from other threads:
+	private final LinkedNodeList<ForeignEvent>[] foreignQueue;
+	private static final int[] TOGGLE = new int[] { 1, 0 };
+	private int foreignToggle = 0;
+
+	// Timed Execution List
+	private final TimerHeap timerHeap = new TimerHeap();
+
+	private final String name;
+	private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
+	private final Semaphore foreignPermits = new Semaphore(0);
+
+	private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
+		public Integer map(PriorityDispatchContext element) {
+			return element.listPrio;
+		}
+	};
+
+	public PriorityDispatcher(String name, int priorities, PooledDispatcher pooledDispactcher) {
+		this.name = name;
+		MAX_USER_PRIORITY = priorities;
+		priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
+		foreignQueue = createForeignEventQueue();
+		for (int i = 0; i < 2; i++) {
+			foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
+		}
+		this.pooledDispatcher = pooledDispactcher;
+	}
+
+	@SuppressWarnings("unchecked")
+	private LinkedNodeList<ForeignEvent>[] createForeignEventQueue() {
+		return new LinkedNodeList[2];
+	}
+
+	private abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
+		public abstract void execute();
+
+		final void addToList() {
+			synchronized (foreignQueue) {
+				if (!this.isLinked()) {
+					foreignQueue[foreignToggle].addLast(this);
+					if (!foreignAvailable.getAndSet(true)) {
+						foreignPermits.release();
+					}
+				}
+			}
+		}
+	}
+
+	public boolean isThreaded() {
+		return threaded;
+	}
+
+	public void setThreaded(boolean threaded) {
+		this.threaded = threaded;
+	}
+
+	private class UpdateEvent extends ForeignEvent {
+		private final PriorityDispatchContext pdc;
+
+		UpdateEvent(PriorityDispatchContext pdc) {
+			this.pdc = pdc;
+		}
+
+		// Can only be called by the owner of this dispatch context:
+		public void execute() {
+			pdc.poolContext.processForeignUpdates();
+		}
+	}
+
+	class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PoolableDispatchContext {
+		// The dispatchable target:
+		final Dispatchable dispatchable;
+		PooledDispatchContext poolContext;
+		// The name of this context:
+		final String name;
+		// list prio can only be updated in the thread of of this dispatcher:
+		int listPrio;
+		// The update events are used to update fields in the dispatch context
+		// from foreign threads:
+		final UpdateEvent updateEvent[] = new UpdateEvent[] { new UpdateEvent(this), new UpdateEvent(this) };
+
+		private PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
+			super();
+			this.dispatchable = dispatchable;
+			this.name = name;
+		}
+
+		// This can only be called on this thread
+		public final void requestDispatch() {
+			if (!isLinked()) {
+				priorityQueue.add(this, listPrio);
+			}
+			return;
+		}
+
+		// This can only be called on this thread
+		public final void updatePriority(int priority) {
+			if (priority != listPrio) {
+
+				listPrio = priority;
+				// If there is a priority change relink the context
+				// at the new priority:
+				if (isLinked()) {
+					unlink();
+					priorityQueue.add(this, listPrio);
+				}
+			}
+			return;
+
+		}
+
+		public void onForeignThreadUpdate() {
+			synchronized (foreignQueue) {
+				updateEvent[foreignToggle].addToList();
+			}
+		}
+
+		// This can only be called on this thread
+		public void close() {
+			if (isLinked()) {
+				unlink();
+			}
+			synchronized (foreignQueue) {
+				if (updateEvent[foreignToggle].isLinked()) {
+					updateEvent[foreignToggle].unlink();
+				}
+			}
+		}
+
+		/**
+		 * This can only be called by the owning dispatch thread:
+		 * 
+		 * @return False if the dispatchable has more work to do.
+		 */
+		public final boolean dispatch() {
+			return dispatchable.dispatch();
+		}
+
+		public String toString() {
+			return name;
+		}
+
+		public Dispatchable getDispatchable() {
+			return dispatchable;
+		}
+
+		public void setPooledDispatchContext(PooledDispatchContext context) {
+			this.poolContext = context;
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public PoolableDispatcher getDispatcher() {
+			return PriorityDispatcher.this;
+		}
+	}
+
+	public DispatchContext register(Dispatchable dispatchable, String name) {
+		return createPoolableDispatchContext(dispatchable, name);
+	}
+
+	public PoolableDispatchContext createPoolableDispatchContext(Dispatchable dispatchable, String name) {
+		return new PriorityDispatchContext(dispatchable, true, name);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.activemq.dispatch.IDispatcher#start()
+	 */
+	public synchronized final void start() {
+		if (thread == null) {
+			running = true;
+			thread = new Thread(this, name);
+			thread.start();
+		}
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+	 */
+	public synchronized final void shutdown() throws InterruptedException {
+		if (thread != null) {
+			dispatch(new RunnableAdapter(new Runnable() {
+
+				public void run() {
+					running = false;
+				}
+
+			}), MAX_USER_PRIORITY + 1);
+			thread.interrupt();
+			thread.join();
+			thread = null;
+		}
+	}
+
+	public void run() {
+
+		// Inform the dispatcher that we have started:
+		pooledDispatcher.onDispatcherStarted(this);
+		dispatcher.set(this);
+		PriorityDispatchContext pdc;
+		try {
+			while (running) {
+				pdc = priorityQueue.poll();
+				// If no local work available wait for foreign work:
+				if (pdc == null) {
+					foreignPermits.acquire();
+				} else {
+					pdc.poolContext.startingDispatch();
+
+					while (!pdc.dispatch()) {
+						// If there is a higher priority dispatchable stop
+						// processing this one:
+						if (pdc.listPrio < priorityQueue.getHighestPriority()) {
+							// May have gotten relinked by the caller:
+							if (!pdc.isLinked()) {
+								priorityQueue.add(pdc, pdc.listPrio);
+							}
+							break;
+						}
+					}
+
+					pdc.poolContext.finishedDispatch();
+
+				}
+
+				// Execute delayed events:
+				timerHeap.executeReadyEvents();
+
+				// Check for foreign dispatch requests:
+				if (foreignAvailable.get()) {
+					LinkedNodeList<ForeignEvent> foreign;
+					synchronized (foreignQueue) {
+						// Swap foreign queues and drain permits;
+						foreign = foreignQueue[foreignToggle];
+						foreignToggle = TOGGLE[foreignToggle];
+						foreignAvailable.set(false);
+						foreignPermits.drainPermits();
+					}
+					while (true) {
+						ForeignEvent fe = foreign.getHead();
+						if (fe == null) {
+							break;
+						}
+
+						fe.unlink();
+						fe.execute();
+					}
+
+				}
+			}
+		} catch (InterruptedException e) {
+			return;
+		} catch (Throwable thrown) {
+			thrown.printStackTrace();
+		} finally {
+			pooledDispatcher.onDispatcherStopped(this);
+		}
+	}
+
+	class ThreadSafeDispatchContext implements PooledDispatchContext {
+		final PriorityDispatchContext delegate;
+
+		ThreadSafeDispatchContext(PriorityDispatchContext context) {
+			this.delegate = context;
+			delegate.setPooledDispatchContext(this);
+		}
+
+		public void finishedDispatch() {
+			// NOOP
+
+		}
+
+		public void startingDispatch() {
+			// Noop
+
+		}
+
+		public void close() {
+			// Noop this is always transient:
+		}
+
+		public void processForeignUpdates() {
+			requestDispatch();
+		}
+
+		public Dispatchable getDispatchable() {
+			return delegate.getDispatchable();
+		}
+
+		public void requestDispatch() {
+			if (dispatcher.get() == PriorityDispatcher.this) {
+				delegate.requestDispatch();
+			} else {
+				delegate.onForeignThreadUpdate();
+			}
+		}
+
+		public void updatePriority(int priority) {
+			throw new UnsupportedOperationException("Not implemented");
+		}
+
+		public String getName() {
+			return delegate.name;
+		}
+
+		public void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
+		}
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
+	 *      .dispatch.Dispatcher.Dispatchable)
+	 */
+	final void dispatch(Dispatchable dispatchable, int priority) {
+		ThreadSafeDispatchContext context = new ThreadSafeDispatchContext(new PriorityDispatchContext(dispatchable, false, name));
+		context.delegate.updatePriority(priority);
+		context.requestDispatch();
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+	 */
+	public Executor createPriorityExecutor(final int priority) {
+
+		return new Executor() {
+
+			public void execute(final Runnable runnable) {
+				dispatch(new RunnableAdapter(runnable), priority);
+			}
+		};
+	}
+
+	public void execute(final Runnable runnable) {
+		dispatch(new RunnableAdapter(runnable), 0);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+	 *      long, java.util.concurrent.TimeUnit)
+	 */
+	public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
+		if (dispatcher.get() == this) {
+			timerHeap.add(runnable, delay, timeUnit);
+		} else {
+			new ForeignEvent() {
+				public void execute() {
+					timerHeap.add(runnable, delay, timeUnit);
+				}
+			}.addToList();
+		}
+	}
+
+	public String toString() {
+		return name;
+	}
+
+	private class TimerHeap {
+
+		final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
+
+		private void add(Runnable runnable, long delay, TimeUnit timeUnit) {
+
+			long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS);
+			long eTime = System.nanoTime() + nanoDelay;
+			LinkedList<Runnable> list = new LinkedList<Runnable>();
+			list.add(runnable);
+
+			LinkedList<Runnable> old = timers.put(eTime, list);
+			if (old != null) {
+				list.addAll(old);
+			}
+		}
+
+		private void executeReadyEvents() {
+			LinkedList<Runnable> ready = null;
+			if (timers.isEmpty()) {
+				return;
+			} else {
+				long now = System.nanoTime();
+				long first = timers.firstKey();
+				if (first > now) {
+					return;
+				}
+				ready = new LinkedList<Runnable>();
+
+				while (first < now) {
+					ready.addAll(timers.remove(first));
+					if (timers.isEmpty()) {
+						break;
+					}
+					first = timers.firstKey();
+
+				}
+			}
+
+			for (Runnable runnable : ready) {
+				try {
+					runnable.run();
+				} catch (Throwable thrown) {
+					thrown.printStackTrace();
+				}
+			}
+		}
+	}
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java?rev=743900&r1=743899&r2=743900&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java Thu Feb 12 22:07:16 2009
@@ -21,107 +21,287 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class PriorityPooledDispatcher implements IDispatcher {
-    private final String name;
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
 
-    final AtomicBoolean started = new AtomicBoolean();
-    final AtomicBoolean shutdown = new AtomicBoolean();
+public class PriorityPooledDispatcher implements IDispatcher, PooledDispatcher {
+	private final String name;
 
-    ArrayList<PriorityDispatcher> dispatchers = new ArrayList<PriorityDispatcher>();
-    private int roundRobinCounter = 0;
-    private final int size;
-
-    private final SimpleLoadBalancer executionGraphLoadBalancer;
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
-     */
-    public Executor createPriorityExecutor(final int priority) {
-        return new Executor() {
-            public void execute(final Runnable runnable) {
-                chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0);
-            }
-        };
-    }
-
-    public PriorityPooledDispatcher(String name, int size, int priorities) {
-        this.name = name;
-        this.size = size;
-        executionGraphLoadBalancer = new SimpleLoadBalancer(name);
-        // Create all the workers.
-        for (int i = 0; i < size; i++) {
-            PriorityDispatcher dispatcher = new PriorityDispatcher(name + "-" + (i + 1), priorities, executionGraphLoadBalancer);
-            dispatchers.add(dispatcher);
-        }
-    }
-
-    public DispatchContext register(Dispatchable dispatchable, String name) {
-        return chooseDispatcher().register(dispatchable, name);
-    }
-
-    /**
-     * @see org.apache.activemq.dispatch.IDispatcher#start()
-     */
-    public synchronized final void start() {
-        if (started.compareAndSet(false, true)) {
-            // Create all the workers.
-            for (int i = 0; i < size; i++) {
-                dispatchers.get(i).start();
-            }
-        }
-        try {
-            executionGraphLoadBalancer.start();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
-     */
-    public synchronized final void shutdown() throws InterruptedException {
-        shutdown.set(true);
-        for (PriorityDispatcher dispatcher : dispatchers) {
-            dispatcher.shutdown();
-        }
-        executionGraphLoadBalancer.shutdown();
-    }
-
-    private PriorityDispatcher chooseDispatcher() {
-        PriorityDispatcher d = PriorityDispatcher.dispatcher.get();
-        if (d == null) {
-            synchronized (dispatchers) {
-                if (++roundRobinCounter >= size) {
-                    roundRobinCounter = 0;
-                }
-                return dispatchers.get(roundRobinCounter);
-            }
-        } else {
-            return d;
-        }
-    }
-
-    public void execute(final Runnable runnable) {
-        chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0);
-    }
-
-    // TODO Implement
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
-     * long, java.util.concurrent.TimeUnit)
-     */
-    public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
-        chooseDispatcher().schedule(runnable, delay, timeUnit);
-    }
-
-    public String toString() {
-        return name;
-    }
+	private static final ThreadLocal<PooledDispatchContext> dispatchContext = new ThreadLocal<PooledDispatchContext>();
+	private static final ThreadLocal<PoolableDispatcher> dispatcher = new ThreadLocal<PoolableDispatcher>();
+
+	private final ArrayList<PriorityDispatcher> dispatchers = new ArrayList<PriorityDispatcher>();
+
+	final AtomicBoolean started = new AtomicBoolean();
+	final AtomicBoolean shutdown = new AtomicBoolean();
+
+	private int roundRobinCounter = 0;
+	private final int size;
+	private final boolean DEBUG = false;
+
+	private final ExecutionLoadBalancer loadBalancer;
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+	 */
+	public Executor createPriorityExecutor(final int priority) {
+		return new Executor() {
+			public void execute(final Runnable runnable) {
+				chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+			}
+		};
+	}
+
+	public PriorityPooledDispatcher(String name, int size, int priorities) {
+		this.name = name;
+		this.size = size;
+		loadBalancer = new SimpleLoadBalancer();
+		// Create all the workers.
+		for (int i = 0; i < size; i++) {
+			PriorityDispatcher dispatcher = new PriorityDispatcher(name + "-" + (i + 1), priorities, this);
+			dispatchers.add(dispatcher);
+		}
+	}
+
+	public DispatchContext register(Dispatchable dispatchable, String name) {
+		return createPooledDispatchContext(chooseDispatcher().createPoolableDispatchContext(dispatchable, name));
+	}
+
+	/**
+	 * @see org.apache.activemq.dispatch.IDispatcher#start()
+	 */
+	public synchronized final void start() {
+		loadBalancer.start();
+		if (started.compareAndSet(false, true)) {
+			// Create all the workers.
+			for (int i = 0; i < size; i++) {
+				dispatchers.get(i).start();
+			}
+		}
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+	 */
+	public synchronized final void shutdown() throws InterruptedException {
+		shutdown.set(true);
+		for (PriorityDispatcher dispatcher : dispatchers) {
+			dispatcher.shutdown();
+		}
+		loadBalancer.stop();
+	}
+
+	private PriorityDispatcher chooseDispatcher() {
+		PriorityDispatcher d = PriorityDispatcher.dispatcher.get();
+		if (d == null) {
+			synchronized (dispatchers) {
+				if (++roundRobinCounter >= size) {
+					roundRobinCounter = 0;
+				}
+				return dispatchers.get(roundRobinCounter);
+			}
+		} else {
+			return d;
+		}
+	}
+
+	public void execute(final Runnable runnable) {
+		chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+	 *      long, java.util.concurrent.TimeUnit)
+	 */
+	public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
+		chooseDispatcher().schedule(runnable, delay, timeUnit);
+	}
+
+	public PooledDispatchContext createPooledDispatchContext(PoolableDispatchContext context) {
+		return new PriorityPooledDispatchContext(context);
+	}
+
+	/**
+	 * A Dispatcher must call this to indicate that is has started it's dispatch
+	 * loop.
+	 */
+	public void onDispatcherStarted(PoolableDispatcher d) {
+		dispatcher.set(d);
+		loadBalancer.addDispatcher(d);
+	}
+
+	/**
+	 * A Dispatcher must call this when exiting it's dispatch loop
+	 */
+	public void onDispatcherStopped(PoolableDispatcher d) {
+		loadBalancer.removeDispatcher(d);
+	}
+
+	/**
+	 * ExecutionGraphNode tracks dispatch information for a
+	 * MappableDispatchContext.
+	 * 
+	 */
+	public class PriorityPooledDispatchContext implements PooledDispatchContext {
+		private final ExecutionTracker tracker;
+
+		private PoolableDispatchContext context;
+		private PoolableDispatcher currentOwner;
+		private int priority;
+		private boolean dispatchRequested = false;
+		private PoolableDispatcher updateDispatcher = null;
+		private boolean closed = false;
+
+		PriorityPooledDispatchContext(PoolableDispatchContext context) {
+			this.context = context;
+			this.context.setPooledDispatchContext(this);
+			this.currentOwner = context.getDispatcher();
+			this.tracker = loadBalancer.createExecutionTracker(this);
+
+		}
+
+		public final void startingDispatch() {
+			dispatchContext.set(this);
+		}
+
+		public final void finishedDispatch() {
+			dispatchContext.set(null);
+		}
+
+		public final void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
+			synchronized (this) {
+
+				// If we're already set to this dispatcher
+				if (newDispatcher == currentOwner) {
+					if (updateDispatcher == null || updateDispatcher == newDispatcher) {
+						return;
+					}
+				}
+
+				updateDispatcher = newDispatcher;
+				if (DEBUG)
+					System.out.println(getName() + " updating to " + context.getDispatcher());
+			}
+			context.onForeignThreadUpdate();
+		}
+
+		public void requestDispatch() {
+
+			PoolableDispatcher callingDispatcher = dispatcher.get();
+
+			tracker.onDispatchRequest(callingDispatcher, dispatchContext.get());
+
+			// Otherwise this is coming off another thread, so we need to
+			// synchronize
+			// to protect against ownership changes:
+			synchronized (this) {
+				// If the owner of this context is the calling thread, then
+				// delegate to the dispatcher.
+				if (currentOwner == callingDispatcher) {
+
+					context.requestDispatch();
+					return;
+				}
+
+				dispatchRequested = true;
+			}
+			context.onForeignThreadUpdate();
+		}
+
+		public void updatePriority(int priority) {
+			if (this.priority == priority) {
+				return;
+			}
+			// Otherwise this is coming off another thread, so we need to
+			// synchronize to protect against ownership changes:
+			synchronized (this) {
+				this.priority = priority;
+
+				IDispatcher callingDispatcher = dispatcher.get();
+
+				// If the owner of this context is the calling thread, then
+				// delegate to the dispatcher.
+				if (currentOwner == callingDispatcher) {
+
+					context.updatePriority(priority);
+					return;
+				}
+			}
+			context.onForeignThreadUpdate();
+		}
+
+		public void processForeignUpdates() {
+			boolean ownerChange = false;
+			synchronized (this) {
+
+				if (closed) {
+					context.close();
+					return;
+				}
+
+				if (updateDispatcher != null) {
+					// Close the old context:
+					if (DEBUG) {
+						System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+					}
+					context.close();
+
+					currentOwner = updateDispatcher;
+					updateDispatcher = null;
+					context = currentOwner.createPoolableDispatchContext(context.getDispatchable(), context.getName());
+					dispatchRequested = true;
+					context.updatePriority(priority);
+					context.setPooledDispatchContext(this);
+					ownerChange = true;
+				} else {
+					context.updatePriority(priority);
+
+					if (dispatchRequested) {
+						context.requestDispatch();
+						dispatchRequested = false;
+					}
+				}
+			}
+
+			if (ownerChange) {
+				context.onForeignThreadUpdate();
+			}
+		}
+
+		public void close() {
+			tracker.close();
+			synchronized (this) {
+				IDispatcher callingDispatcher = dispatcher.get();
+
+				// If the owner of this context is the calling thread, then
+				// delegate to the dispatcher.
+				if (currentOwner == callingDispatcher) {
+					context.close();
+					return;
+				}
+			}
+			context.onForeignThreadUpdate();
+		}
+
+		public final String toString() {
+			return context.toString();
+		}
+
+		public Dispatchable getDispatchable() {
+			return context.getDispatchable();
+		}
+
+		public String getName() {
+			return context.getName();
+		}
+	}
+
+	public String toString() {
+		return name;
+	}
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=743900&r1=743899&r2=743900&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Thu Feb 12 22:07:16 2009
@@ -16,277 +16,120 @@
  */
 package org.apache.activemq.dispatch;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.dispatch.PooledDispatcher.PoolableDispatcher;
+import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
 
-/**
- * 
- */
 public class SimpleLoadBalancer implements ExecutionLoadBalancer {
 
-    private static final ThreadLocal<ExecutionGraphNode> dispatchContext = new ThreadLocal<ExecutionGraphNode>();
-    private static final ThreadLocal<PoolableDispatcher> dispatcher = new ThreadLocal<PoolableDispatcher>();
-
-    private final ArrayList<PoolableDispatcher> dispatchers = new ArrayList<PoolableDispatcher>();
+	private final boolean DEBUG = false;
 
-    private final String name;
-    private final boolean DEBUG = false;
+	SimpleLoadBalancer() {
+	}
 
-    SimpleLoadBalancer(String name) {
-        this.name = name;
-    }
-
-    public void start() throws InterruptedException {
-    }
-
-    public void shutdown() {
-    }
-
-    public LoadBalancedDispatchContext createLoadBalancedDispatchContext(PoolableDispatchContext context) {
-        ExecutionGraphNode egn = new ExecutionGraphNode(context);
-        return egn;
-    }
-
-    public synchronized final void addDispatcher(PoolableDispatcher dispatcher) {
-        dispatchers.add(dispatcher);
-    }
-
-    /**
-     * A Dispatcher must call this to indicate that is has started it's dispatch
-     * loop.
-     */
-    public void onDispatcherStarted(PoolableDispatcher d) {
-        dispatcher.set(d);
-    }
-
-    /**
-     * A Dispatcher must call this when exiting it's dispatch loop
-     */
-    public void onDispatcherStopped(PoolableDispatcher d) {
-
-    }
-
-    private class ExecutionGraphEdge {
-        final ExecutionGraphNode target;
-        final ExecutionGraphNode source;
-        int count;
-
-        ExecutionGraphEdge(ExecutionGraphNode source, ExecutionGraphNode target) {
-            this.target = target;
-            this.source = source;
-        }
-
-        public String toString() {
-            return "Connection from: " + source + " to " + target;
-        }
-    }
-
-    /**
-     * ExecutionGraphNode tracks dispatch information for a
-     * MappableDispatchContext.
-     * 
-     */
-    public class ExecutionGraphNode implements LoadBalancedDispatchContext {
-        protected PoolableDispatchContext context;
-        private ExecutionGraphNode singleSource;
-        private final HashMap<ExecutionGraphNode, ExecutionGraphEdge> sources = new HashMap<ExecutionGraphNode, ExecutionGraphEdge>();
-        protected PoolableDispatcher currentOwner;
-        private final AtomicInteger work = new AtomicInteger(0);
-
-        private int priority;
-        private boolean dispatchRequested = false;
-        private PoolableDispatcher updateDispatcher = null;
-
-        ExecutionGraphNode(PoolableDispatchContext context) {
-            this.context = context;
-            this.context.setLoadBalancedDispatchContext(this);
-            this.currentOwner = context.getDispatcher();
-            if (DEBUG) {
-                System.out.println(getName() + " Assigned to " + context.getDispatcher());
-            }
-        }
-
-        public final void startingDispatch() {
-            dispatchContext.set(this);
-        }
-
-        public final void finishedDispatch() {
-            dispatchContext.set(null);
-        }
-
-        /**
-         * This method is called to track which dispatch contexts are requesting
-         * dispatch for the target context represented by this node.
-         * 
-         * This method is not threadsafe, the caller must ensure serialized
-         * access to this method.
-         * 
-         * @param callingDispatcher
-         *            The calling dispatcher.
-         * @return True if this method resulted in the dispatch request being
-         *         assigned to another dispatcher.
-         */
-        public final boolean onDispatchRequest(final PoolableDispatcher callingDispatcher) {
-
-            /*
-             * if (callingDispatcher == currentOwner) { return false; }
-             */
-
-            ExecutionGraphNode callingContext = dispatchContext.get();
-            if (callingContext != null) {
-                // Make sure we are being called by another node:
-                if (callingContext == null || callingContext == context) {
-                    return false;
-                }
-
-                // Optimize for single source case:
-                if (singleSource != callingContext) {
-                    if (singleSource == null && sources.isEmpty()) {
-                        singleSource = callingContext;
-                        ExecutionGraphEdge edge = new ExecutionGraphEdge(callingContext, this);
-                        sources.put(callingContext, edge);
-
-                        // If this context only has a single source
-                        // immediately assign it to the
-                        // dispatcher of the source:
-                        boolean reassigned = false;
-                        synchronized (this) {
-                            if (callingDispatcher != currentOwner && updateDispatcher == null) {
-                                updateDispatcher = callingDispatcher;
-                                reassigned = true;
-                                if (DEBUG)
-                                    System.out.println("Assigning: " + this + " to " + callingContext + "'s  dispatcher: " + callingDispatcher);
-
-                            }
-                        }
-                        if (reassigned) {
-                            assignToNewDispatcher(callingDispatcher);
-                        }
-                        return true;
-                    } else {
-
-                        ExecutionGraphEdge stats = sources.get(callingContext);
-                        if (stats == null) {
-                            stats = new ExecutionGraphEdge(callingContext, this);
-                            sources.put(callingContext, stats);
-                        }
-
-                        if (singleSource != null) {
-                            singleSource = null;
-                        }
-                    }
-                }
-                work.incrementAndGet();
-            }
-            return false;
-        }
-
-        final void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
-            synchronized (this) {
-                if (newDispatcher != currentOwner) {
-                    updateDispatcher = newDispatcher;
-                }
-            }
-            context.onForeignThreadUpdate();
-        }
-
-        public void requestDispatch() {
-
-            PoolableDispatcher callingDispatcher = dispatcher.get();
-
-            if (onDispatchRequest(callingDispatcher)) {
-                return;
-            }
-
-            // Otherwise this is coming off another thread, so we need to
-            // synchronize
-            // to protect against ownership changes:
-            synchronized (this) {
-                // If the owner of this context is the calling thread, then
-                // delegate to the dispatcher.
-                if (currentOwner == callingDispatcher) {
-
-                    context.requestDispatch();
-                    return;
-                }
-
-                dispatchRequested = true;
-            }
-            context.onForeignThreadUpdate();
-        }
-
-        public void updatePriority(int priority) {
-            if (this.priority == priority) {
-                return;
-            }
-            // Otherwise this is coming off another thread, so we need to
-            // synchronize
-            // to protect against ownership changes:
-            synchronized (this) {
-                this.priority = priority;
-
-                IDispatcher callingDispatcher = dispatcher.get();
-
-                // If the owner of this context is the calling thread, then
-                // delegate to the dispatcher.
-                if (currentOwner == callingDispatcher) {
-
-                    context.updatePriority(priority);
-                    return;
-                }
-            }
-            context.onForeignThreadUpdate();
-        }
-
-        public void processForeignUpdates() {
-            boolean ownerChange = false;
-            synchronized (this) {
-                if (updateDispatcher != null) {
-                    // Close the old context:
-                    if (DEBUG) {
-                        System.out.println("Assigning " + getName() + " to " + updateDispatcher);
-                    }
-                    context.close();
-
-                    currentOwner = updateDispatcher;
-                    updateDispatcher = null;
-                    context = currentOwner.createPoolablDispatchContext(context.getDispatchable(), context.getName());
-                    dispatchRequested = true;
-                    context.updatePriority(priority);
-                    context.setLoadBalancedDispatchContext(this);
-                    ownerChange = true;
-                } else {
-                    context.updatePriority(priority);
-
-                    if (dispatchRequested) {
-                        context.requestDispatch();
-                        dispatchRequested = false;
-                    }
-                }
-            }
-
-            if (ownerChange) {
-                context.onForeignThreadUpdate();
-            }
-        }
-
-        public void close() {
-            sources.clear();
-        }
-
-        public final String toString() {
-            return context.toString();
-        }
-
-        public Dispatchable getDispatchable() {
-            return context.getDispatchable();
-        }
-
-        public String getName() {
-            return context.getName();
-        }
-    }
+	private class ExecutionStats {
+		final PooledDispatchContext target;
+		final PooledDispatchContext source;
+		int count;
+
+		ExecutionStats(PooledDispatchContext source, PooledDispatchContext target) {
+			this.target = target;
+			this.source = source;
+		}
+
+		public String toString() {
+			return "Connection from: " + source + " to " + target;
+		}
+	}
+
+	public void addDispatcher(PoolableDispatcher dispatcher) {
+
+	}
+
+	public void removeDispatcher(PoolableDispatcher dispatcher) {
+
+	}
+
+	public void start() {
+	}
+
+	public void stop() {
+	}
+
+	public ExecutionTracker createExecutionTracker(PooledDispatchContext context) {
+		return new SimpleExecutionTracker(context);
+	}
+
+	private class SimpleExecutionTracker implements ExecutionTracker {
+		private final HashMap<PooledDispatchContext, ExecutionStats> sources = new HashMap<PooledDispatchContext, ExecutionStats>();
+		private final PooledDispatchContext context;
+		private final AtomicInteger work = new AtomicInteger(0);
+
+		private PooledDispatchContext singleSource;
+		private PoolableDispatcher currentOwner;
+
+		SimpleExecutionTracker(PooledDispatchContext context) {
+			this.context = context;
+		}
+
+		/**
+		 * This method is called to track which dispatch contexts are requesting
+		 * dispatch for the target context represented by this node.
+		 * 
+		 * This method is not threadsafe, the caller must ensure serialized
+		 * access to this method.
+		 * 
+		 * @param callngDispatcher
+		 *            The calling dispatcher.
+		 * @param context
+		 *            the originating dispatch context
+		 * @return True if this method resulted in the dispatch request being
+		 *         assigned to another dispatcher.
+		 */
+		public void onDispatchRequest(PoolableDispatcher callingDispatcher, PooledDispatchContext callingContext) {
+
+			if (callingContext != null) {
+				// Make sure we are being called by another node:
+				if (callingContext == null || callingContext == context) {
+					return;
+				}
+
+				// Optimize for single source case:
+				if (singleSource != callingContext) {
+					if (singleSource == null && sources.isEmpty()) {
+						singleSource = callingContext;
+						ExecutionStats stats = new ExecutionStats(callingContext, context);
+						sources.put(callingContext, stats);
+
+						// If this context only has a single source
+						// assign it to that source to minimize contention:
+						if (callingDispatcher != currentOwner) {
+							currentOwner = callingDispatcher;
+							if (DEBUG)
+								System.out.println("Assigning: " + context + " to " + callingContext + "'s  dispatcher: " + callingDispatcher);
+							context.assignToNewDispatcher(callingDispatcher);
+						}
+
+					} else {
+
+						ExecutionStats stats = sources.get(callingContext);
+						if (stats == null) {
+							stats = new ExecutionStats(callingContext, context);
+							sources.put(callingContext, stats);
+						}
+
+						if (singleSource != null) {
+							singleSource = null;
+						}
+					}
+				}
+				work.incrementAndGet();
+			}
+		}
+
+		public void close() {
+		}
+	}
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743900&r1=743899&r2=743900&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Feb 12 22:07:16 2009
@@ -47,7 +47,7 @@
     boolean ptp = false;
 
     // Set to use tcp IO
-    boolean tcp = false;
+    boolean tcp = true;
 
     // Can be set to BLOCKING, POLLING or ASYNC
     public final static int DISPATCH_MODE = AbstractTestConnection.ASYNC;