You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/11 21:12:30 UTC

svn commit: r743476 [1/4] - in /activemq/sandbox/activemq-flow: ./ src/ src/main/ src/main/java/ src/main/java/com/ src/main/java/com/progress/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/act...

Author: chirino
Date: Wed Feb 11 20:12:28 2009
New Revision: 743476

URL: http://svn.apache.org/viewvc?rev=743476&view=rev
Log:
Initial spike of the flow module.  This is basically an experiment with a different flow control and threading model to see if we can make a 
faster/simpler version of the broker.  This code was jointly developed by Colin Macnaughton (yes, he has an ICLA on file) and me.

Will follow up with a post to the dev list to try to entice more eyes to look at this to see if it sparks any interest.


Added:
    activemq/sandbox/activemq-flow/pom.xml
    activemq/sandbox/activemq-flow/src/
    activemq/sandbox/activemq-flow/src/main/
    activemq/sandbox/activemq-flow/src/main/java/
    activemq/sandbox/activemq-flow/src/main/java/com/
    activemq/sandbox/activemq-flow/src/main/java/com/progress/
    activemq/sandbox/activemq-flow/src/main/java/org/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java
    activemq/sandbox/activemq-flow/src/test/
    activemq/sandbox/activemq-flow/src/test/java/
    activemq/sandbox/activemq-flow/src/test/java/com/
    activemq/sandbox/activemq-flow/src/test/java/com/progress/
    activemq/sandbox/activemq-flow/src/test/java/org/
    activemq/sandbox/activemq-flow/src/test/java/org/apache/
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MessageGenerator.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java

