You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/11 21:12:30 UTC

svn commit: r743476 [2/4] - in /activemq/sandbox/activemq-flow: ./ src/ src/main/ src/main/java/ src/main/java/com/ src/main/java/com/progress/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/act...

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * Tracks the elements held by a flow controlled resource and reports when
+ * the resource should be throttled.
+ *
+ * @param <E>
+ *            The type of element being limited.
+ */
+public interface IFlowLimiter<E> {
+
+    /**
+     * Callback invoked when a throttled limiter falls below the throttle
+     * threshold.
+     */
+    interface UnThrottleListener {
+        void onUnthrottled();
+    }
+
+    /**
+     * Records a reservation for the given element. How reserved space is
+     * accounted for is left to the implementation.
+     *
+     * @param elem
+     *            The element to reserve for.
+     */
+    void reserve(E elem);
+
+    /**
+     * Releases whatever has been accumulated through
+     * {@link #reserve(Object)}.
+     */
+    void releaseReserved();
+
+    /**
+     * Asks the limiter whether the given element can currently be added.
+     *
+     * @param elem
+     *            The candidate element.
+     * @return True if the element can be added.
+     */
+    boolean canAdd(E elem);
+
+    /**
+     * Adds an element to the limiter, returning true if this results in the
+     * limiter being throttled.
+     *
+     * @param elem
+     *            The element to add.
+     * @return True if this triggered throttling.
+     */
+    boolean add(E elem);
+
+    /**
+     * Removes an element from the limiter.
+     *
+     * @param elem
+     *            The element to remove.
+     */
+    void remove(E elem);
+
+    /**
+     * @return True if the limiter is currently throttled.
+     */
+    boolean getThrottled();
+
+    /**
+     * Registers a listener to be called back when the limiter becomes
+     * unthrottled.
+     *
+     * @param listener
+     *            The listener.
+     */
+    void addUnThrottleListener(UnThrottleListener listener);
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Common contract for resources that take part in flow control. A resource
+ * has an id and a name, and reports the opening and closing of flows to
+ * registered listeners.
+ */
+public interface IFlowResource {
+
+    /**
+     * A listener for when new flows are opened and closed for this resource.
+     */
+    public interface FlowLifeCycleListener {
+        /**
+         * Called when a flow is opened.
+         *
+         * @param source
+         *            The resource reporting the event.
+         * @param flow
+         *            The flow that was opened.
+         */
+        public void onFlowOpened(IFlowResource source, Flow flow);
+
+        /**
+         * Called when a flow is closed.
+         *
+         * @param source
+         *            The resource reporting the event.
+         * @param flow
+         *            The flow that was closed.
+         */
+        public void onFlowClosed(IFlowResource source, Flow flow);
+    }
+
+    /**
+     * Counter shared by all resources.
+     * NOTE(review): presumably used by implementations to allocate the value
+     * returned by {@link #getResourceId()} -- confirm against implementors.
+     */
+    public static final AtomicLong RESOURCE_COUNTER = new AtomicLong();
+
+    /**
+     * Adds a {@link FlowLifeCycleListener} for flows opened by the source.
+     *
+     * @param listener
+     *            The listener.
+     */
+    public void addFlowLifeCycleListener(FlowLifeCycleListener listener);
+
+    /**
+     * Called to remove a previously added {@link FlowLifeCycleListener}.
+     *
+     * @param listener
+     *            The listener.
+     */
+    public void removeFlowLifeCycleListener(FlowLifeCycleListener listener);
+
+    /**
+     * Gets the unique resource id associated with this resource.
+     *
+     * @return the unique resource id.
+     */
+    public long getResourceId();
+
+    /**
+     * Gets the name of this resource.
+     *
+     * @return the resource name.
+     */
+    public String getResourceName();
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * A flow resource that accepts elements handed to it by a source.
+ *
+ * @param <E>
+ *            The type of element accepted by the sink.
+ */
+public interface IFlowSink<E> extends IFlowResource {
+
+    /**
+     * Adds an element to the sink. If the element overflows the limiter
+     * space in the sink, the source controller will be blocked.
+     *
+     * @param elem
+     *            The element to add to the sink.
+     * @param source
+     *            The source's flow controller.
+     */
+    void add(E elem, ISourceController<E> source);
+
+    /**
+     * Offers an element to the sink. When no room is available the element
+     * is not added and the source's controller is blocked.
+     *
+     * @param elem
+     *            The element to offer.
+     * @param source
+     *            The source's controller.
+     * @return false if the element wasn't accepted.
+     */
+    boolean offer(E elem, ISourceController<E> source);
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * A flow resource that produces elements and dispatches them to an
+ * {@link IFlowDrain}.
+ *
+ * @param <E>
+ *            The type of element produced by the source.
+ */
+public interface IFlowSource<E> extends IFlowResource {
+
+    /**
+     * Gets the {@link FlowController} for the specified flow.
+     *
+     * @param flow
+     *            The flow.
+     * @return The flow controller for the specified flow.
+     */
+    public FlowController<E> getFlowController(Flow flow);
+
+    /**
+     * If set to true the source will automatically release limiter space
+     * associated with elements as they are dispatched. If set to false then
+     * the {@link IFlowDrain} must release space via a call to
+     * {@link ISourceController#elementDispatched(Object)}.
+     *
+     * @param autoRelease
+     *            If the source should release limiter space for elements.
+     */
+    public void setAutoRelease(boolean autoRelease);
+
+    /**
+     * Returns whether or not this {@link IFlowSource} is set to automatically
+     * release elements via {@link FlowController#elementDispatched(Object)}
+     * during dispatch. When auto release is set the caller <i>must not</i>
+     * call {@link FlowController#elementDispatched(Object)}.
+     *
+     * @return true if auto release is set, false otherwise.
+     */
+    public boolean getAutoRelease();
+
+    /**
+     * Sets the default drain for elements from this flow source. It will be
+     * invoked to dispatch elements from the source.
+     *
+     * @param drain
+     *            The drain.
+     */
+    public void setDrain(IFlowDrain<E> drain);
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * The control interface to a sink. Sources hand elements to a sink through
+ * this controller and use it to find out whether the sink is blocked.
+ *
+ * @param <E>
+ *            The type of element being flow controlled.
+ */
+public interface ISinkController<E> {
+    /**
+     * Defines required attributes for an entity that can be flow controlled.
+     *
+     * @param <E>
+     *            The type of element being flow controlled.
+     */
+    public interface FlowControllable<E> {
+        /**
+         * Called when an element has been accepted.
+         *
+         * @param controller
+         *            The source controller the element came from.
+         * @param elem
+         *            The accepted element.
+         */
+        public void flowElemAccepted(ISourceController<E> controller, E elem);
+
+        /**
+         * @return The sink side of the flow controlled entity.
+         */
+        public IFlowSink<E> getFlowSink();
+
+        /**
+         * @return The source side of the flow controlled entity.
+         */
+        public IFlowSource<E> getFlowSource();
+    }
+
+    /**
+     * Used to get a notification when a blocked controller becomes unblocked.
+     *
+     * @param <E>
+     *            The type of element being flow controlled.
+     */
+    public interface FlowUnblockListener<E> {
+        /**
+         * @param controller
+         *            The controller that became unblocked.
+         */
+        public void onFlowUnblocked(ISinkController<E> controller);
+    }
+
+    /**
+     * Offers an element to the sink associated with this resource if space is
+     * available. If no space is available false is returned. The element does
+     * not get added to the overflow list.
+     *
+     * @param elem
+     *            The element to add.
+     * @param sourceController
+     *            the source flow controller.
+     * @return True if the element was accepted, false if no space was
+     *         available.
+     */
+    public boolean offer(E elem, ISourceController<E> sourceController);
+
+    /**
+     * Adds an element to the sink associated with this resource if space is
+     * available. If no space is available the source controller will be
+     * blocked, and the source is responsible for tracking the space until this
+     * controller resumes.
+     *
+     * @param elem
+     *            The element to add.
+     * @param controller
+     *            the source flow controller.
+     */
+    public void add(E elem, ISourceController<E> controller);
+
+    /**
+     * Called to check if this controller is currently being blocked.
+     *
+     * @return True if the flow is blocked.
+     */
+    public boolean isSinkBlocked();
+
+    /**
+     * Waits for the flow to become unblocked.
+     *
+     * @throws InterruptedException
+     *             If interrupted while waiting.
+     */
+    public void waitForFlowUnblock() throws InterruptedException;
+
+    /**
+     * Sets a callback for the listener if this controller is currently blocked.
+     *
+     * @param listener
+     *            The listener.
+     * @return True if a listener was registered false otherwise.
+     */
+    public boolean addUnblockListener(FlowUnblockListener<E> listener);
+
+    /**
+     * @return The sink associated with this controller.
+     */
+    public IFlowSink<E> getFlowSink();
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * The control interface to a source. Sinks typically call back to an
+ * ISourceController to suspend or resume the dispatching of messages.
+ *
+ * @param <E>
+ *            The type of element dispatched by the source.
+ */
+public interface ISourceController<E> {
+
+    /**
+     * Returns the source that this controller is controlling.
+     *
+     * @return The source that the flow controller is controlling.
+     */
+    public IFlowSource<E> getFlowSource();
+
+    /**
+     * Gets the flow that this controller is controlling.
+     *
+     * @return The controlled flow.
+     */
+    public Flow getFlow();
+
+    /**
+     * This is called when a particular flow is blocked for a resource.
+     *
+     * @param sink
+     *            The sink blocking this source.
+     */
+    public void onFlowBlock(ISinkController<E> sink);
+
+    /**
+     * Callback used to get a notification that a sink controller which was
+     * blocking this source has been resumed.
+     *
+     * @param sink
+     *            The sink controller that was unblocked.
+     */
+    public void onFlowResume(ISinkController<E> sink);
+
+    /**
+     * @return True if this source is currently blocked.
+     */
+    public boolean isSourceBlocked();
+
+    /**
+     * Must be called once the elements have been sent to downstream sinks.
+     *
+     * @param elem
+     *            The dispatched element.
+     */
+    public void elementDispatched(E elem);
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * A sink controller that performs no flow control: it never reports itself
+ * as blocked, never registers unblock listeners and rejects attempts to
+ * offer or add elements.
+ *
+ * @param <E>
+ *            The type of element handled by the controller.
+ */
+public class NoOpFlowController<E> implements ISinkController<E> {
+    private final IFlowSource<E> source;
+    private final Flow flow;
+
+    /**
+     * @param source
+     *            The source this controller is associated with.
+     * @param flow
+     *            The flow this controller is associated with.
+     */
+    public NoOpFlowController(IFlowSource<E> source, Flow flow) {
+        this.source = source;
+        this.flow = flow;
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Accessors
+    // /////////////////////////////////////////////////////////////////
+
+    /**
+     * @return The source passed to the constructor.
+     */
+    public IFlowSource<E> getFlowSource() {
+        return source;
+    }
+
+    /**
+     * @return The flow passed to the constructor.
+     */
+    public Flow getFlow() {
+        return flow;
+    }
+
+    /**
+     * @return Always null; this controller has no sink.
+     */
+    public IFlowSink<E> getFlowSink() {
+        return null;
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Blocking state: this controller is never blocked.
+    // /////////////////////////////////////////////////////////////////
+
+    /**
+     * @return Always false.
+     */
+    public boolean isSinkBlocked() {
+        return false;
+    }
+
+    /**
+     * Returns immediately without waiting.
+     */
+    public void waitForFlowUnblock() throws InterruptedException {
+        // Nothing to wait for.
+    }
+
+    /**
+     * Never registers the listener.
+     *
+     * @param listener
+     *            The listener (ignored).
+     * @return Always false.
+     */
+    public boolean addUnblockListener(FlowUnblockListener<E> listener) {
+        return false;
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Notifications: all ignored.
+    // /////////////////////////////////////////////////////////////////
+
+    public void onFlowBlock(IFlowSink<E> sink) {
+        // Intentionally empty.
+    }
+
+    public void onFlowResume(IFlowSink<E> sink) {
+        // Intentionally empty.
+    }
+
+    /**
+     * Ignores the dispatch notification.
+     *
+     * @param elem
+     *            The dispatched element.
+     */
+    public void elementDispatched(E elem) {
+        // Intentionally empty.
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Unsupported operations
+    // /////////////////////////////////////////////////////////////////
+
+    /**
+     * @throws UnsupportedOperationException
+     *             always.
+     */
+    public boolean offer(E elem, ISourceController<E> sourceController) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @throws UnsupportedOperationException
+     *             always.
+     */
+    public void add(E elem, ISourceController<E> controller) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        return "DISABLED Flow Controller for: " + source;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import org.apache.activemq.queue.Mapper;
+
+public class PriorityFlowController<E> implements ISourceController<E>, ISinkController<E> {
+
+    private final Object mutex;
+    private final FlowController<E> controllers[];
+    private final PrioritySizeLimiter<E> limiter;
+
+    private Mapper<Integer, E> priorityMapper;
+
+    private final Flow flow;
+    private final FlowControllable<E> controllable;
+
+    public PriorityFlowController(int priorities, FlowControllable<E> controllable, Flow flow, Object mutex, int capacity, int resume) {
+        this.controllable = controllable;
+        this.flow = flow;
+        this.mutex = mutex;
+        this.limiter = new PrioritySizeLimiter<E>(capacity, resume, priorities);
+        this.limiter.setPriorityMapper(priorityMapper);
+        this.controllers = createControlerArray(priorities);
+        for (int i = 0; i < priorities; i++) {
+            this.controllers[i] = new FlowController<E>(controllable, flow, limiter.getPriorityLimter(i), mutex);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private FlowController<E>[] createControlerArray(int priorities) {
+        return new FlowController[priorities];
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // ISinkController interface impl.
+    // /////////////////////////////////////////////////////////////////
+
+    public boolean offer(E elem, ISourceController<E> controller) {
+        int prio = priorityMapper.map(elem);
+        return controllers[prio].offer(elem, controller);
+    }
+
+    public void add(E elem, ISourceController<E> controller) {
+        int prio = priorityMapper.map(elem);
+        controllers[prio].add(elem, controller);
+    }
+
+    public boolean isSinkBlocked() {
+        synchronized (mutex) {
+            return limiter.getThrottled();
+        }
+    }
+
+    public boolean addUnblockListener(org.apache.activemq.flow.ISinkController.FlowUnblockListener<E> listener) {
+        boolean rc = false;
+        for (int i = 0; i < controllers.length; i++) {
+            rc |= this.controllers[i].addUnblockListener(listener);
+        }
+        return rc;
+    }
+
+    public void waitForFlowUnblock() throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // ISourceController interface impl.
+    // /////////////////////////////////////////////////////////////////
+
+    public void elementDispatched(E elem) {
+        FlowController<E> controler = controllers[priorityMapper.map(elem)];
+        controler.elementDispatched(elem);
+    }
+
+    public Flow getFlow() {
+        return flow;
+    }
+
+    public IFlowSource<E> getFlowSource() {
+        return controllable.getFlowSource();
+    }
+
+    public void onFlowBlock(ISinkController<E> sink) {
+        for (int i = 0; i < controllers.length; i++) {
+            controllers[i].onFlowBlock(sink);
+        }
+    }
+
+    public void onFlowResume(ISinkController<E> sink) {
+        for (int i = 0; i < controllers.length; i++) {
+            controllers[i].onFlowBlock(sink);
+        }
+    }
+
+    public boolean isSourceBlocked() {
+        return false;
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Getters and Setters
+    // /////////////////////////////////////////////////////////////////
+
+    public Mapper<Integer, E> getPriorityMapper() {
+        return priorityMapper;
+    }
+
+    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+        this.priorityMapper = priorityMapper;
+        limiter.setPriorityMapper(priorityMapper);
+    }
+
+    public IFlowSink<E> getFlowSink() {
+        return controllable.getFlowSink();
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.ArrayList;
+
+import org.apache.activemq.queue.Mapper;
+
+/**
+ * Tracks the combined size of elements held across a fixed number of
+ * priority levels and exposes one {@link IFlowLimiter} per level (see
+ * {@link #getPriorityLimter(int)}). All levels share the same capacity,
+ * resume threshold and running total.
+ * <p>
+ * NOTE(review): nothing in this class synchronizes access to the shared
+ * counters; presumably callers hold an external lock -- confirm.
+ *
+ * @param <E>
+ *            The type of element being limited.
+ */
+public class PrioritySizeLimiter<E> {
+
+    // One limiter per priority level, indexed by priority.
+    final private ArrayList<Priority> priorities = new ArrayList<Priority>();
+    // A level becomes throttled on add() once totalSize reaches this value.
+    final protected int capacity;
+    // Un-throttling is attempted on remove() once totalSize is at or below
+    // this value.
+    final protected int resumeThreshold;
+
+    // Sum of the sizes of all elements across all priority levels.
+    private int totalSize;
+    // Number of priority levels that are currently throttled.
+    private int throttledCount = 0;
+    // Raised by Priority.add() and lowered by Priority.unThrottle().
+    private int highestPriority;
+
+    // Default size mapper: every element has size 1.
+    private Mapper<Integer, E> sizeMapper = new Mapper<Integer, E>() {
+        public Integer map(E element) {
+            return 1;
+        }
+    };
+
+    // NOTE(review): the default priority mapper is the size mapper, so by
+    // default every element maps to priority 1. This looks like a
+    // placeholder to be replaced via setPriorityMapper() -- confirm.
+    private Mapper<Integer, E> priorityMapper = sizeMapper;
+
+    /**
+     * The limiter for a single priority level. It keeps its own size and
+     * throttled flag and also updates the counters shared with the enclosing
+     * {@link PrioritySizeLimiter}.
+     */
+    private class Priority extends AbstractLimiter<E> {
+        final int priority;
+        // Size of the elements currently held at this level.
+        int size;
+        // Size accumulated through reserve() and not yet released.
+        int reserved;
+        private boolean throttled;
+
+        public Priority(int priority) {
+            this.priority = priority;
+        }
+
+        /**
+         * Adds the element's size to this level and to the shared total. The
+         * level becomes throttled when the shared total reaches the capacity.
+         *
+         * @return True if this level is throttled after the add.
+         */
+        public boolean add(E elem) {
+            int elementSize = sizeMapper.map(elem);
+            totalSize += elementSize;
+            size += elementSize;
+            if (totalSize >= capacity) {
+                if (!throttled) {
+                    throttled = true;
+                    throttledCount++;
+                }
+            }
+            // Remember the highest level that has received an element.
+            if (priority >= highestPriority) {
+                highestPriority = priority;
+            }
+            return throttled;
+        }
+
+        /**
+         * Returns false if this level is throttled. Otherwise the element's
+         * priority is looked up; if it is lower than the current highest
+         * priority this level is marked throttled and false is returned.
+         * Note that this query can therefore change the throttled state.
+         */
+        public boolean canAdd(E elem) {
+            if (throttled)
+                return false;
+
+            int prio = priorityMapper.map(elem);
+            if (prio < highestPriority) {
+                // Always true here: the throttled case returned above.
+                if (!throttled) {
+                    throttled = true;
+                    throttledCount++;
+                }
+                return false;
+            }
+
+            return true;
+        }
+
+        public boolean getThrottled() {
+            return throttled;
+        }
+
+        /**
+         * Clears the reserved amount and removes that amount from this
+         * level's size and from the shared total.
+         */
+        public void releaseReserved() {
+            if (reserved > 0) {
+                int res = reserved;
+                reserved = 0;
+                remove(res);
+            }
+        }
+
+        /**
+         * Removes the mapped size of the element from this level.
+         */
+        public void remove(E elem) {
+            // Local variable shadows the 'size' field.
+            int size = sizeMapper.map(elem);
+            remove(size);
+        }
+
+        /**
+         * Subtracts the given amount from this level and from the shared
+         * total. Once the total is at or below the resume threshold, the
+         * level currently recorded as highest priority is asked to
+         * un-throttle (which may not be this level).
+         */
+        protected void remove(int s) {
+            size -= s;
+            totalSize -= s;
+
+            assert size >= 0 : "Negative limiter size: " + size;
+            assert totalSize >= 0 : "Negative limiter total size:" + totalSize;
+
+            if (totalSize <= resumeThreshold) {
+                priorities.get(highestPriority).unThrottle();
+            }
+        }
+
+        /**
+         * Clears the throttled flag if it is set and notifies the unthrottle
+         * listeners. If this level is empty and is the recorded highest
+         * priority, the lower levels are scanned to pick a new highest
+         * priority. Does nothing when this level is not throttled.
+         */
+        public void unThrottle() {
+            if (throttled) {
+                throttled = false;
+                throttledCount--;
+                // Presumably inherited from AbstractLimiter (not visible
+                // here).
+                notifyUnThrottleListeners();
+
+                // Has the highest priority level emptied out?
+                if (size == 0 && priority == highestPriority) {
+                    // Set highestPriority to the new highest priority level
+                    // NOTE(review): the scan below runs from high to low
+                    // without a break, so highestPriority ends up at the
+                    // LOWEST matching level visited, and p.unThrottle() may
+                    // recurse and reset it as well. Confirm whether stopping
+                    // at the first match was intended.
+                    highestPriority = 0;
+                    for (int i = priority - 1; i >= 0; i--) {
+                        Priority p = priorities.get(i);
+                        if (p.size > 0 || p.throttled) {
+                            highestPriority = i;
+                            if (totalSize <= resumeThreshold) {
+                                p.unThrottle();
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        /**
+         * Adds the mapped size of the element to the reserved amount.
+         */
+        public void reserve(E elem) {
+            reserved += sizeMapper.map(elem);
+        }
+    }
+
+    /**
+     * Creates a limiter with one level per priority, indexed from 0 to
+     * priorities - 1.
+     *
+     * @param capacity
+     *            Total size at which adding throttles a level.
+     * @param resumeThreshold
+     *            Total size at or below which un-throttling is attempted.
+     * @param priorities
+     *            The number of priority levels.
+     */
+    public PrioritySizeLimiter(int capacity, int resumeThreshold, int priorities) {
+        this.capacity = capacity;
+        this.resumeThreshold = resumeThreshold;
+        for (int i = 0; i < priorities; i++) {
+            this.priorities.add(new Priority(i));
+        }
+    }
+
+    /**
+     * @param priority
+     *            The index of the priority level.
+     * @return The limiter for the given priority level.
+     */
+    public IFlowLimiter<E> getPriorityLimter(int priority) {
+        return priorities.get(priority);
+    }
+
+    /**
+     * @return The mapper used to determine the size of an element.
+     */
+    public Mapper<Integer, E> getSizeMapper() {
+        return sizeMapper;
+    }
+
+    public void setSizeMapper(Mapper<Integer, E> sizeMapper) {
+        this.sizeMapper = sizeMapper;
+    }
+
+    /**
+     * @return The mapper used to determine the priority of an element.
+     */
+    public Mapper<Integer, E> getPriorityMapper() {
+        return priorityMapper;
+    }
+
+    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+        this.priorityMapper = priorityMapper;
+    }
+
+    /**
+     * @return True if at least one priority level is currently throttled.
+     */
+    public boolean getThrottled() {
+        return throttledCount > 0;
+    }
+
+    /**
+     * @return The number of priority levels.
+     */
+    public int getPriorities() {
+        return priorities.size();
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+public class SizeLimiter<E> extends AbstractLimiter<E> {
+
+    protected int capacity;
+    protected int resumeThreshold;
+
+    private int size;
+    private boolean throttled;
+    private int reserved;
+
+    public SizeLimiter(int capacity, int resumeThreshold) {
+        this.capacity = capacity;
+        throttled = false;
+        this.resumeThreshold = resumeThreshold;
+    }
+
+    public final boolean add(E elem) {
+        this.size += getElementSize(elem);
+
+        if (this.size >= capacity) {
+            throttled = true;
+        }
+        return throttled;
+    }
+
+    public final void remove(E elem) {
+        remove(getElementSize(elem));
+    }
+
+    public void reserve(E elem) {
+        reserved += getElementSize(elem);
+    }
+
+    public void releaseReserved() {
+        if (reserved > 0) {
+            int res = reserved;
+            reserved = 0;
+            remove(res);
+        }
+    }
+
+    protected void remove(int s) {
+        this.size -= s;
+        if (size < 0) {
+            Exception ie = new IllegalStateException("Size Negative!" + size);
+            ie.printStackTrace();
+        }
+
+        if (throttled && this.size <= resumeThreshold) {
+            throttled = false;
+            notifyUnThrottleListeners();
+        }
+    }
+
+    public final void reset() {
+        size = 0;
+        notifyUnThrottleListeners();
+    }
+
+    /**
+     * Subclasses should override to return the size of an element
+     * 
+     * @param elem
+     */
+    public int getElementSize(E elem) {
+        return 1;
+    }
+
+    public final boolean getThrottled() {
+        return throttled;
+    }
+
+    public boolean canAdd(E elem) {
+        return !throttled;
+    }
+
+    public int getCapacity() {
+        return capacity;
+    }
+
+    public int getResumeThreshold() {
+        return resumeThreshold;
+    }
+
+    public int getSize() {
+        return size;
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.metric;
+
+/**
+ * Base class for a named counter value that can be reported as a rate over
+ * a {@link Period}.
+ */
+public abstract class Metric {
+
+    private String name;
+    private String unit = "items";
+
+    /**
+     * @return the current counter value of this metric
+     */
+    public abstract long counter();
+
+    /**
+     * Resets this metric.
+     */
+    public abstract void reset();
+
+    /**
+     * Fluent variant of {@link #setName(String)}.
+     *
+     * @param name
+     *            the new name
+     * @return this metric
+     */
+    public Metric name(String name) {
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * @return the name of this metric
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @param name
+     *            the new name
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Fluent variant of {@link #setUnit(String)}.
+     *
+     * @param unit
+     *            the new unit label
+     * @return this metric
+     */
+    public Metric unit(String unit) {
+        this.unit = unit;
+        return this;
+    }
+
+    /**
+     * @return the unit label, {@code "items"} unless changed
+     */
+    public String getUnit() {
+        return this.unit;
+    }
+
+    /**
+     * @param unit
+     *            the new unit label
+     */
+    public void setUnit(String unit) {
+        this.unit = unit;
+    }
+
+    /**
+     * Formats the rate of this metric's counter over the given period.
+     *
+     * @param period
+     *            the period used to turn the counter into a rate
+     * @return a summary of the form {@code name: rate unit/s}
+     */
+    public String getRateSummary(Period period) {
+        return String.format(
+                "%s: %(,.2f %s/s",
+                name,
+                period.rate(counter()),
+                unit);
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.metric;
+
+import java.util.ArrayList;
+
+public class MetricAggregator extends Metric {
+
+    ArrayList<Metric> metrics = new ArrayList<Metric>();
+
+    public MetricAggregator name(String name) {
+        return (MetricAggregator) super.name(name);
+    }
+
+    public MetricAggregator unit(String unit) {
+        return (MetricAggregator) super.unit(unit);
+    }
+
+    public void add(Metric metric) {
+        metrics.add(metric);
+        if (getUnit() != null) {
+            metric.setUnit(getUnit());
+        }
+    }
+
+    public boolean remove(Metric metric) {
+        return metrics.remove(metric);
+    }
+
+    public Float average() {
+        if (metrics.isEmpty()) {
+            return null;
+        }
+        long rc = 0;
+        int count = 0;
+        for (Metric metric : metrics) {
+            rc += metric.counter();
+            count++;
+        }
+        return rc * 1.0f / count;
+    }
+
+    public long total() {
+        long rc = 0;
+        for (Metric metric : metrics) {
+            rc += metric.counter();
+        }
+        return rc;
+    }
+
+    public Long min() {
+        if (metrics.isEmpty()) {
+            return null;
+        }
+        long rc = Long.MAX_VALUE;
+        for (Metric metric : metrics) {
+            long t = metric.counter();
+            if (t < rc) {
+                rc = t;
+            }
+        }
+        return rc;
+    }
+
+    public Long max() {
+        if (metrics.isEmpty()) {
+            return null;
+        }
+        long rc = Long.MIN_VALUE;
+        for (Metric metric : metrics) {
+            long t = metric.counter();
+            if (t > rc) {
+                rc = t;
+            }
+        }
+        return rc;
+    }
+
+    @Override
+    public long counter() {
+        return total();
+    }
+
+    public String getRateSummary(Period period) {
+        return String
+                .format("%s: total=%(,.2f, avg=%(,.2f, min=%(,.2f, max=%(,.2f in %s/s", getName(), period.rate(total()), period.rate(average()), period.rate(min()), period.rate(max()), getUnit());
+    }
+
+    public String getChildRateSummary(Period period) {
+        StringBuilder rc = new StringBuilder();
+        rc.append("{\n");
+        for (Metric metric : metrics) {
+            rc.append("  ");
+            rc.append(metric.getRateSummary(period));
+            rc.append("\n");
+        }
+        rc.append("}");
+        return rc.toString();
+    }
+
+    @Override
+    public void reset() {
+        for (Metric metric : metrics) {
+            metric.reset();
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.metric;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A {@link Metric} backed by an {@link AtomicLong}.
+ */
+public class MetricCounter extends Metric {
+
+    AtomicLong counter = new AtomicLong();
+
+    /**
+     * Sets the name and returns this counter for chaining.
+     */
+    @Override
+    public MetricCounter name(String name) {
+        super.name(name);
+        return this;
+    }
+
+    /**
+     * Adds the given amount to the counter.
+     *
+     * @param delta
+     *            the amount to add
+     * @return the counter value after the addition
+     */
+    public final long increment(long delta) {
+        final long updated = counter.addAndGet(delta);
+        return updated;
+    }
+
+    /**
+     * Adds one to the counter.
+     *
+     * @return the counter value after the increment
+     */
+    public final long increment() {
+        return counter.addAndGet(1L);
+    }
+
+    /**
+     * @return the current counter value
+     */
+    @Override
+    public final long counter() {
+        return counter.get();
+    }
+
+    /**
+     * Sets the counter back to zero.
+     */
+    @Override
+    public void reset() {
+        counter.set(0L);
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.metric;
+
+/**
+ * A span of time in milliseconds, used to turn counter values into
+ * per-second rates. The start defaults to the moment of construction; the
+ * end is fixed the first time it is read, unless it was set explicitly.
+ */
+public class Period {
+
+    long start = System.currentTimeMillis();
+    // 0 means "not fixed yet"; see getEnd().
+    long end;
+
+    /**
+     * @return the start of the period in milliseconds
+     */
+    public long getStart() {
+        return this.start;
+    }
+
+    /**
+     * @param start
+     *            the start of the period in milliseconds
+     */
+    public void setStart(long start) {
+        this.start = start;
+    }
+
+    /**
+     * Returns the end of the period. If no end has been recorded yet (the
+     * stored value is {@code 0}) the current time is stored as the end
+     * first, so later calls return that same value.
+     *
+     * @return the end of the period in milliseconds
+     */
+    public long getEnd() {
+        if (end != 0) {
+            return end;
+        }
+        end = System.currentTimeMillis();
+        return end;
+    }
+
+    /**
+     * @param end
+     *            the end of the period in milliseconds
+     */
+    public void setEnd(long end) {
+        this.end = end;
+    }
+
+    /**
+     * Restarts the period at the current time and clears the end.
+     */
+    public void reset() {
+        this.start = System.currentTimeMillis();
+        this.end = 0;
+    }
+
+    /**
+     * @return the end minus the start, in milliseconds; reading it fixes
+     *         the end if it was not fixed already
+     */
+    public long duration() {
+        final long finish = getEnd();
+        return finish - getStart();
+    }
+
+    /**
+     * Converts a counter accumulated over this period into a per-second
+     * rate.
+     *
+     * @param counter
+     *            the counter value, may be {@code null}
+     * @return the rate per second, or {@code null} for a {@code null}
+     *         counter
+     */
+    public Float rate(Long counter) {
+        if (counter == null) {
+            return null;
+        }
+        return perSecond(counter.longValue());
+    }
+
+    /**
+     * Converts a counter accumulated over this period into a per-second
+     * rate.
+     *
+     * @param counter
+     *            the counter value, may be {@code null}
+     * @return the rate per second, or {@code null} for a {@code null}
+     *         counter
+     */
+    public Float rate(Float counter) {
+        if (counter == null) {
+            return null;
+        }
+        return perSecond(counter.floatValue());
+    }
+
+    // Scales an amount counted over duration() milliseconds to one second.
+    private float perSecond(float amount) {
+        return (amount * 1000f) / duration();
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.AbstractLimitedFlowSource;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+
+/**
+ * Base class for a {@link Dispatchable} {@link FlowControllable}
+ * {@link IFlowQueue}.
+ * 
+ * @param <E>
+ */
+public abstract class AbstractFlowQueue<E> extends AbstractLimitedFlowSource<E> implements FlowControllable<E>, IFlowQueue<E>, Dispatchable {
+
+    // Dispatcher supplied through setDispatcher(); null until then.
+    protected IDispatcher dispatcher;
+    // Context returned by dispatcher.register(); null until setDispatcher()
+    // has been called.
+    protected DispatchContext dispatchContext;
+    // Listeners called back, then discarded, by notifyReady() when no
+    // dispatch context is set.
+    protected final Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners = new ArrayList<IPollableFlowSource.FlowReadyListener<E>>();
+    // True while a thread is parked in waitForDispatchReady(); tells
+    // notifyReady() that a notify() is needed.
+    private boolean notifyReady = false;
+    // Set to true inside notifyReady()'s synchronized branch.
+    // NOTE(review): never reset within this class - confirm whether
+    // subclasses are expected to clear it.
+    protected boolean dispatching = false;
+    // Priority handed to the dispatch context; see setDispatchPriority().
+    protected int dispatchPriority = 0;
+
+    AbstractFlowQueue() {
+        super();
+    }
+
+    /**
+     * @param name
+     *            passed through to the superclass constructor
+     */
+    protected AbstractFlowQueue(String name) {
+        super(name);
+    }
+
+    /**
+     * Calls {@code pollingDispatch()} repeatedly until it returns
+     * {@code false}.
+     * 
+     * @return always {@code true}
+     */
+    public final boolean dispatch() {
+
+        while (pollingDispatch())
+            ;
+
+        return true;
+
+        // Earlier single-poll variant, kept for reference:
+        // return !pollingDispatch();
+    }
+
+    /**
+     * @return this queue, which acts as its own sink
+     */
+    public final IFlowSink<E> getFlowSink() {
+        // The queue is both sink and source, so it returns itself.
+        return this;
+    }
+
+    /**
+     * @return this queue, which acts as its own source
+     */
+    public final IFlowSource<E> getFlowSource() {
+        // The queue is both sink and source, so it returns itself.
+        return this;
+    }
+
+    /**
+     * @return this queue as a {@link FlowControllable}
+     */
+    protected final FlowControllable<E> getFlowControllableHook() {
+        return this;
+    }
+
+    /**
+     * Sets an asynchronous dispatcher for this source. The queue registers
+     * itself with the dispatcher under its resource name and applies the
+     * current dispatch priority to the resulting context.
+     * 
+     * @param dispatcher
+     *            The dispatcher to register with.
+     */
+    public synchronized void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+        dispatchContext = dispatcher.register(this, getResourceName());
+        dispatchContext.updatePriority(dispatchPriority);
+    }
+
+    /**
+     * Stores the dispatch priority and, if a dispatch context already
+     * exists, forwards the new priority to it.
+     * 
+     * @param priority
+     *            The new dispatch priority.
+     */
+    public synchronized final void setDispatchPriority(int priority) {
+        dispatchPriority = priority;
+        if (dispatchContext != null) {
+            dispatchContext.updatePriority(priority);
+        }
+    }
+
+    /**
+     * Registers a listener and, if the queue is already dispatch ready,
+     * triggers {@link #notifyReady()} straight away.
+     * 
+     * @param watcher
+     *            The listener to register.
+     */
+    public synchronized void addFlowReadyListener(IPollableFlowSource.FlowReadyListener<E> watcher) {
+
+        readyListeners.add(watcher);
+        if (isDispatchReady()) {
+            notifyReady();
+        }
+    }
+
+    /**
+     * Dispatches an element, potentially blocking until an element is
+     * available for dispatch. Returns once a {@code pollingDispatch()} call
+     * has returned {@code true}.
+     * 
+     * @throws InterruptedException
+     *             if the thread is interrupted while waiting
+     */
+    public final void blockingDispatch() throws InterruptedException {
+
+        while (!pollingDispatch()) {
+            waitForDispatchReady();
+        }
+    }
+
+    /**
+     * Indicates that there are elements ready for dispatch. With a dispatch
+     * context set this only requests a dispatch. Otherwise, under this
+     * object's lock, it wakes a thread waiting in
+     * {@link #waitForDispatchReady()} (if one has flagged itself), calls
+     * every registered ready listener and then clears the listener list.
+     */
+    protected void notifyReady() {
+        // Unsynchronized fast path.
+        if (dispatchContext != null) {
+            dispatchContext.requestDispatch();
+            return;
+        }
+
+        synchronized (this) {
+            // Re-check under the lock: setDispatcher() may have run since
+            // the unsynchronized check above.
+            if (dispatchContext != null) {
+                if (!dispatching) {
+                    dispatching = true;
+                    dispatchContext.requestDispatch();
+                }
+                return;
+            }
+
+            if (notifyReady) {
+                notify();
+            }
+
+            // NOTE(review): listeners are invoked while holding this
+            // object's lock.
+            if (!readyListeners.isEmpty()) {
+                for (FlowReadyListener<E> listener : readyListeners) {
+                    listener.onFlowReady(this);
+                }
+            }
+
+            // Listeners are one-shot: they must re-register to be called
+            // again.
+            readyListeners.clear();
+        }
+    }
+
+    /**
+     * Blocks the calling thread until {@code isDispatchReady()} returns
+     * {@code true}, flagging {@code notifyReady} while it waits so that
+     * {@link #notifyReady()} knows to wake it.
+     * 
+     * @throws InterruptedException
+     *             if the thread is interrupted while waiting
+     */
+    protected synchronized void waitForDispatchReady() throws InterruptedException {
+        while (!isDispatchReady()) {
+            notifyReady = true;
+            wait();
+        }
+        notifyReady = false;
+    }
+
+    /**
+     * @return the resource name of this queue
+     */
+    public String toString() {
+        return getResourceName();
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.dispatch.PriorityLinkedList;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.PriorityFlowController;
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ */
+/**
+ * A flow queue that holds its elements in a priority-ordered linked list
+ * and is flow controlled by a single {@link PriorityFlowController}.
+ *
+ * @param <E>
+ *            the type of element held by the queue
+ */
+public class ExclusivePriorityQueue<E> extends AbstractFlowQueue<E> implements IFlowQueue<E> {
+
+    /** List node pairing an element with the priority it was queued at. */
+    private class PriorityNode extends LinkedNode<PriorityNode> {
+        E elem;
+        int prio;
+    }
+
+    private final PriorityLinkedList<PriorityNode> queue;
+    private final PriorityFlowController<E> controller;
+    private Mapper<Integer, E> priorityMapper;
+
+    /**
+     * Creates a priority queue with its own flow controller.
+     *
+     * @param priority
+     *            passed to the {@link PriorityFlowController}; its exact
+     *            meaning is defined there
+     * @param flow
+     *            The {@link Flow}
+     * @param name
+     *            the resource name of this queue
+     * @param capacity
+     *            the capacity handed to the flow controller
+     * @param resume
+     *            the resume threshold handed to the flow controller
+     */
+    public ExclusivePriorityQueue(int priority, Flow flow, String name, int capacity, int resume) {
+        super(name);
+        queue = new PriorityLinkedList<PriorityNode>(10);
+        controller = new PriorityFlowController<E>(priority, getFlowControllableHook(), flow, this, capacity, resume);
+    }
+
+    /**
+     * Offers an element to the queue through its flow controller.
+     *
+     * @return whatever the controller's {@code offer} returns
+     */
+    public boolean offer(E elem, ISourceController<E> source) {
+        final boolean accepted = controller.offer(elem, source);
+        return accepted;
+    }
+
+    /**
+     * Performs a limited add to the queue through its flow controller.
+     */
+    public final void add(E elem, ISourceController<E> source) {
+        controller.add(elem, source);
+    }
+
+    /**
+     * Called when the controller accepts a message for this queue. The
+     * element is queued at the priority reported by the priority mapper and
+     * readiness is signalled.
+     */
+    public synchronized void flowElemAccepted(ISourceController<E> controller, E elem) {
+        final PriorityNode entry = new PriorityNode();
+        entry.elem = elem;
+        entry.prio = priorityMapper.map(elem);
+        queue.add(entry, entry.prio);
+
+        notifyReady();
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return always {@code null}
+     */
+    public FlowController<E> getFlowController(Flow flow) {
+        // TODO:
+        return null;
+    }
+
+    /**
+     * @return {@code true} when at least one element is queued
+     */
+    public boolean isDispatchReady() {
+        final boolean empty = queue.isEmpty();
+        return !empty;
+    }
+
+    /**
+     * Takes the next element off the queue, if there is one, and hands it to
+     * the drain.
+     *
+     * @return {@code true} if an element was dispatched, {@code false} if
+     *         the queue was empty
+     */
+    public boolean pollingDispatch() {
+        final PriorityNode head;
+        synchronized (this) {
+            head = queue.poll();
+            // FIXME the release should really be done after dispatch.
+            // Doing it here avoids re-synchronizing after the dispatch, at
+            // the cost of releasing limiter space too soon.
+            if (autoRelease && head != null) {
+                controller.elementDispatched(head.elem);
+            }
+        }
+
+        if (head == null) {
+            return false;
+        }
+        drain.drain(head.elem, controller);
+        return true;
+    }
+
+    /**
+     * @return the mapper used to obtain an element's priority
+     */
+    public Mapper<Integer, E> getPriorityMapper() {
+        return this.priorityMapper;
+    }
+
+    /**
+     * Sets the priority mapper on this queue and on its flow controller.
+     */
+    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+        this.priorityMapper = priorityMapper;
+        this.controller.setPriorityMapper(priorityMapper);
+    }
+
+    /**
+     * @return the resource name of this queue
+     */
+    @Override
+    public String toString() {
+        return getResourceName();
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.LinkedList;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.ISourceController;
+
+/**
+ * A FIFO flow queue guarded by a single {@link FlowController}.
+ *
+ * @param <E>
+ *            the type of element held by the queue
+ */
+public class ExclusiveQueue<E> extends AbstractFlowQueue<E> {
+    private final FlowController<E> controller;
+    private final LinkedList<E> queue = new LinkedList<E>();
+
+    /**
+     * Creates a queue with its own flow controller and opens that
+     * controller's flow on this queue.
+     *
+     * @param flow
+     *            The {@link Flow}
+     * @param name
+     *            the resource name of this queue
+     * @param limiter
+     *            the limiter handed to the flow controller
+     */
+    public ExclusiveQueue(Flow flow, String name, IFlowLimiter<E> limiter) {
+        super(name);
+        controller = new FlowController<E>(getFlowControllableHook(), flow, limiter, this);
+        super.onFlowOpened(controller);
+    }
+
+    /**
+     * Offers an element to the queue through its flow controller.
+     *
+     * @return whatever the controller's {@code offer} returns
+     */
+    public boolean offer(E elem, ISourceController<E> source) {
+        final boolean accepted = controller.offer(elem, source);
+        return accepted;
+    }
+
+    /**
+     * Performs a limited add to the queue through its flow controller.
+     */
+    public final void add(E elem, ISourceController<E> source) {
+        controller.add(elem, source);
+    }
+
+    /**
+     * Called when the controller accepts a message for this queue. The
+     * element is appended to the queue and readiness is signalled.
+     */
+    public synchronized void flowElemAccepted(ISourceController<E> controller, E elem) {
+        queue.add(elem);
+
+        notifyReady();
+    }
+
+    /**
+     * @return this queue's flow controller, whatever flow is asked for
+     */
+    public FlowController<E> getFlowController(Flow flow) {
+        return this.controller;
+    }
+
+    /**
+     * @return {@code true} when at least one element is queued
+     */
+    public final boolean isDispatchReady() {
+        final boolean empty = queue.isEmpty();
+        return !empty;
+    }
+
+    /**
+     * Takes the next element off the queue, if there is one, and hands it to
+     * the drain.
+     *
+     * @return {@code true} if an element was dispatched, {@code false} if
+     *         the queue was empty
+     */
+    public final boolean pollingDispatch() {
+        final E head;
+        synchronized (this) {
+            head = queue.poll();
+            // FIXME the release should really be done after dispatch.
+            // Doing it here avoids re-synchronizing after the dispatch, at
+            // the cost of releasing limiter space too soon.
+            if (autoRelease && head != null) {
+                controller.elementDispatched(head);
+            }
+        }
+
+        if (head == null) {
+            return false;
+        }
+        drain.drain(head, controller);
+        return true;
+    }
+
+    /**
+     * @return the resource name prefixed with {@code SingleFlowQueue:}
+     */
+    @Override
+    public String toString() {
+        return "SingleFlowQueue:" + getResourceName();
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.IFlowSource;
+
+public interface IAsynchronousFlowSource<E> extends IFlowSource<E> {
+
+    /**
+     * Sets an asynchronous dispatcher for this source. As elements become
+     * available they will be dispatched to the worker pool.
+     * 
+     * @param workers
+     *            The dispatcher that this source hands its elements to.
+     */
+    public void setDispatcher(IDispatcher workers);
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.IFlowSource;
+
+public interface IBlockingFlowSource<E> extends IFlowSource<E> {
+
+    /**
+     * Dispatches the next available element to the source's dispatcher,
+     * blocking until an element is available.
+     * 
+     * @throws InterruptedException
+     *             if the calling thread is interrupted while it is blocked
+     */
+    public void blockingDispatch() throws InterruptedException;
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.IFlowSink;
+
+/**
+ * A queue in a flow: it is a sink for incoming elements and, at the same
+ * time, a source that supports the blocking, pollable and asynchronous
+ * dispatch interfaces.
+ *
+ * @param <E>
+ *            the type of element held by the queue
+ */
+public interface IFlowQueue<E> extends IBlockingFlowSource<E>, IPollableFlowSource<E>, IAsynchronousFlowSource<E>, IFlowSink<E> {
+
+    /**
+     * Sets the dispatch priority of this queue.
+     *
+     * @param priority
+     *            the new priority; NOTE(review): the valid range and ordering
+     *            are not specified here, confirm against the implementations
+     */
+    void setDispatchPriority(int priority);
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.IFlowSource;
+
+/**
+ * A flow source that is drained by polling: the caller asks it to dispatch
+ * one element at a time and is told whether anything was dispatched.
+ *
+ * @param <E>
+ *            the type of element supplied by this source
+ */
+public interface IPollableFlowSource<E> extends IFlowSource<E> {
+
+    /**
+     * Callback notified when a pollable source is ready for dispatch.
+     *
+     * @param <E>
+     *            the element type of the observed source
+     */
+    interface FlowReadyListener<E> {
+
+        /**
+         * Signals that the given source is ready for dispatch.
+         *
+         * @param source
+         *            the source that became ready
+         */
+        void onFlowReady(IPollableFlowSource<E> source);
+    }
+
+    /**
+     * Registers the listener to be told when elements are available for
+     * dispatch from this source.
+     *
+     * @param listener
+     *            the listener to notify
+     */
+    void addFlowReadyListener(FlowReadyListener<E> listener);
+
+    /**
+     * Dispatches the next available element, if there is one.
+     *
+     * @return {@code false} if no element was available for dispatch
+     */
+    boolean pollingDispatch();
+
+    /**
+     * Reports whether a call to {@link #pollingDispatch()} has something to
+     * work on.
+     *
+     * @return {@code true} if there are elements ready to dispatch
+     */
+    boolean isDispatchReady();
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.IFlowSink;
+
+/**
+ * A sink of values that subscriptions can be attached to and from which
+ * values can be removed either by value or by key.
+ *
+ * @param <K>
+ *            the key type used by {@link #removeByKey(Object)}
+ * @param <V>
+ *            the type of value accepted by the queue
+ */
+public interface IQueue<K, V> extends IFlowSink<V> {
+
+    /**
+     * Attaches a subscription to this queue.
+     *
+     * @param sub
+     *            the subscription to attach
+     */
+    void addSubscription(Subscription<V> sub);
+
+    /**
+     * Detaches a subscription from this queue.
+     *
+     * @param sub
+     *            the subscription to detach
+     * @return presumably {@code true} if the subscription was attached and
+     *         has been removed (TODO confirm against the implementations)
+     */
+    boolean removeSubscription(Subscription<V> sub);
+
+    /**
+     * Removes the given value from the queue.
+     *
+     * @param value
+     *            the value to remove
+     * @return presumably {@code true} if a matching value was removed (TODO
+     *         confirm against the implementations)
+     */
+    boolean removeByValue(V value);
+
+    /**
+     * Removes the value stored under the given key.
+     *
+     * @param key
+     *            the key of the value to remove
+     * @return presumably {@code true} if a value was removed (TODO confirm
+     *         against the implementations)
+     */
+    boolean removeByKey(K key);
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+/**
+ */
+public class LoadBalancedFlowQueue<E> extends AbstractFlowQueue<E> {
+    private final LinkedList<E> queue = new LinkedList<E>();
+    private final LinkedNodeList<SinkNode> readyConsumers = new LinkedNodeList<SinkNode>();
+    private final HashMap<IFlowSink<E>, SinkNode> consumers = new HashMap<IFlowSink<E>, SinkNode>();
+
+    private boolean strcitDispatch = true;
+
+    private final FlowController<E> sinkController;
+
+    private final ISourceController<E> sourceControler = new ISourceController<E>() {
+
+        public Flow getFlow() {
+            return sinkController.getFlow();
+        }
+
+        public IFlowSource<E> getFlowSource() {
+            return LoadBalancedFlowQueue.this;
+        }
+
+        public void onFlowBlock(ISinkController<E> sink) {
+            synchronized (LoadBalancedFlowQueue.this) {
+                SinkNode node = consumers.get(sink);
+                if (node != null) {
+                    node.unlink();
+                }
+                // controller.onFlowBlock(sink);
+            }
+
+        }
+
+        public void onFlowResume(ISinkController<E> sink) {
+            synchronized (LoadBalancedFlowQueue.this) {
+                SinkNode node = consumers.get(sink);
+                if (node != null) {
+                    // controller.onFlowResume(sink);
+                    // Add to ready list if not there:
+                    if (!node.isLinked()) {
+                        boolean notify = false;
+                        if (readyConsumers.isEmpty()) {
+                            notify = true;
+                        }
+
+                        readyConsumers.addLast(node);
+                        if (notify && !queue.isEmpty()) {
+                            notifyReady();
+                        }
+                    }
+                }
+            }
+        }
+
+        public void elementDispatched(E elem) {
+            // TODO Auto-generated method stub
+
+        }
+
+        public boolean isSourceBlocked() {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+    };
+
+    /**
+     * Creates a flow queue that can handle multiple flows.
+     * 
+     * @param flow
+     *            The {@link Flow}
+     * @param controller
+     *            The FlowController if this queue is flow controlled:
+     */
+    public LoadBalancedFlowQueue(Flow flow, String name, long resourceId, IFlowLimiter<E> limiter) {
+        super(name);
+        this.sinkController = new FlowController<E>(getFlowControllableHook(), flow, limiter, this);
+        super.onFlowOpened(sinkController);
+    }
+
+    public boolean offer(E elem, ISourceController<E> source) {
+        return sinkController.offer(elem, source);
+    }
+
+    /**
+     * Performs a limited add to the queue.
+     */
+    public final void add(E elem, ISourceController<E> source) {
+        sinkController.add(elem, source);
+    }
+
+    /**
+     * Called when the controller accepts a message for this queue.
+     */
+    public synchronized void flowElemAccepted(ISourceController<E> controller, E elem) {
+        queue.add(elem);
+        if (!readyConsumers.isEmpty()) {
+            notifyReady();
+        }
+    }
+
+    public FlowController<E> getFlowController(Flow flow) {
+        return sinkController;
+    }
+
+    public boolean isDispatchReady() {
+        return !queue.isEmpty() && !readyConsumers.isEmpty();
+    }
+
+    public boolean pollingDispatch() {
+        if (strcitDispatch) {
+            return strictPollingDispatch();
+        } else {
+            return loosePollingDispatch();
+        }
+    }
+
+    private boolean strictPollingDispatch() {
+
+        SinkNode node = null;
+        E elem = null;
+        synchronized (this) {
+            if (readyConsumers.isEmpty()) {
+                return false;
+            }
+            // Get the next elem:
+            elem = queue.peek();
+            if (elem == null) {
+                return false;
+            }
+
+            node = readyConsumers.getHead();
+        }
+
+        while (true) {
+
+            boolean accepted = node.sink.offer(elem, sourceControler);
+
+            synchronized (this) {
+                if (accepted) {
+                    queue.remove();
+                    if (autoRelease) {
+                        sinkController.elementDispatched(elem);
+                    }
+                    if (!readyConsumers.isEmpty()) {
+                        readyConsumers.rotate();
+                    }
+                    return true;
+                } else {
+                    if (readyConsumers.isEmpty()) {
+                        return false;
+                    }
+                    node = readyConsumers.getHead();
+                }
+            }
+        }
+    }
+
+    private boolean loosePollingDispatch() {
+        E elem = null;
+        IFlowSink<E> sink = null;
+        synchronized (this) {
+            if (readyConsumers.isEmpty()) {
+                return false;
+            }
+
+            // Get the next sink:
+            sink = readyConsumers.getHead().sink;
+
+            // Get the next elem:
+            elem = queue.poll();
+            if (elem == null) {
+                return false;
+            }
+
+            readyConsumers.rotate();
+
+            // FIXME the release should really be done after dispatch.
+            // doing it here saves us from having to resynchronize
+            // after dispatch, but releases limiter space too soon.
+            if (autoRelease) {
+                sinkController.elementDispatched(elem);
+            }
+
+        }
+
+        sink.add(elem, sourceControler);
+        return true;
+    }
+
+    public final void addSink(IFlowSink<E> sink) {
+        synchronized (this) {
+            SinkNode node = consumers.get(sink);
+            if (node == null) {
+                node = new SinkNode(sink);
+                consumers.put(sink, node);
+                readyConsumers.addLast(node);
+                if (!queue.isEmpty()) {
+                    notifyReady();
+                }
+            }
+        }
+    }
+
+    private class SinkNode extends LinkedNode<SinkNode> {
+        public final IFlowSink<E> sink;
+
+        public SinkNode(IFlowSink<E> sink) {
+            this.sink = sink;
+        }
+
+        @Override
+        public String toString() {
+            return sink.toString();
+        }
+    }
+
+    public boolean isStrcitDispatch() {
+        return strcitDispatch;
+    }
+
+    public void setStrcitDispatch(boolean strcitDispatch) {
+        this.strcitDispatch = strcitDispatch;
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+/**
+ * Derives a value of type {@code K} from an element of type {@code V}.
+ *
+ * @param <K>
+ *            the type produced by the mapping
+ * @param <V>
+ *            the type of element being mapped
+ */
+public interface Mapper<K, V> {
+    /**
+     * Maps the given element.
+     *
+     * @param element
+     *            the element to map
+     * @return the value the element maps to
+     */
+    K map(V element);
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+/**
+ * A {@link Store} kept entirely in memory. Nodes are indexed by key in a hash
+ * map and are also linked, in insertion order, in a list that cursors walk.
+ * <p>
+ * No synchronization is performed by this class.
+ *
+ * @param <K>
+ *            the key type
+ * @param <V>
+ *            the value type
+ */
+public class MemoryStore<K, V> implements Store<K, V> {
+
+    // Source of node ids; ids are handed out in node creation order.
+    AtomicLong counter = new AtomicLong();
+
+    /**
+     * A stored key/value pair, linked into the store's element list.
+     */
+    class MemoryStoreNode extends LinkedNode<MemoryStoreNode> implements StoreNode<K, V> {
+        private Subscription<V> owner;
+        private final K key;
+        private final V value;
+        private long id = counter.getAndIncrement();
+
+        public MemoryStoreNode(K key, V value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        /**
+         * Records the given subscription as owner if the node has none.
+         * <p>
+         * NOTE(review): this returns true even when another subscription
+         * already owns the node. Confirm whether callers rely on that before
+         * tightening it.
+         *
+         * @return always true
+         */
+        public boolean acquire(Subscription<V> owner) {
+            if (this.owner == null) {
+                this.owner = owner;
+            }
+            return true;
+        }
+
+        public K getKey() {
+            return key;
+        }
+
+        public V getValue() {
+            return value;
+        }
+
+        @Override
+        public String toString() {
+            return "node:" + id;
+        }
+
+        /**
+         * Clears the owner recorded by {@link #acquire(Subscription)}.
+         */
+        public void unacquire() {
+            this.owner = null;
+        }
+
+    }
+
+    /**
+     * Cursor over the store's element list.
+     */
+    class MemoryStoreCursor implements StoreCursor<K, V> {
+        // Node most recently returned by next(), or null.
+        private MemoryStoreNode last;
+        // Node to be returned by the next call to next(), or null if not
+        // yet determined.
+        private MemoryStoreNode next;
+
+        public MemoryStoreCursor() {
+        }
+
+        public MemoryStoreCursor(MemoryStoreNode next) {
+            this.next = next;
+        }
+
+        public void setNext(StoreNode<K, V> next) {
+            this.next = (MemoryStoreNode) next;
+        }
+
+        /**
+         * Determines the node that next() will return, if there is one.
+         *
+         * @return true if a next node is available
+         */
+        public boolean hasNext() {
+            if (next != null) {
+                return true;
+            }
+
+            // Nothing returned yet, or the last node is its own circular
+            // successor: (re)start from the head of the list.
+            if (last == null || last.getNextCircular() == last) {
+                next = elements.getHead();
+                return next != null;
+            }
+
+            // Ids grow in creation order and nodes are appended at the tail,
+            // so a successor whose id is not larger means the circular walk
+            // has wrapped around.
+            MemoryStoreNode successor = last.getNextCircular();
+            if (successor.id > last.id) {
+                next = successor;
+                return true;
+            }
+            return false;
+        }
+
+        /**
+         * @return the node next() would return, without advancing; null if
+         *         there is none
+         */
+        public StoreNode<K, V> peekNext() {
+            hasNext();
+            return next;
+        }
+
+        /**
+         * @return the next node, or null if there is none
+         */
+        public StoreNode<K, V> next() {
+            try {
+                hasNext();
+                return next;
+            } finally {
+                // Advance the cursor after the return value has been taken.
+                last = next;
+                next = null;
+            }
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+
+    // Key index over the stored nodes.
+    protected HashMap<K, MemoryStoreNode> map = new HashMap<K, MemoryStoreNode>();
+    // The stored nodes in insertion order.
+    protected LinkedNodeList<MemoryStoreNode> elements = new LinkedNodeList<MemoryStoreNode>();
+
+    /**
+     * Stores a value under the given key, appending it to the element list.
+     * If the key was already present, the node previously stored under it is
+     * dropped from the list.
+     *
+     * @return the node created for the pair
+     */
+    public StoreNode<K, V> add(K key, V value) {
+        MemoryStoreNode rc = new MemoryStoreNode(key, value);
+        MemoryStoreNode displaced = map.put(key, rc);
+        if (displaced != null) {
+            // Keep the list in step with the map: a node that is no longer
+            // reachable by key could never be removed and would make size()
+            // and isEmpty() disagree.
+            displaced.unlink();
+        }
+        elements.addLast(rc);
+        return rc;
+    }
+
+    /**
+     * Removes the node stored under the given key.
+     *
+     * @return the removed node, or null if the key was not present
+     */
+    public StoreNode<K, V> remove(K key) {
+        MemoryStoreNode node = map.remove(key);
+        if (node != null) {
+            node.unlink();
+        }
+        return node;
+    }
+
+    public boolean isEmpty() {
+        return elements.isEmpty();
+    }
+
+    /**
+     * @return a new cursor with no position set
+     */
+    public StoreCursor<K, V> openCursor() {
+        return new MemoryStoreCursor();
+    }
+
+    /**
+     * @param next
+     *            the node the cursor returns first; must be a node created by
+     *            this store class, since it is cast to the node type
+     * @return a new cursor positioned at the given node
+     */
+    public StoreCursor<K, V> openCursorAt(StoreNode<K, V> next) {
+        return new MemoryStoreCursor((MemoryStoreNode) next);
+    }
+
+    /**
+     * @return the number of keys in the store
+     */
+    public int size() {
+        return map.size();
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+public class MultiFlowQueue<E> extends AbstractFlowQueue<E> {
+    private final HashMap<Flow, SingleFlowQueue> flowQueues = new HashMap<Flow, SingleFlowQueue>();
+    private final LinkedNodeList<SingleFlowQueue> readyQueues = new LinkedNodeList<SingleFlowQueue>();
+
+    private final int perFlowWindow;
+    private final int resumeThreshold;
+
+    public MultiFlowQueue(String name, int perFlowWindow, int resumeThreshold) {
+        super(name);
+        this.perFlowWindow = perFlowWindow;
+        this.resumeThreshold = resumeThreshold;
+    }
+
+    public final void flowElemAccepted(ISourceController<E> controller, E elem) {
+        // We don't currently create a flow controller for this,
+        // so this shouldn't be called.
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean offer(E elem, ISourceController<E> source) {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    public synchronized void add(E elem, ISourceController<E> source) {
+        SingleFlowQueue queue = flowQueues.get(source.getFlow());
+        if (queue == null) {
+            queue = new SingleFlowQueue(source.getFlow(), new SizeLimiter<E>(perFlowWindow, resumeThreshold));
+            flowQueues.put(source.getFlow(), queue);
+            super.onFlowOpened(queue.controller);
+        }
+        queue.enqueue(elem, source);
+    }
+
+    public boolean pollingDispatch() {
+        SingleFlowQueue queue = null;
+        E elem = null;
+        synchronized (this) {
+            queue = peekReadyQueue();
+            if (queue == null) {
+                return false;
+            }
+
+            elem = queue.poll();
+            if (elem == null) {
+
+                unreadyQueue(queue);
+                return false;
+            }
+
+            // rotate to have fair dispatch.
+            queue.getList().rotate();
+        }
+
+        drain.drain(elem, queue.controller);
+        return true;
+    }
+
+    public final boolean isDispatchReady() {
+        return !readyQueues.isEmpty();
+    }
+
+    private SingleFlowQueue peekReadyQueue() {
+        if (readyQueues.isEmpty()) {
+            return null;
+        }
+        return readyQueues.getHead();
+    }
+
+    private void unreadyQueue(SingleFlowQueue node) {
+        node.unlink();
+    }
+
+    private void addReadyQueue(SingleFlowQueue node) {
+        readyQueues.addLast(node);
+    }
+
+    /**
+     * Limits a flow that has potentially multiple sources.
+     */
+    private class SingleFlowQueue extends LinkedNode<SingleFlowQueue> implements FlowController.FlowControllable<E> {
+        private final LinkedList<E> queue = new LinkedList<E>();
+        final FlowController<E> controller;
+        private boolean ready = false;
+
+        SingleFlowQueue(Flow flow, IFlowLimiter<E> limiter) {
+            this.controller = new FlowController<E>(this, flow, limiter, MultiFlowQueue.this);
+        }
+
+        final void enqueue(E elem, ISourceController<E> source) {
+            controller.add(elem, source);
+        }
+
+        public IFlowSource<E> getFlowSource() {
+            return MultiFlowQueue.this;
+        }
+
+        public IFlowSink<E> getFlowSink() {
+            return MultiFlowQueue.this;
+        }
+
+        public void flowElemAccepted(ISourceController<E> controller, E elem) {
+
+            synchronized (MultiFlowQueue.this) {
+                queue.add(elem);
+                if (!ready) {
+                    addReadyQueue(this);
+                    ready = true;
+                }
+                // Always request on new elements:
+                notifyReady();
+            }
+        }
+
+        private E poll() {
+            E e = queue.poll();
+            if (e == null) {
+                ready = false;
+            } else if (autoRelease) {
+                controller.elementDispatched(e);
+            }
+            return e;
+        }
+    }
+
+}