Added: activemq/sandbox/activemq-flow/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/pom.xml (added)
+++ activemq/sandbox/activemq-flow/pom.xml Wed Feb 11 20:12:28 2009
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>activemq-parent</artifactId>
+    <version>5.3-SNAPSHOT</version>
+  </parent>
+  
+	<groupId>org.apache.activemq.flow</groupId>
+	<artifactId>activemq-flow</artifactId>
+	<packaging>jar</packaging>
+	<version>1.0-SNAPSHOT</version>
+
+	<name>ActiveMQ :: Flow</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>colt</groupId>
+			<artifactId>colt</artifactId>
+			<version>1.2.0</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.activemq</groupId>
+			<artifactId>activemq-core</artifactId>
+		</dependency>
+	</dependencies>
+	
+</project>

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+
+public interface ExecutionLoadBalancer {
+
+    /**
+     * A Load Balanced Dispatch context can be moved between different
+     * dispatchers.
+     */
+    public interface LoadBalancedDispatchContext extends DispatchContext {
+        /**
+         * A dispatcher must call this when it starts dispatch for this
+         * context.
+         */
+        public void startingDispatch();
+
+        /**
+         * A dispatcher must call this when it has finished dispatching this
+         * context.
+         */
+        public void finishedDispatch();
+
+        /**
+         * Applies updates to this context that were requested from a thread
+         * other than the dispatcher's own. PriorityDispatcher invokes this
+         * from the update event it queues in
+         * {@link PoolableDispatchContext#onForeignThreadUpdate()}.
+         */
+        public void processForeignUpdates();
+    }
+
+    /**
+     * A dispatch context created by a {@link PoolableDispatcher} that can be
+     * wrapped in a {@link LoadBalancedDispatchContext}.
+     */
+    public interface PoolableDispatchContext extends DispatchContext {
+
+        /**
+         * Sets the load balanced context that wraps this context.
+         * 
+         * @param context
+         *            The wrapping load balanced context.
+         */
+        public void setLoadBalancedDispatchContext(LoadBalancedDispatchContext context);
+
+        /**
+         * Indicates that another thread has made an update to the dispatch
+         * context.
+         */
+        public void onForeignThreadUpdate();
+
+        /**
+         * @return The dispatcher that this context belongs to.
+         */
+        public PoolableDispatcher getDispatcher();
+    }
+
+    /**
+     * A dispatcher whose contexts can be managed by an
+     * {@link ExecutionLoadBalancer}.
+     */
+    public interface PoolableDispatcher extends IDispatcher {
+
+        /**
+         * Creates a {@link PoolableDispatchContext} for the given
+         * dispatchable on this dispatcher.
+         * 
+         * NOTE(review): the method name is misspelled ("Poolabl"); renaming
+         * it would break existing implementers and callers.
+         * 
+         * @param dispatchable
+         *            The dispatchable the new context represents.
+         * @param name
+         *            The name of the new context.
+         * @return The new context.
+         */
+        public PoolableDispatchContext createPoolablDispatchContext(Dispatchable dispatchable, String name);
+    }
+
+    /**
+     * This wraps the dispatch context into one that is load balanced by the
+     * LoadBalancer
+     * 
+     * @param context
+     *            The context to wrap.
+     * @return The load balanced context wrapping the given context.
+     */
+    public LoadBalancedDispatchContext createLoadBalancedDispatchContext(PoolableDispatchContext context);
+
+    /**
+     * Adds a Dispatcher to the list of dispatchers managed by the load balancer
+     * 
+     * @param dispatcher
+     *            The dispatcher to add.
+     */
+    public void addDispatcher(PoolableDispatcher dispatcher);
+
+    /**
+     * A Dispatcher must call this from its dispatcher thread to indicate that
+     * its dispatch loop has started.
+     * 
+     * @param dispatcher
+     *            The dispatcher that started.
+     */
+    public void onDispatcherStarted(PoolableDispatcher dispatcher);
+
+    /**
+     * A Dispatcher must call this from its dispatcher thread when exiting its
+     * dispatch loop
+     * 
+     * @param dispatcher
+     *            The dispatcher that stopped.
+     */
+    public void onDispatcherStopped(PoolableDispatcher dispatcher);
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+public interface IDispatcher {
+
+    /**
+     * This interface is implemented by Dispatchable entities. A Dispatchable
+     * entity registers with an {@link IDispatcher} and is returned a
+     * {@link DispatchContext} which it can use to request the
+     * {@link IDispatcher} to invoke {@link Dispatchable#dispatch()}
+     * 
+     * {@link IDispatcher} guarantees that it will never invoke
+     * {@link #dispatch()} concurrently unless the {@link Dispatchable} is
+     * registered with more than one {@link IDispatcher};
+     */
+    public interface Dispatchable {
+        /**
+         * Performs a unit of work on behalf of the dispatcher.
+         * 
+         * NOTE(review): PriorityDispatcher keeps re-invoking this while it
+         * returns false, and RunnableAdapter returns true after a single run,
+         * which suggests that true means "no more work" - confirm against the
+         * wording on {@link DispatchContext#requestDispatch()}.
+         */
+        public boolean dispatch();
+    }
+
+    /**
+     * Returned to callers registered with this dispatcher. Used by the caller
+     * to inform the dispatcher that it is ready for dispatch.
+     * 
+     * Note that DispatchContext is not safe for concurrent access by multiple
+     * threads.
+     */
+    public interface DispatchContext {
+        /**
+         * Once registered with a dispatcher, this can be called to request
+         * dispatch. The {@link Dispatchable} will remain in the dispatch queue
+         * until a subsequent call to {@link Dispatchable#dispatch()} returns
+         * false;
+         * 
+         * NOTE(review): the visible PriorityDispatcher loop stops
+         * re-dispatching when dispatch() returns true, not false - confirm
+         * which convention is intended.
+         */
+        public void requestDispatch();
+
+        /**
+         * This can be called to update the dispatch priority.
+         * 
+         * @param priority
+         *            The new dispatch priority.
+         */
+        public void updatePriority(int priority);
+
+        /**
+         * Gets the Dispatchable that this context represents.
+         * 
+         * @return The dispatchable
+         */
+        public Dispatchable getDispatchable();
+
+        /**
+         * Gets the name of the dispatch context
+         * 
+         * @return The name of the dispatch context
+         */
+        public String getName();
+
+        /**
+         * This must be called to release any resource the dispatcher is holding
+         * on behalf of this context.
+         */
+        public void close();
+    }
+
+    /**
+     * Adapts a {@link Runnable} to the {@link Dispatchable} interface: each
+     * call to {@link #dispatch()} runs the runnable once and returns true.
+     */
+    class RunnableAdapter implements Dispatchable {
+        final Runnable runnable;
+
+        RunnableAdapter(Runnable runnable) {
+            this.runnable = runnable;
+        }
+
+        public boolean dispatch() {
+            runnable.run();
+            return true;
+        }
+    }
+
+    /**
+     * Registers a {@link Dispatchable} with this dispatcher, and returns a
+     * {@link DispatchContext} that the caller can use to request dispatch.
+     * 
+     * @param dispatchable
+     *            The {@link Dispatchable}
+     * @param name
+     *            An identifier for the dispatch context.
+     * @return A {@link DispatchContext} that can be used to request dispatch
+     */
+    public DispatchContext register(Dispatchable dispatchable, String name);
+
+    /**
+     * Creates an executor that will execute its tasks at the specified
+     * priority.
+     * 
+     * @param priority
+     *            The priority
+     * @return A prioritized executor.
+     */
+    public Executor createPriorityExecutor(int priority);
+
+    /**
+     * Starts the dispatcher.
+     */
+    public void start();
+
+    /**
+     * Shuts down the dispatcher, this may result in previous dispatch requests
+     * going unserved.
+     * 
+     * @throws InterruptedException
+     *             If the calling thread is interrupted while shutting down.
+     */
+    public void shutdown() throws InterruptedException;
+
+    /**
+     * Schedules the given {@link Runnable} to be run at the specified time in
+     * the future on this {@link IDispatcher}.
+     * 
+     * @param runnable
+     *            The Runnable to execute
+     * @param delay
+     *            The delay
+     * @param timeUnit
+     *            The TimeUnit used to interpret delay.
+     */
+    public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit);
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.LoadBalancedDispatchContext;
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.PoolableDispatchContext;
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.PoolableDispatcher;
+import org.apache.activemq.queue.Mapper;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+public class PriorityDispatcher implements Runnable, PoolableDispatcher {
+
+    // The dispatch thread; created by start() and cleared by shutdown().
+    private Thread thread;
+    // Loop condition of run(); set by start() and cleared by the runnable
+    // that shutdown() dispatches.
+    private boolean running = false;
+    // Plain flag exposed through isThreaded()/setThreaded(); nothing else in
+    // this class reads it.
+    private boolean threaded = false;
+    // Number of user priorities, as passed to the constructor. shutdown()
+    // dispatches at MAX_USER_PRIORITY + 1.
+    private final int MAX_USER_PRIORITY;
+
+    // Set by run() to the dispatcher that owns the current thread; used to
+    // tell whether a call is being made on the dispatcher's own thread.
+    static final ThreadLocal<PriorityDispatcher> dispatcher = new ThreadLocal<PriorityDispatcher>();
+
+    private final ExecutionLoadBalancer loadBalancer;
+
+    // The local dispatch queue:
+    private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+
+    // Dispatch queue for requests from other threads; also used as the lock
+    // guarding access to it:
+    private final LinkedNodeList<ForeignEvent> foreignQueue = new LinkedNodeList<ForeignEvent>();
+
+    // Timed Execution List
+    private final TimerHeap timerHeap = new TimerHeap();
+
+    // Name of this dispatcher; used as the thread name and as toString().
+    private final String name;
+    // True while foreignQueue may hold events that run() has not drained.
+    private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
+    // run() blocks on this when it has no local work; released when a foreign
+    // event is queued and foreignAvailable was previously false.
+    private final Semaphore foreignPermits = new Semaphore(0);
+
+    // Maps a context to its current list priority for the priority queue.
+    private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
+        public Integer map(PriorityDispatchContext element) {
+            return element.listPrio;
+        }
+    };
+
+    /**
+     * Creates a dispatcher and adds it to the given load balancer. The
+     * dispatch thread is not started until {@link #start()} is called.
+     * 
+     * @param name
+     *            The dispatcher name; also used as the name of its thread.
+     * @param priorities
+     *            The number of user priorities.
+     * @param loadBalancer
+     *            The load balancer this dispatcher is added to.
+     */
+    public PriorityDispatcher(String name, int priorities, ExecutionLoadBalancer loadBalancer) {
+        this.name = name;
+        this.MAX_USER_PRIORITY = priorities;
+        // One extra priority level is requested beyond the user priorities.
+        this.priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(priorities + 1, PRIORITY_MAPPER);
+        this.loadBalancer = loadBalancer;
+        // Add ourselves last, once all fields have been assigned.
+        this.loadBalancer.addDispatcher(this);
+    }
+
+    /**
+     * An event queued on foreignQueue for the dispatch thread. The run loop
+     * unlinks each queued event and calls {@link #execute()} on it.
+     */
+    private abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
+        /**
+         * The work to perform; invoked from the run loop while it drains the
+         * foreign queue.
+         */
+        public abstract void execute();
+
+        /**
+         * Appends this event to the foreign queue unless it is already
+         * linked into a list.
+         */
+        final void addToList() {
+            synchronized (foreignQueue) {
+                if (!this.isLinked()) {
+                    foreignQueue.addLast(this);
+                    // Release a permit only on the false -> true transition,
+                    // so at most one permit is added per drain cycle.
+                    if (!foreignAvailable.getAndSet(true)) {
+                        foreignPermits.release();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @return The current value of the threaded flag.
+     */
+    public boolean isThreaded() {
+        return threaded;
+    }
+
+    /**
+     * @param threaded
+     *            The new value of the threaded flag.
+     */
+    public void setThreaded(boolean threaded) {
+        this.threaded = threaded;
+    }
+
+    /**
+     * Foreign event that asks a dispatch context's load balanced wrapper to
+     * process the updates made to it from other threads.
+     */
+    private class UpdateEvent extends ForeignEvent {
+        // The context whose foreign updates are processed by this event.
+        private final PriorityDispatchContext target;
+
+        UpdateEvent(PriorityDispatchContext pdc) {
+            target = pdc;
+        }
+
+        // Can only be called by the owner of this dispatch context:
+        public void execute() {
+            final LoadBalancedDispatchContext wrapper = target.lbContext;
+            wrapper.processForeignUpdates();
+        }
+    }
+
+    /**
+     * The dispatch context handed out by this dispatcher. It is itself a
+     * node of the priority queue; being linked means that a dispatch has been
+     * requested and is pending.
+     */
+    class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PoolableDispatchContext {
+        // The target that gets dispatched:
+        final Dispatchable dispatchable;
+        // The load balanced wrapper, see setLoadBalancedDispatchContext():
+        LoadBalancedDispatchContext lbContext;
+        // The context name, also returned by toString():
+        final String name;
+        // The priority this context is queued at. It can only be updated in
+        // the thread of this dispatcher:
+        int listPrio;
+        // Queued on the foreign queue when another thread updates this
+        // context:
+        final UpdateEvent updateEvent = new UpdateEvent(this);
+
+        // Set once close() has been called.
+        boolean closed = false;
+
+        // Note: the 'persistent' argument is accepted but not used.
+        private PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
+            this.name = name;
+            this.dispatchable = dispatchable;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public Dispatchable getDispatchable() {
+            return dispatchable;
+        }
+
+        public PoolableDispatcher getDispatcher() {
+            return PriorityDispatcher.this;
+        }
+
+        public void setLoadBalancedDispatchContext(LoadBalancedDispatchContext context) {
+            lbContext = context;
+        }
+
+        public String toString() {
+            return name;
+        }
+
+        // The load balancer will guarantee that this is on our thread:
+        public final void requestDispatch() {
+            if (isLinked()) {
+                // A dispatch is already pending.
+                return;
+            }
+            priorityQueue.add(this, listPrio);
+        }
+
+        // The load balancer guarantees that this is called on our thread:
+        public final void updatePriority(int priority) {
+            if (priority == listPrio) {
+                return;
+            }
+            listPrio = priority;
+            if (!isLinked()) {
+                return;
+            }
+            // Already queued: move the context to the list of its new
+            // priority.
+            unlink();
+            priorityQueue.add(this, listPrio);
+        }
+
+        // Hands the update over to the dispatch thread via the foreign queue.
+        public void onForeignThreadUpdate() {
+            updateEvent.addToList();
+        }
+
+        /**
+         * This can only be called by the owning dispatch thread:
+         * 
+         * @return False if the dispatchable has more work to do.
+         */
+        public final boolean dispatch() {
+            return dispatchable.dispatch();
+        }
+
+        // Will only be called on this thread:
+        public void close() {
+            // Drop any pending dispatch request:
+            if (isLinked()) {
+                unlink();
+            }
+            // Drop any pending foreign update:
+            synchronized (foreignQueue) {
+                if (updateEvent.isLinked()) {
+                    updateEvent.unlink();
+                }
+            }
+            closed = true;
+        }
+    }
+
+    /**
+     * Creates a context for the dispatchable on this dispatcher and returns
+     * it wrapped by the load balancer.
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#register(Dispatchable,
+     *      String)
+     */
+    public DispatchContext register(Dispatchable dispatchable, String name) {
+        final PoolableDispatchContext pooled = createPoolablDispatchContext(dispatchable, name);
+        return loadBalancer.createLoadBalancedDispatchContext(pooled);
+    }
+
+    /**
+     * Creates a new, unwrapped context for the dispatchable on this
+     * dispatcher.
+     * 
+     * @param dispatchable
+     *            The dispatchable the context represents.
+     * @param name
+     *            The name of the context.
+     * @return The new context.
+     */
+    public PoolableDispatchContext createPoolablDispatchContext(Dispatchable dispatchable, String name) {
+        final PriorityDispatchContext created = new PriorityDispatchContext(dispatchable, true, name);
+        return created;
+    }
+
+    /**
+     * Starts the dispatch thread. Does nothing if a dispatch thread already
+     * exists.
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#start()
+     */
+    public synchronized final void start() {
+        if (thread != null) {
+            // Already started.
+            return;
+        }
+        running = true;
+        final Thread dispatchThread = new Thread(this, name);
+        thread = dispatchThread;
+        dispatchThread.start();
+    }
+
+    /**
+     * Stops the dispatch thread and waits for it to exit. Does nothing if no
+     * dispatch thread exists.
+     * 
+     * @throws InterruptedException
+     *             If the calling thread is interrupted while waiting for the
+     *             dispatch thread to exit.
+     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+     */
+    public synchronized final void shutdown() throws InterruptedException {
+        if (thread != null) {
+            // Queue a task, one level above the user priorities, that clears
+            // the loop condition of run():
+            dispatch(new RunnableAdapter(new Runnable() {
+
+                public void run() {
+                    running = false;
+                }
+
+            }), MAX_USER_PRIORITY + 1);
+            // Interrupt the dispatch thread in case it is blocked, then wait
+            // for it to exit:
+            thread.interrupt();
+            thread.join();
+            thread = null;
+        }
+    }
+
+    /**
+     * The dispatch loop. Each pass dispatches the context at the head of the
+     * priority queue (or blocks until a foreign event is queued when there is
+     * none), then runs the timers that are due, then drains the foreign
+     * queue. The loop ends when running is cleared or the thread is
+     * interrupted while blocked; the load balancer is notified of both the
+     * start and the end of the loop.
+     */
+    public void run() {
+
+        // Inform the dispatcher that we have started:
+        loadBalancer.onDispatcherStarted(this);
+        // Mark this thread as the dispatch thread of this dispatcher:
+        dispatcher.set(this);
+        PriorityDispatchContext pdc;
+        try {
+            while (running) {
+                pdc = priorityQueue.poll();
+                // If no local work available wait for foreign work:
+                if (pdc == null) {
+                    // NOTE(review): this blocks until a foreign event is
+                    // queued, even if timers are pending, so timers are
+                    // only run once the loop is woken up again.
+                    foreignPermits.acquire();
+                } else {
+                    pdc.lbContext.startingDispatch();
+
+                    // Keep dispatching the context for as long as
+                    // dispatch() returns false:
+                    while (!pdc.dispatch()) {
+                        // If there is a higher priority dispatchable stop
+                        // processing this one:
+                        if (pdc.listPrio < priorityQueue.getHighestPriority()) {
+                            // May have gotten relinked by the caller:
+                            if (!pdc.isLinked()) {
+                                priorityQueue.add(pdc, pdc.listPrio);
+                            }
+                            break;
+                        }
+                    }
+
+                    pdc.lbContext.finishedDispatch();
+
+                }
+
+                // Execute delayed events:
+                timerHeap.executeReadyEvents();
+
+                // Check for foreign dispatch requests:
+                if (foreignAvailable.get()) {
+                    // Events are executed while holding the foreignQueue
+                    // lock, so threads calling addToList() wait until the
+                    // drain has completed.
+                    synchronized (foreignQueue) {
+                        // Drain the foreign queue:
+                        while (true) {
+                            ForeignEvent fe = foreignQueue.getHead();
+                            // TODO should probably swap foreign queue here:
+                            if (fe == null) {
+                                // Queue is empty: reset the flag and discard
+                                // any leftover permits.
+                                foreignAvailable.set(false);
+                                foreignPermits.drainPermits();
+                                break;
+                            }
+
+                            fe.unlink();
+                            fe.execute();
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            // NOTE(review): the interrupt status is not restored here; the
+            // loop simply ends.
+            return;
+        } catch (Throwable thrown) {
+            // Any other failure ends the dispatch loop as well.
+            thrown.printStackTrace();
+        } finally {
+            loadBalancer.onDispatcherStopped(this);
+        }
+    }
+
+    /**
+     * A wrapper around a {@link PriorityDispatchContext} whose
+     * {@link #requestDispatch()} may be called from any thread: requests made
+     * on the dispatch thread go straight to the wrapped context, requests
+     * from other threads are handed over through the foreign queue.
+     */
+    class ThreadSafeDispatchContext implements LoadBalancedDispatchContext {
+        final PriorityDispatchContext delegate;
+
+        ThreadSafeDispatchContext(PriorityDispatchContext context) {
+            delegate = context;
+            context.setLoadBalancedDispatchContext(this);
+        }
+
+        public String getName() {
+            return delegate.name;
+        }
+
+        public Dispatchable getDispatchable() {
+            return delegate.getDispatchable();
+        }
+
+        public void requestDispatch() {
+            final boolean onDispatchThread = dispatcher.get() == PriorityDispatcher.this;
+            if (!onDispatchThread) {
+                // Hand the request over to the dispatch thread:
+                delegate.onForeignThreadUpdate();
+                return;
+            }
+            delegate.requestDispatch();
+        }
+
+        public void processForeignUpdates() {
+            // The only foreign update supported is a dispatch request:
+            requestDispatch();
+        }
+
+        public void updatePriority(int priority) {
+            throw new UnsupportedOperationException("Not implemented");
+        }
+
+        public void startingDispatch() {
+            // Noop
+        }
+
+        public void finishedDispatch() {
+            // Noop
+        }
+
+        public void close() {
+            // Noop this is always transient:
+        }
+    }
+
+    /**
+     * Requests a dispatch of the given dispatchable at the given priority,
+     * using a new context created for this request.
+     * 
+     * @param dispatchable
+     *            The dispatchable to dispatch.
+     * @param priority
+     *            The priority to dispatch it at.
+     */
+    final void dispatch(Dispatchable dispatchable, int priority) {
+        final PriorityDispatchContext target = new PriorityDispatchContext(dispatchable, false, name);
+        final ThreadSafeDispatchContext wrapper = new ThreadSafeDispatchContext(target);
+        // The context is not queued yet, so this only records the priority:
+        target.updatePriority(priority);
+        wrapper.requestDispatch();
+    }
+
+    /**
+     * Returns an executor that dispatches each submitted runnable on this
+     * dispatcher at the given priority.
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+     */
+    public Executor createPriorityExecutor(final int priority) {
+        final Executor prioritized = new Executor() {
+            public void execute(final Runnable runnable) {
+                dispatch(new RunnableAdapter(runnable), priority);
+            }
+        };
+        return prioritized;
+    }
+
+    /**
+     * Dispatches the runnable on this dispatcher at priority 0.
+     * 
+     * @param runnable
+     *            The runnable to run.
+     */
+    public void execute(final Runnable runnable) {
+        final Dispatchable task = new RunnableAdapter(runnable);
+        dispatch(task, 0);
+    }
+
+    /**
+     * Adds the runnable to this dispatcher's timers. When called from
+     * another thread, the addition is handed over to the dispatch thread
+     * through the foreign queue.
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+     *      long, java.util.concurrent.TimeUnit)
+     */
+    public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
+        if (dispatcher.get() != this) {
+            // Not on the dispatch thread: let the dispatch thread add it.
+            final ForeignEvent deferredAdd = new ForeignEvent() {
+                public void execute() {
+                    timerHeap.add(runnable, delay, timeUnit);
+                }
+            };
+            deferredAdd.addToList();
+            return;
+        }
+        timerHeap.add(runnable, delay, timeUnit);
+    }
+
+    /**
+     * @return The name of this dispatcher.
+     */
+    public String toString() {
+        return name;
+    }
+
+    private class TimerHeap {
+
+        final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
+
+        private void add(Runnable runnable, long delay, TimeUnit timeUnit) {
+
+            long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS);
+            long eTime = System.nanoTime() + nanoDelay;
+            LinkedList<Runnable> list = new LinkedList<Runnable>();
+            list.add(runnable);
+
+            LinkedList<Runnable> old = timers.put(eTime, list);
+            if (old != null) {
+                list.addAll(old);
+            }
+        }
+
+        private void executeReadyEvents() {
+            LinkedList<Runnable> ready = null;
+            if (timers.isEmpty()) {
+                return;
+            } else {
+                long now = System.nanoTime();
+                long first = timers.firstKey();
+                if (first > now) {
+                    return;
+                }
+                ready = new LinkedList<Runnable>();
+
+                while (first < now) {
+                    ready.addAll(timers.remove(first));
+                    if (timers.isEmpty()) {
+                        break;
+                    }
+                    first = timers.firstKey();
+
+                }
+            }
+
+            for (Runnable runnable : ready) {
+                try {
+                    runnable.run();
+                } catch (Throwable thrown) {
+                    thrown.printStackTrace();
+                }
+            }
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.ArrayList;
+
+import org.apache.activemq.queue.Mapper;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+/**
+ * A set of linked lists, one per priority level, that hands out elements
+ * from the highest non-empty priority first. Within one priority, elements
+ * are returned in the order they were added.
+ * 
+ * This class performs no synchronization of its own.
+ * 
+ * @param <E>
+ *            the node type stored in the lists
+ */
+public class PriorityLinkedList<E extends LinkedNode<E>> {
+
+    private Mapper<Integer, E> priorityMapper;
+    private final ArrayList<LinkedNodeList<E>> priorityLists;
+    // Upper bound on the highest priority that may still hold elements; it
+    // is lowered lazily by getHighestPriorityList().
+    private int highestPriority = 0;
+
+    /**
+     * Creates a list without a priority mapper; elements must then be added
+     * with {@link #add(LinkedNode, int)}.
+     * 
+     * @param numPriorities
+     *            the highest priority index that may be used
+     */
+    public PriorityLinkedList(int numPriorities) {
+        this(numPriorities, null);
+    }
+
+    /**
+     * @param numPriorities
+     *            the highest priority index that may be used; lists are
+     *            allocated for priorities 0 through numPriorities inclusive
+     * @param priorityMapper
+     *            maps an element to its priority for {@link #add(LinkedNode)}
+     */
+    public PriorityLinkedList(int numPriorities, Mapper<Integer, E> priorityMapper) {
+        this.priorityMapper = priorityMapper;
+        priorityLists = new ArrayList<LinkedNodeList<E>>();
+        for (int i = 0; i <= numPriorities; i++) {
+            priorityLists.add(new LinkedNodeList<E>());
+        }
+    }
+
+    /**
+     * @return the current highest-priority marker. It is only lowered when
+     *         the lists are scanned, so it may be higher than the priority
+     *         of any element actually present.
+     */
+    public final int getHighestPriority() {
+        return highestPriority;
+    }
+
+    /**
+     * Removes and returns the element at the front of the highest non-empty
+     * priority list.
+     * 
+     * @return the removed element, or null if there are no elements
+     */
+    public final E poll() {
+        LinkedNodeList<E> ll = getHighestPriorityList();
+        if (ll == null) {
+            return null;
+        }
+        E node = ll.getHead();
+        node.unlink();
+
+        return node;
+    }
+
+    /**
+     * @return true if no priority list holds an element
+     */
+    public final boolean isEmpty() {
+        // The list is empty exactly when there is nothing to peek at (the
+        // comparison used to be inverted).
+        return peek() == null;
+    }
+
+    /**
+     * Returns, without removing it, the element at the front of the highest
+     * non-empty priority list.
+     * 
+     * @return the element, or null if there are no elements
+     */
+    public final E peek() {
+        LinkedNodeList<E> ll = getHighestPriorityList();
+        if (ll == null) {
+            return null;
+        }
+
+        return ll.getHead();
+    }
+
+    /**
+     * Adds an element at the priority computed by the priority mapper. A
+     * mapper must have been supplied, otherwise this fails with a
+     * NullPointerException.
+     * 
+     * @param element
+     *            the element to add
+     */
+    public final void add(E element) {
+        int prio = priorityMapper.map(element);
+        add(element, prio);
+    }
+
+    /**
+     * Appends an element to the list of the given priority.
+     * 
+     * @param element
+     *            the element to add
+     * @param prio
+     *            the priority, which must be a valid index into the
+     *            priority lists
+     */
+    public final void add(E element, int prio) {
+        LinkedNodeList<E> ll = priorityLists.get(prio);
+        ll.addLast(element);
+        if (prio > highestPriority) {
+            highestPriority = prio;
+        }
+    }
+
+    /**
+     * Walks down from the highest-priority marker until a non-empty list is
+     * found, lowering the marker along the way.
+     * 
+     * @return the highest non-empty list, or null if every list is empty
+     */
+    private final LinkedNodeList<E> getHighestPriorityList() {
+        LinkedNodeList<E> ll = priorityLists.get(highestPriority);
+        while (ll.isEmpty()) {
+            if (highestPriority == 0) {
+                return null;
+            }
+            highestPriority--;
+            ll = priorityLists.get(highestPriority);
+        }
+
+        return ll;
+    }
+
+    public Mapper<Integer, E> getPriorityMapper() {
+        return priorityMapper;
+    }
+
+    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+        this.priorityMapper = priorityMapper;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.Arrays;
+
+/**
+ * A map from int keys (priorities) to values, backed by a dense array that
+ * covers the key range currently in use. The array is rebased and resized as
+ * keys outside the current range are added.
+ * 
+ * A null slot means "no mapping", so null values cannot be stored. This
+ * class performs no synchronization of its own.
+ * 
+ * @param <E>
+ *            the value type
+ */
+public class PriorityMap<E> {
+
+    // Array index at or before the lowest occupied slot. It is advanced
+    // lazily by firstKey()/firstValue() after removals.
+    int first;
+    // The key that corresponds to array index 0.
+    int base;
+    // Number of occupied slots.
+    int size;
+
+    Object elements[] = new Object[1];
+
+    /**
+     * Associates a value with a key.
+     * 
+     * @param key
+     *            the key
+     * @param value
+     *            the value to store
+     * @return the value previously stored under the key, or null if there
+     *         was none
+     */
+    public E put(int key, E value) {
+        E rc = null;
+        if (isEmpty()) {
+            // This will be the first base priority..
+            base = key;
+            elements[0] = value;
+            first = 0;
+        } else {
+            // ">=" so that a put on the base key itself replaces the slot in
+            // place instead of going through the rebase branch, which would
+            // lose the old value and count the key twice.
+            if (key >= base) {
+                // New priority is at or after the current base, we may need
+                // to expand the priority array to fit this new one in.
+                int index = key - base;
+                if (elements.length <= index) {
+                    // The funky thing is if the original base was removed,
+                    // resizing will rebase the array at the first element,
+                    // so the slot index has to be recomputed afterwards.
+                    resize(index + 1, 0);
+                    index = key - base;
+                }
+                if (index < first) {
+                    first = index;
+                }
+                rc = element(index);
+                elements[index] = value;
+            } else {
+                // Ok this element is before the current base so we need to
+                // resize/rebase using this element as the base.
+                int oldLastIndex = indexOfLast();
+                int newLastIndex = (base + oldLastIndex) - key;
+                resize(newLastIndex + 1, first + (base - key), (oldLastIndex - first) + 1);
+                elements[0] = value;
+                first = 0;
+            }
+        }
+        if (rc == null) {
+            size++;
+        }
+        return rc;
+    }
+
+    /**
+     * @return the array index of the last occupied slot, or -1 if none
+     */
+    private int indexOfLast() {
+        int i = elements.length - 1;
+        while (i >= 0) {
+            if (elements[i] != null) {
+                return i;
+            }
+            i--;
+        }
+        return -1;
+    }
+
+    /**
+     * Resizes the array, moving everything from {@code first} onwards (as
+     * much as fits) to {@code firstOffset}.
+     */
+    private void resize(int newSize, int firstOffset) {
+        int count = Math.min(elements.length - first, newSize);
+        resize(newSize, firstOffset, count);
+    }
+
+    /**
+     * Moves {@code copyCount} slots starting at {@code first} so that they
+     * start at {@code firstOffset} in an array of {@code newSize} slots, and
+     * adjusts {@code base} and {@code first} to match the new layout.
+     */
+    private void resize(int newSize, int firstOffset, int copyCount) {
+        Object t[];
+        if (elements.length == newSize) {
+            // Same size: shift in place and clear the slots in front.
+            t = elements;
+            System.arraycopy(elements, first, t, firstOffset, copyCount);
+            Arrays.fill(t, 0, firstOffset, null);
+        } else {
+            t = new Object[newSize];
+            System.arraycopy(elements, first, t, firstOffset, copyCount);
+        }
+        base += (first - firstOffset);
+        // What used to live at index "first" now lives at "firstOffset".
+        first = firstOffset;
+        elements = t;
+    }
+
+    /**
+     * @param priority
+     *            the key to look up
+     * @return the value stored under the key, or null if there is none
+     */
+    public E get(int priority) {
+        int index = priority - base;
+        if (index < 0 || index >= elements.length) {
+            return null;
+        }
+        return element(index);
+    }
+
+    @SuppressWarnings("unchecked")
+    private E element(int index) {
+        return (E) elements[index];
+    }
+
+    /**
+     * Removes the mapping for a key.
+     * 
+     * @param priority
+     *            the key to remove
+     * @return the value that was stored under the key, or null if there was
+     *         none
+     */
+    public E remove(int priority) {
+        int index = priority - base;
+        if (index < 0 || index >= elements.length) {
+            return null;
+        }
+        E rc = element(index);
+        elements[index] = null;
+        if (rc != null) {
+            size--;
+        }
+        return rc;
+    }
+
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+    /**
+     * @return the value stored under the lowest key, or null if the map is
+     *         empty
+     */
+    public E firstValue() {
+        if (size == 0) {
+            return null;
+        }
+        advanceFirst();
+        return element(first);
+    }
+
+    /**
+     * @return the lowest key that has a value, or null if the map is empty
+     */
+    public Integer firstKey() {
+        if (size == 0) {
+            return null;
+        }
+        advanceFirst();
+        // "first" is an array index; the key is offset by the base.
+        return base + first;
+    }
+
+    /**
+     * Moves {@code first} forward to the lowest occupied slot. Must only be
+     * called when the map is not empty.
+     */
+    private void advanceFirst() {
+        // The first element may have been removed so we need to find it...
+        while (elements[first] == null) {
+            first++;
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An {@link IDispatcher} that spreads work over a fixed pool of
+ * {@link PriorityDispatcher}s. Work submitted from a thread that has a
+ * PriorityDispatcher bound to it goes to that dispatcher; work from any
+ * other thread is handed out round-robin.
+ */
+public class PriorityPooledDispatcher implements IDispatcher {
+    private final String name;
+
+    final AtomicBoolean started = new AtomicBoolean();
+    final AtomicBoolean shutdown = new AtomicBoolean();
+
+    ArrayList<PriorityDispatcher> dispatchers = new ArrayList<PriorityDispatcher>();
+    // Guarded by synchronizing on "dispatchers".
+    private int roundRobinCounter = 0;
+    private final int size;
+
+    private final SimpleLoadBalancer executionGraphLoadBalancer;
+
+    /**
+     * Creates an executor whose tasks are dispatched at the given priority.
+     * 
+     * @param priority
+     *            the priority to dispatch submitted runnables at
+     * @return the executor
+     * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+     */
+    public Executor createPriorityExecutor(final int priority) {
+        return new Executor() {
+            public void execute(final Runnable runnable) {
+                // Honour the requested priority (it used to be ignored and
+                // everything was dispatched at 0).
+                chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+            }
+        };
+    }
+
+    /**
+     * @param name
+     *            name of the pool; each worker is named name-N, N from 1
+     * @param size
+     *            number of dispatchers in the pool
+     * @param priorities
+     *            number of priorities, passed through to each dispatcher
+     */
+    public PriorityPooledDispatcher(String name, int size, int priorities) {
+        this.name = name;
+        this.size = size;
+        executionGraphLoadBalancer = new SimpleLoadBalancer(name);
+        // Create all the workers.
+        for (int i = 0; i < size; i++) {
+            PriorityDispatcher dispatcher = new PriorityDispatcher(name + "-" + (i + 1), priorities, executionGraphLoadBalancer);
+            dispatchers.add(dispatcher);
+        }
+    }
+
+    /**
+     * Registers a dispatchable with one of the pooled dispatchers.
+     */
+    public DispatchContext register(Dispatchable dispatchable, String name) {
+        return chooseDispatcher().register(dispatchable, name);
+    }
+
+    /**
+     * Starts the pooled dispatchers (only on the first call) and then the
+     * load balancer.
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#start()
+     */
+    public synchronized final void start() {
+        if (started.compareAndSet(false, true)) {
+            // Start all the workers.
+            for (int i = 0; i < size; i++) {
+                dispatchers.get(i).start();
+            }
+        }
+        try {
+            executionGraphLoadBalancer.start();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            // Keep the interrupt visible to the caller instead of losing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Shuts down every pooled dispatcher and then the load balancer.
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+     */
+    public synchronized final void shutdown() throws InterruptedException {
+        shutdown.set(true);
+        for (PriorityDispatcher dispatcher : dispatchers) {
+            dispatcher.shutdown();
+        }
+        executionGraphLoadBalancer.shutdown();
+    }
+
+    /**
+     * Picks the dispatcher to hand work to: the one bound to the calling
+     * thread if there is one, otherwise the next one in round-robin order.
+     */
+    private PriorityDispatcher chooseDispatcher() {
+        PriorityDispatcher d = PriorityDispatcher.dispatcher.get();
+        if (d == null) {
+            synchronized (dispatchers) {
+                if (++roundRobinCounter >= size) {
+                    roundRobinCounter = 0;
+                }
+                return dispatchers.get(roundRobinCounter);
+            }
+        } else {
+            return d;
+        }
+    }
+
+    /**
+     * Dispatches a runnable at priority 0.
+     */
+    public void execute(final Runnable runnable) {
+        chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0);
+    }
+
+    /**
+     * Schedules a runnable on one of the pooled dispatchers.
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+     *      long, java.util.concurrent.TimeUnit)
+     */
+    public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
+        chooseDispatcher().schedule(runnable, delay, timeUnit);
+    }
+
+    public String toString() {
+        return name;
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+
+/**
+ * 
+ */
+public class SimpleLoadBalancer implements ExecutionLoadBalancer {
+
+    // The node currently being dispatched on this thread; set by
+    // startingDispatch() and cleared by finishedDispatch().
+    private static final ThreadLocal<ExecutionGraphNode> dispatchContext = new ThreadLocal<ExecutionGraphNode>();
+    // The dispatcher that announced itself on this thread through
+    // onDispatcherStarted().
+    private static final ThreadLocal<PoolableDispatcher> dispatcher = new ThreadLocal<PoolableDispatcher>();
+
+    // Dispatchers registered through addDispatcher(). Nothing in this class
+    // reads the list back.
+    private final ArrayList<PoolableDispatcher> dispatchers = new ArrayList<PoolableDispatcher>();
+
+    private final String name;
+    // Turns on the System.out tracing of dispatcher assignments.
+    private final boolean DEBUG = false;
+
+    SimpleLoadBalancer(String name) {
+        this.name = name;
+    }
+
+    // Nothing to start in this implementation.
+    public void start() throws InterruptedException {
+    }
+
+    // Nothing to shut down in this implementation.
+    public void shutdown() {
+    }
+
+    /**
+     * Wraps the given context in a new {@link ExecutionGraphNode}. The node
+     * registers itself on the context as its load balanced dispatch context.
+     */
+    public LoadBalancedDispatchContext createLoadBalancedDispatchContext(PoolableDispatchContext context) {
+        ExecutionGraphNode egn = new ExecutionGraphNode(context);
+        return egn;
+    }
+
+    /**
+     * Records a dispatcher with this load balancer.
+     */
+    public synchronized final void addDispatcher(PoolableDispatcher dispatcher) {
+        dispatchers.add(dispatcher);
+    }
+
+    /**
+     * A Dispatcher must call this to indicate that it has started its dispatch
+     * loop. The dispatcher is bound to the calling thread.
+     */
+    public void onDispatcherStarted(PoolableDispatcher d) {
+        dispatcher.set(d);
+    }
+
+    /**
+     * A Dispatcher must call this when exiting its dispatch loop. Currently
+     * does nothing; in particular the thread binding made by
+     * onDispatcherStarted() is not cleared.
+     */
+    public void onDispatcherStopped(PoolableDispatcher d) {
+
+    }
+
+    /**
+     * An edge from a source node to the target node that it requested
+     * dispatch for.
+     */
+    private class ExecutionGraphEdge {
+        final ExecutionGraphNode target;
+        final ExecutionGraphNode source;
+        // Never updated by the code in this class.
+        int count;
+
+        ExecutionGraphEdge(ExecutionGraphNode source, ExecutionGraphNode target) {
+            this.target = target;
+            this.source = source;
+        }
+
+        public String toString() {
+            return "Connection from: " + source + " to " + target;
+        }
+    }
+
+    /**
+     * ExecutionGraphNode tracks dispatch information for a
+     * MappableDispatchContext.
+     * 
+     * It remembers which other nodes request dispatch on it and, when there
+     * is exactly one such source, asks to be moved to the dispatcher that
+     * source is running on.
+     */
+    public class ExecutionGraphNode implements LoadBalancedDispatchContext {
+        // The dispatcher-specific context; replaced when the node moves to
+        // another dispatcher (see processForeignUpdates()).
+        protected PoolableDispatchContext context;
+        // Non-null while exactly one source node has been seen.
+        private ExecutionGraphNode singleSource;
+        private final HashMap<ExecutionGraphNode, ExecutionGraphEdge> sources = new HashMap<ExecutionGraphNode, ExecutionGraphEdge>();
+        protected PoolableDispatcher currentOwner;
+        // Incremented in onDispatchRequest(); not read by the code in this
+        // class.
+        private final AtomicInteger work = new AtomicInteger(0);
+
+        // The fields below are written while synchronized on this node.
+        private int priority;
+        // A dispatch was requested from a thread other than the owner's and
+        // still has to be applied by processForeignUpdates().
+        private boolean dispatchRequested = false;
+        // Dispatcher this node should move to; null when no move is pending.
+        private PoolableDispatcher updateDispatcher = null;
+
+        ExecutionGraphNode(PoolableDispatchContext context) {
+            this.context = context;
+            this.context.setLoadBalancedDispatchContext(this);
+            this.currentOwner = context.getDispatcher();
+            if (DEBUG) {
+                System.out.println(getName() + " Assigned to " + context.getDispatcher());
+            }
+        }
+
+        // Marks this node as the one being dispatched on the calling thread.
+        public final void startingDispatch() {
+            dispatchContext.set(this);
+        }
+
+        // Clears the calling thread's "currently dispatching" marker.
+        public final void finishedDispatch() {
+            dispatchContext.set(null);
+        }
+
+        /**
+         * This method is called to track which dispatch contexts are requesting
+         * dispatch for the target context represented by this node.
+         * 
+         * This method is not threadsafe, the caller must ensure serialized
+         * access to this method.
+         * 
+         * NOTE(review): as written, true is returned whenever the first
+         * source of this node is recorded, whether or not a reassignment was
+         * actually started - confirm this is intended.
+         * 
+         * @param callingDispatcher
+         *            The calling dispatcher.
+         * @return True if this method resulted in the dispatch request being
+         *         assigned to another dispatcher.
+         */
+        public final boolean onDispatchRequest(final PoolableDispatcher callingDispatcher) {
+
+            /*
+             * if (callingDispatcher == currentOwner) { return false; }
+             */
+
+            // Null when the request does not come from within a dispatch.
+            ExecutionGraphNode callingContext = dispatchContext.get();
+            if (callingContext != null) {
+                // Make sure we are being called by another node:
+                // NOTE(review): this compares the calling node with the
+                // wrapped PoolableDispatchContext, not with this node;
+                // presumably "callingContext == this" was meant - confirm.
+                // The null test is redundant inside this branch.
+                if (callingContext == null || callingContext == context) {
+                    return false;
+                }
+
+                // Optimize for single source case:
+                if (singleSource != callingContext) {
+                    if (singleSource == null && sources.isEmpty()) {
+                        // First source ever seen for this node.
+                        singleSource = callingContext;
+                        ExecutionGraphEdge edge = new ExecutionGraphEdge(callingContext, this);
+                        sources.put(callingContext, edge);
+
+                        // If this context only has a single source
+                        // immediately assign it to the
+                        // dispatcher of the source:
+                        boolean reassigned = false;
+                        synchronized (this) {
+                            // Only when the caller is not already the owner
+                            // and no other move is pending.
+                            if (callingDispatcher != currentOwner && updateDispatcher == null) {
+                                updateDispatcher = callingDispatcher;
+                                reassigned = true;
+                                if (DEBUG)
+                                    System.out.println("Assigning: " + this + " to " + callingContext + "'s  dispatcher: " + callingDispatcher);
+
+                            }
+                        }
+                        if (reassigned) {
+                            assignToNewDispatcher(callingDispatcher);
+                        }
+                        return true;
+                    } else {
+
+                        // Another source: record it if new and leave the
+                        // single source mode.
+                        ExecutionGraphEdge stats = sources.get(callingContext);
+                        if (stats == null) {
+                            stats = new ExecutionGraphEdge(callingContext, this);
+                            sources.put(callingContext, stats);
+                        }
+
+                        if (singleSource != null) {
+                            singleSource = null;
+                        }
+                    }
+                }
+                work.incrementAndGet();
+            }
+            return false;
+        }
+
+        /**
+         * Records the dispatcher this node should move to (unless it already
+         * owns the node) and notifies the context of a foreign thread update.
+         */
+        final void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
+            synchronized (this) {
+                if (newDispatcher != currentOwner) {
+                    updateDispatcher = newDispatcher;
+                }
+            }
+            context.onForeignThreadUpdate();
+        }
+
+        /**
+         * Requests dispatch of this node. The request is delegated directly
+         * to the context when the calling thread's dispatcher owns the node;
+         * otherwise it is flagged and the context is notified of a foreign
+         * thread update.
+         */
+        public void requestDispatch() {
+
+            // Null when the calling thread never called onDispatcherStarted().
+            PoolableDispatcher callingDispatcher = dispatcher.get();
+
+            if (onDispatchRequest(callingDispatcher)) {
+                return;
+            }
+
+            // Otherwise this is coming off another thread, so we need to
+            // synchronize
+            // to protect against ownership changes:
+            synchronized (this) {
+                // If the owner of this context is the calling thread, then
+                // delegate to the dispatcher.
+                if (currentOwner == callingDispatcher) {
+
+                    context.requestDispatch();
+                    return;
+                }
+
+                dispatchRequested = true;
+            }
+            context.onForeignThreadUpdate();
+        }
+
+        /**
+         * Changes the priority of this node. The new value is applied to the
+         * context directly when the calling thread's dispatcher owns the
+         * node; otherwise the context is notified of a foreign thread update.
+         */
+        public void updatePriority(int priority) {
+            // Early out; note that this read is done without synchronization.
+            if (this.priority == priority) {
+                return;
+            }
+            // Otherwise this is coming off another thread, so we need to
+            // synchronize
+            // to protect against ownership changes:
+            synchronized (this) {
+                this.priority = priority;
+
+                IDispatcher callingDispatcher = dispatcher.get();
+
+                // If the owner of this context is the calling thread, then
+                // delegate to the dispatcher.
+                if (currentOwner == callingDispatcher) {
+
+                    context.updatePriority(priority);
+                    return;
+                }
+            }
+            context.onForeignThreadUpdate();
+        }
+
+        /**
+         * Applies the changes recorded by other threads: either moves this
+         * node to the pending new dispatcher, or pushes the recorded priority
+         * and any pending dispatch request to the current context.
+         */
+        public void processForeignUpdates() {
+            boolean ownerChange = false;
+            synchronized (this) {
+                if (updateDispatcher != null) {
+                    // Close the old context:
+                    if (DEBUG) {
+                        System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+                    }
+                    context.close();
+
+                    // Create a replacement context on the new owner for the
+                    // same dispatchable and name.
+                    currentOwner = updateDispatcher;
+                    updateDispatcher = null;
+                    context = currentOwner.createPoolablDispatchContext(context.getDispatchable(), context.getName());
+                    // Left set so that the new context gets a dispatch
+                    // request the next time foreign updates are processed.
+                    dispatchRequested = true;
+                    context.updatePriority(priority);
+                    context.setLoadBalancedDispatchContext(this);
+                    ownerChange = true;
+                } else {
+                    context.updatePriority(priority);
+
+                    if (dispatchRequested) {
+                        context.requestDispatch();
+                        dispatchRequested = false;
+                    }
+                }
+            }
+
+            if (ownerChange) {
+                // Notify the new context, outside of the lock.
+                context.onForeignThreadUpdate();
+            }
+        }
+
+        // Forgets the recorded sources. The wrapped context is not closed
+        // here.
+        public void close() {
+            sources.clear();
+        }
+
+        public final String toString() {
+            return context.toString();
+        }
+
+        public Dispatchable getDispatchable() {
+            return context.getDispatchable();
+        }
+
+        public String getName() {
+            return context.getName();
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.HashMap;
+import java.util.HashSet;
+
+public abstract class AbstractLimitedFlowResource<E> implements IFlowResource {
+    private final HashSet<FlowLifeCycleListener> lifeCycleWatchers = new HashSet<FlowLifeCycleListener>();
+    private final HashMap<Flow, FlowController<E>> openControllers = new HashMap<Flow, FlowController<E>>();
+
+    private final long resourceId = RESOURCE_COUNTER.incrementAndGet();
+
+    private String resourceName;
+
+    protected AbstractLimitedFlowResource() {
+
+    }
+
+    protected AbstractLimitedFlowResource(String name) {
+        this.resourceName = name;
+    }
+
+    public long getResourceId() {
+        return resourceId;
+    }
+
+    public String getResourceName() {
+        return resourceName;
+    }
+
+    protected void setResourceName(String resourceName) {
+        this.resourceName = resourceName;
+    }
+
+    public synchronized final void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        lifeCycleWatchers.add(listener);
+        // Notify the watchers of all flows that are already open:
+        for (FlowController<E> controller : openControllers.values()) {
+            listener.onFlowOpened(this, controller.getFlow());
+        }
+    }
+
+    /**
+     * Subclasses must call this whenever a new {@link ISinkController} is
+     * opened.
+     * 
+     * @param controller
+     *            The new controller.
+     */
+    protected synchronized final void onFlowOpened(FlowController<E> controller) {
+        FlowController<E> existing = openControllers.put(controller.getFlow(), controller);
+        if (existing != null && existing != controller) {
+            // Put the existing controller back:
+            openControllers.put(controller.getFlow(), existing);
+            throw new IllegalStateException("Flow already opened" + existing);
+        }
+
+        for (FlowLifeCycleListener listener : lifeCycleWatchers) {
+            listener.onFlowOpened(this, controller.getFlow());
+        }
+    }
+
+    protected synchronized final void onFlowClosed(Flow flow) {
+        FlowController<E> existing = openControllers.remove(flow);
+
+        if (existing != null) {
+            for (FlowLifeCycleListener listener : lifeCycleWatchers) {
+                listener.onFlowClosed(this, existing.getFlow());
+            }
+        }
+    }
+
+    public synchronized void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        lifeCycleWatchers.remove(listener);
+    }
+
+    /**
+     * Gets the flow controller corresponding to the specified flow.
+     * 
+     * @param flow
+     *            The flow
+     * @return The FlowController
+     */
+    public synchronized FlowController<E> getFlowController(Flow flow) {
+        return openControllers.get(flow);
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * Base class for flow sources: a flow resource that additionally holds a
+ * drain and an auto release flag.
+ * 
+ * @param <E>
+ *            the element type
+ */
+public abstract class AbstractLimitedFlowSource<E> extends AbstractLimitedFlowResource<E> {
+
+    protected boolean autoRelease = false;
+    protected IFlowDrain<E> drain;
+
+    /**
+     * Creates a source without a name.
+     */
+    protected AbstractLimitedFlowSource() {
+        super();
+    }
+
+    /**
+     * Creates a named source.
+     * 
+     * @param name
+     *            the resource name
+     */
+    protected AbstractLimitedFlowSource(String name) {
+        super(name);
+    }
+
+    /**
+     * Can be set to automatically release space on dequeue. When set the
+     * caller does not need to call IFlowController.elementDispatched()
+     * 
+     * @param val
+     *            the new auto release setting
+     */
+    public synchronized void setAutoRelease(boolean val) {
+        this.autoRelease = val;
+    }
+
+    /**
+     * Returns whether or not this {@link IFlowSource} is set to automatically
+     * release elements via {@link FlowController#elementDispatched(Object)}
+     * during dispatch. When auto release is set the caller <i>must</i> not
+     * call {@link FlowController#elementDispatched(Object)}.
+     * 
+     * @return the auto release setting
+     */
+    public synchronized boolean getAutoRelease() {
+        return this.autoRelease;
+    }
+
+    /**
+     * Sets the drain of this source.
+     * 
+     * @param drain
+     *            the drain to use
+     */
+    public synchronized void setDrain(IFlowDrain<E> drain) {
+        this.drain = drain;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.LinkedList;
+
+/**
+ * Skeleton {@link IFlowLimiter} that keeps the list of
+ * {@link UnThrottleListener}s and notifies them, in registration order, while
+ * the limiter is not throttled. Each listener is removed from the list before
+ * it is called, so a registration is good for a single notification.
+ * <p>
+ * This class does no locking of its own. NOTE(review): callers presumably
+ * serialize access externally - confirm.
+ *
+ * @param <E>
+ *            The type of element being limited.
+ */
+public abstract class AbstractLimiter<E> implements IFlowLimiter<E> {
+
+    // Listeners waiting for the next un-throttled notification (FIFO).
+    private final LinkedList<UnThrottleListener> throttleListeners = new LinkedList<UnThrottleListener>();
+
+    // True while notifyUnThrottleListeners() is draining the list, so that a
+    // listener registered from inside a callback does not start a nested
+    // drain; the running loop picks it up instead.
+    private boolean resuming;
+
+    public AbstractLimiter() {
+    }
+
+    /**
+     * Registers a listener. If the limiter is currently not throttled and no
+     * notification pass is in progress, the pending listeners (including this
+     * one) are notified before this method returns.
+     *
+     * @param l
+     *            The listener to register.
+     */
+    public final void addUnThrottleListener(UnThrottleListener l) {
+        throttleListeners.add(l);
+
+        if (!resuming && !getThrottled()) {
+            notifyUnThrottleListeners();
+        }
+    }
+
+    /**
+     * Removes and notifies pending listeners one at a time for as long as the
+     * limiter stays un-throttled. Listeners that are still pending when the
+     * limiter becomes throttled again remain registered.
+     */
+    public final void notifyUnThrottleListeners() {
+        resuming = true;
+        try {
+            while (!getThrottled() && !throttleListeners.isEmpty()) {
+                UnThrottleListener l = throttleListeners.remove();
+                l.onUnthrottled();
+            }
+        } finally {
+            // Always reset the flag: if a listener throws and the flag stayed
+            // set, addUnThrottleListener() would never notify again.
+            resuming = false;
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Identifies a flow by a name and a numeric id.
+ * <p>
+ * The id is taken from a static counter when the flow is constructed.
+ * Equality and hashing use the id only, so two flows created with the same
+ * name are still different flows. Instances are immutable.
+ */
+final public class Flow {
+    static final private AtomicLong FLOW_COUNTER = new AtomicLong();
+
+    final private long flowID;
+    final private String flowName;
+    final private boolean dynamic;
+    final private int hashCode;
+
+    /**
+     * Creates a flow and assigns it the next id from the shared counter.
+     * 
+     * @param name
+     *            The flow name.
+     * @param dynamic
+     *            The value later reported by {@link #isDynamic()}.
+     */
+    public Flow(String name, boolean dynamic) {
+        this.flowID = FLOW_COUNTER.incrementAndGet();
+        this.flowName = name;
+        this.dynamic = dynamic;
+        // Fold the 64 bit id into 32 bits once; the id never changes.
+        this.hashCode = (int) (flowID ^ (flowID >>> 32));
+    }
+
+    /**
+     * @return the id assigned to this flow at construction.
+     */
+    public long getFlowID() {
+        return flowID;
+    }
+
+    /**
+     * @return the name given to this flow at construction.
+     */
+    public String getFlowName() {
+        return flowName;
+    }
+
+    /**
+     * @return the dynamic flag given to this flow at construction.
+     */
+    public boolean isDynamic() {
+        return dynamic;
+    }
+
+    /**
+     * @return a hash derived from the flow id, consistent with
+     *         {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    /**
+     * @param o
+     *            The object to compare with; may be null.
+     * @return true if o is a Flow with the same id.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o instanceof Flow) {
+            return equals((Flow) o);
+        }
+
+        return false;
+    }
+
+    /**
+     * Typed overload of {@link #equals(Object)}.
+     * 
+     * @param flow
+     *            The flow to compare with; may be null.
+     * @return true if flow is not null and has the same id.
+     */
+    public boolean equals(Flow flow) {
+        // A null argument must compare unequal rather than throw.
+        return flow != null && flowID == flow.flowID;
+    }
+
+    /**
+     * @return the flow name.
+     */
+    @Override
+    public String toString() {
+        return getFlowName();
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.apache.activemq.flow.IFlowLimiter.UnThrottleListener;
+
+/**
+ * Applies flow control to a single {@link Flow} on behalf of a
+ * {@link FlowControllable}. The controller acts in two roles:
+ * <ul>
+ * <li>as an {@link ISinkController} it receives elements from upstream
+ * sources through {@link #add(Object, ISourceController)} and
+ * {@link #offer(Object, ISourceController)}, accounts for them in the
+ * {@link IFlowLimiter}, and blocks the sending source when the limiter asks
+ * for it or has no room;</li>
+ * <li>as an {@link ISourceController} it is told by downstream sinks when it
+ * is blocked or resumed ({@link #onFlowBlock(ISinkController)},
+ * {@link #onFlowResume(ISinkController)}) and when an element has left the
+ * resource ({@link #elementDispatched(Object)}).</li>
+ * </ul>
+ * Methods that touch the blocking state synchronize on the mutex supplied at
+ * construction. Blocked sources and unblock listeners are resumed
+ * asynchronously on the executor set through
+ * {@link #setFlowExecutor(Executor)}.
+ *
+ * @param <E>
+ *            The type of element being flow controlled.
+ */
+public class FlowController<E> implements ISinkController<E>, ISourceController<E> {
+
+    // Downstream sinks that are currently blocking this controller.
+    private final HashSet<ISinkController<E>> blockingSinks = new HashSet<ISinkController<E>>();
+
+    // Upstream sources that this controller is currently blocking.
+    private final HashSet<ISourceController<E>> blockedSources = new HashSet<ISourceController<E>>();
+
+    // Listeners to call back the next time the blocked sources are resumed.
+    // The set is cleared after each resume pass.
+    private final HashSet<FlowUnblockListener<E>> unblockListeners = new HashSet<FlowUnblockListener<E>>();
+
+    // Callback for the IFlowLimiter to notify us that it is unthrottled:
+    private final UnThrottleListener unthrottleListener;
+
+    // The flow being flow controlled:
+    protected Flow flow;
+
+    // True while at least one downstream sink is blocking this controller:
+    protected boolean blocked = false;
+
+    // Set to true while resuming. New Sources must wait to enqueue while
+    // old sources are being resumed.
+    private boolean resuming = false;
+
+    // Marks that we have scheduled a resume
+    private boolean resumeScheduled = false;
+
+    // The acceptor for elements from this flow. May be null, in which case
+    // add()/offer() only account for the element in the limiter.
+    private FlowControllable<E> controllable;
+
+    // The limiter
+    private IFlowLimiter<E> limiter;
+
+    // Mutex for synchronization
+    private Object mutex;
+
+    // When false, add()/offer() accept elements without asking
+    // limiter.canAdd() first.
+    private boolean useOverFlowQueue = true;
+
+    // List of elements that were added while the flow is blocked.
+    // These aren't added to the resource until the flow becomes
+    // unblocked.
+    private LinkedList<E> overflowQueue = new LinkedList<E>();
+
+    // true we registered as an unthrottle listener:
+    private boolean throttleReg;
+
+    // true while a thread is parked in waitForFlowUnblock():
+    private boolean notifyUnblock = false;
+
+    // Used for toString() and as the name of the thread running a resume.
+    // Null when there is no controllable.
+    private String name;
+
+    /**
+     * Creates a controller with only the limiter callback wired up. This
+     * constructor sets no controllable, flow, limiter or mutex.
+     */
+    public FlowController() {
+        this.unthrottleListener = new UnThrottleListener() {
+            public final void onUnthrottled() {
+                FlowController.this.onUnthrottled();
+            }
+        };
+    }
+
+    /**
+     * Creates a fully configured controller.
+     * 
+     * @param controllable
+     *            The acceptor of elements for this flow. May be null, in which
+     *            case {@link #add(Object, ISourceController)} and
+     *            {@link #offer(Object, ISourceController)} only update the
+     *            limiter.
+     * @param flow
+     *            The flow being controlled.
+     * @param limiter
+     *            The limiter to use; when null a {@code SizeLimiter(0, 0)} is
+     *            created.
+     * @param mutex
+     *            The object to synchronize on.
+     */
+    public FlowController(FlowControllable<E> controllable, Flow flow, IFlowLimiter<E> limiter, Object mutex) {
+        this();
+        this.controllable = controllable;
+        this.flow = flow;
+        this.limiter = limiter == null ? new SizeLimiter<E>(0, 0) : limiter;
+        this.mutex = mutex;
+        // add()/offer() support a missing controllable, so construction must
+        // not dereference it unconditionally.
+        if (controllable != null) {
+            this.name = controllable.getFlowSource().toString();
+        }
+    }
+
+    /**
+     * @return the limiter currently in use.
+     */
+    public final IFlowLimiter<E> getLimiter() {
+        return limiter;
+    }
+
+    /**
+     * Replaces the limiter and then runs the same processing as an unthrottle
+     * notification, so queued overflow elements are re-evaluated against the
+     * new limiter.
+     * 
+     * @param limiter
+     *            The new limiter.
+     */
+    public final void setLimiter(IFlowLimiter<E> limiter) {
+        synchronized (mutex) {
+            this.limiter = limiter;
+            onUnthrottled();
+        }
+    }
+
+    /**
+     * Sets whether the controller uses an overflow queue to prevent overflow.
+     * When disabled, elements are always passed on to the limiter and the
+     * controllable without checking for available space first.
+     * 
+     * @param val
+     *            true to use the overflow queue.
+     */
+    public final void useOverFlowQueue(boolean val) {
+        useOverFlowQueue = val;
+    }
+
+    /**
+     * @return the flow being controlled.
+     */
+    public final Flow getFlow() {
+        return flow;
+    }
+
+    /**
+     * Should be called by a resource anytime its limits are exceeded.
+     * 
+     * @param sinkController
+     *            The downstream sink that is blocking this controller.
+     * @throws IllegalStateException
+     *             If the sink is already registered as blocking.
+     */
+    public final void onFlowBlock(ISinkController<E> sinkController) {
+        synchronized (mutex) {
+            if (!blockingSinks.add(sinkController)) {
+                throw new IllegalStateException(sinkController + " has already blocked: " + this);
+            }
+
+            blocked = true;
+        }
+    }
+
+    /**
+     * Called by a downstream sink that no longer blocks this controller. When
+     * the last blocking sink resumes, the controller is unblocked and the
+     * space reserved while it was blocked is released in the limiter.
+     * 
+     * @param sinkController
+     *            The downstream sink that is resuming.
+     * @throws IllegalStateException
+     *             If the sink was not registered as blocking.
+     */
+    public final void onFlowResume(ISinkController<E> sinkController) {
+        synchronized (mutex) {
+            if (!blockingSinks.remove(sinkController)) {
+                throw new IllegalStateException(sinkController + " can't resume unblocked " + this);
+            }
+
+            if (blockingSinks.isEmpty() && blocked) {
+                blocked = false;
+                limiter.releaseReserved();
+            }
+        }
+    }
+
+    /**
+     * Must be called once the elements have been sent to downstream sinks.
+     * 
+     * @param elem
+     *            The dispatched element.
+     */
+    public final void elementDispatched(E elem) {
+
+        synchronized (mutex) {
+            // If we were blocked in the course of dispatching the message
+            // don't decrement the limiter space: reserve it instead. The
+            // reservation is released in onFlowResume().
+            if (blocked) {
+                limiter.reserve(elem);
+                return;
+            }
+            limiter.remove(elem);
+        }
+    }
+
+    /**
+     * @return true if the limiter reports that it is throttled.
+     */
+    public final boolean isSinkBlocked() {
+        synchronized (mutex) {
+            return limiter.getThrottled();
+        }
+    }
+
+    /**
+     * @return true if a downstream sink is currently blocking this controller.
+     */
+    public final boolean isSourceBlocked() {
+        synchronized (mutex) {
+            return blocked;
+        }
+    }
+
+    /**
+     * Blocks the calling thread until the limiter is no longer throttled.
+     * 
+     * @throws InterruptedException
+     *             If interrupted while waiting.
+     */
+    public void waitForFlowUnblock() throws InterruptedException {
+        synchronized (mutex) {
+            while (limiter.getThrottled()) {
+                notifyUnblock = true;
+                setUnThrottleListener();
+                mutex.wait();
+            }
+            notifyUnblock = false;
+        }
+    }
+
+    /**
+     * @return the flow source of the controllable.
+     */
+    public IFlowSource<E> getFlowSource() {
+        return controllable.getFlowSource();
+    }
+
+    /**
+     * Adds an element to the sink associated with this resource if space is
+     * available. If no space is available the element is put on the overflow
+     * queue, the source controller is blocked, and the source is responsible
+     * for tracking the space until this controller resumes.
+     * 
+     * @param elem
+     *            The element to add.
+     * @param sourceController
+     *            The source flow controller; may be null, in which case no
+     *            source is blocked.
+     */
+    public void add(E elem, ISourceController<E> sourceController) {
+        boolean ok = false;
+        synchronized (mutex) {
+            // If we don't have an fc sink, then just increment the limiter.
+            if (controllable == null) {
+                limiter.add(elem);
+                return;
+            }
+            if (okToAdd(elem)) {
+                ok = true;
+                // A true result from the limiter means the source has to be
+                // held back until the limiter signals that it is unthrottled.
+                if (limiter.add(elem)) {
+                    blockSource(sourceController);
+                    setUnThrottleListener();
+                }
+            } else {
+                // Add to overflow queue and block source:
+                overflowQueue.add(elem);
+                blockSource(sourceController);
+                setUnThrottleListener();
+            }
+        }
+        // The controllable is called after the mutex has been released.
+        if (ok) {
+            controllable.flowElemAccepted(this, elem);
+        }
+    }
+
+    /**
+     * Offers an element to the sink associated with this resource if space is
+     * available. If no space is available the source controller is blocked and
+     * false is returned. The element does not get added to the overflow list.
+     * 
+     * @param elem
+     *            The element to add.
+     * @param sourceController
+     *            The source flow controller; may be null, in which case no
+     *            source is blocked.
+     * @return true if the element was accepted.
+     */
+    public boolean offer(E elem, ISourceController<E> sourceController) {
+        synchronized (mutex) {
+            // If we don't have an fc sink, then just increment the limiter.
+            if (controllable == null) {
+                limiter.add(elem);
+                return true;
+            }
+
+            if (okToAdd(elem)) {
+                if (limiter.add(elem)) {
+                    blockSource(sourceController);
+                    setUnThrottleListener();
+                }
+                controllable.flowElemAccepted(this, elem);
+                return true;
+            } else {
+                blockSource(sourceController);
+                setUnThrottleListener();
+                return false;
+            }
+        }
+    }
+
+    // An element may go straight to the resource when the overflow queue is
+    // disabled or the limiter has room for it.
+    private boolean okToAdd(E elem) {
+        return !useOverFlowQueue || limiter.canAdd(elem);
+    }
+
+    // Accounts for the element in the limiter and hands it to the
+    // controllable; used when draining the overflow queue.
+    private void addToResource(E elem) {
+        if (limiter != null)
+            limiter.add(elem);
+        if (controllable != null)
+            controllable.flowElemAccepted(this, elem);
+    }
+
+    // Registers with the limiter for an unthrottle callback unless a
+    // registration is already outstanding.
+    private void setUnThrottleListener() {
+        if (!throttleReg) {
+            throttleReg = true;
+            limiter.addUnThrottleListener(unthrottleListener);
+        }
+    }
+
+    /**
+     * Called when the limiter is no longer throttled. Clears the registration
+     * flag and tries to move queued overflow elements into the resource.
+     */
+    public void onUnthrottled() {
+        synchronized (mutex) {
+            throttleReg = false;
+            dispatchOverflowQueue();
+        }
+    }
+
+    /**
+     * Blocks a source. The caller must hold the mutex, because this method
+     * may wait on it while a resume is in progress.
+     * 
+     * @param source
+     *            The {@link ISourceController} of the source to be blocked;
+     *            ignored when null.
+     */
+    protected void blockSource(final ISourceController<E> source) {
+        if (source == null) {
+            return;
+        }
+
+        // If we are currently in the process of resuming we
+        // must wait for resume to complete, before we add to
+        // the blocked list:
+        waitForResume();
+
+        if (!blockedSources.contains(source)) {
+            blockedSources.add(source);
+            source.onFlowBlock(this);
+        }
+    }
+
+    // Moves as many queued elements as the limiter allows into the resource,
+    // then resumes sources or re-registers for the next unthrottle callback.
+    private void dispatchOverflowQueue() {
+
+        // Dispatch elements on the blocked list into the limited resource
+        while (!overflowQueue.isEmpty()) {
+            E elem = overflowQueue.getFirst();
+            if (limiter.canAdd(elem)) {
+                overflowQueue.removeFirst();
+                addToResource(elem);
+            } else {
+                break;
+            }
+        }
+
+        // See if we can now unblock the sources.
+        checkUnblockSources();
+
+        // If we've exceeded the throttle threshold, register
+        // a listener so we can resume the blocked sources after
+        // the limiter falls below the threshold:
+        if (!overflowQueue.isEmpty()) {
+            setUnThrottleListener();
+        } else if (notifyUnblock) {
+            // Wake up threads parked in waitForFlowUnblock().
+            mutex.notifyAll();
+        }
+    }
+
+    /**
+     * Registers a listener to be called back when the flow becomes unblocked.
+     * The listener is only registered if the flow is blocked at the time of
+     * the call, i.e. the limiter is throttled or elements are waiting on the
+     * overflow queue.
+     * 
+     * @param listener
+     *            The listener.
+     * @return true if the listener was registered, false if the flow is not
+     *         blocked and the listener will not be called.
+     */
+    public final boolean addUnblockListener(FlowUnblockListener<E> listener) {
+        synchronized (mutex) {
+            waitForResume();
+            if (limiter.getThrottled() || !overflowQueue.isEmpty()) {
+                unblockListeners.add(listener);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // Waits, holding the mutex, until a running resume pass has finished.
+    // Interrupts are remembered and re-asserted instead of aborting the wait.
+    private final void waitForResume() {
+        boolean interrupted = false;
+        while (resuming) {
+            try {
+                mutex.wait();
+            } catch (InterruptedException e) {
+                interrupted = true;
+            }
+        }
+
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Releases blocked sources providing the limiter isn't throttled, and there
+     * are no elements on the blocked list. The release itself is run on the
+     * resume executor, and at most one release is scheduled at a time.
+     */
+    private void checkUnblockSources() {
+        if (!resumeScheduled && !limiter.getThrottled() && overflowQueue.isEmpty() && (!blockedSources.isEmpty() || !unblockListeners.isEmpty())) {
+            resumeScheduled = true;
+            Runnable resume = new Runnable() {
+                public void run() {
+                    // From here on blockSource() and addUnblockListener()
+                    // wait in waitForResume(), so the sets below are not
+                    // modified through them while they are iterated outside
+                    // the mutex.
+                    synchronized (mutex) {
+                        resuming = true;
+                    }
+                    String was = Thread.currentThread().getName();
+                    try {
+                        // Thread names must not be null; there is no name
+                        // when the controller has no controllable.
+                        if (name != null) {
+                            Thread.currentThread().setName(name);
+                        }
+                        for (ISourceController<E> source : blockedSources) {
+                            source.onFlowResume(FlowController.this);
+                        }
+                        for (FlowUnblockListener<E> listener : unblockListeners) {
+                            listener.onFlowUnblocked(FlowController.this);
+                        }
+
+                    } finally {
+                        synchronized (mutex) {
+                            blockedSources.clear();
+                            unblockListeners.clear();
+                            resuming = false;
+                            resumeScheduled = false;
+                            // Wake up threads parked in waitForResume().
+                            mutex.notifyAll();
+                        }
+                        Thread.currentThread().setName(was);
+                    }
+                }
+            };
+
+            RESUME_SERVICE.execute(resume);
+        }
+    }
+
+    // Executor shared by all controllers to run resume passes.
+    private static Executor RESUME_SERVICE = Executors.newCachedThreadPool();
+
+    /**
+     * Replaces the executor used by all controllers to resume blocked sources.
+     * 
+     * @param executor
+     *            The executor to use from now on.
+     */
+    public static final void setFlowExecutor(Executor executor) {
+        RESUME_SERVICE = executor;
+    }
+
+    /**
+     * @return the name taken from the controllable's flow source, or the
+     *         default object representation when there is no such name.
+     */
+    public String toString() {
+        return name != null ? name : super.toString();
+    }
+
+    /**
+     * @return the flow sink of the controllable.
+     */
+    public IFlowSink<E> getFlowSink() {
+        return controllable.getFlowSink();
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * Callback through which a flow source hands over its elements while it is
+ * being drained.
+ * 
+ * @param <E>
+ *            The type of element being drained.
+ */
+public interface IFlowDrain<E> {
+
+    /**
+     * Used by a FlowSource that is being dispatched to drain its elements.
+     * The implementor is responsible for calling
+     * {@link ISourceController#elementDispatched(Object)} when the element has
+     * been dispatched to all downstream sinks.
+     * <p>
+     * NOTE(review): presumably this obligation only applies when the source
+     * does not release elements automatically (see
+     * AbstractLimitedFlowSource#getAutoRelease()) - confirm.
+     * 
+     * @param elem
+     *            The element being drained.
+     * @param controller
+     *            The source controller to report the dispatch to.
+     */
+    void drain(E elem, ISourceController<E> controller);
+}