You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/11 21:12:30 UTC
svn commit: r743476 [2/4] - in /activemq/sandbox/activemq-flow: ./ src/
src/main/ src/main/java/ src/main/java/com/ src/main/java/com/progress/
src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/activemq/ src/main/java/org/apache/act...
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+public interface IFlowLimiter<E> {
+ public interface UnThrottleListener {
+ // Called when a throttled limiter falls below the throttle
+ // threshold.
+ public void onUnthrottled();
+ }
+
+ public void reserve(E elem);
+
+ public void releaseReserved();
+
+ public boolean canAdd(E elem);
+
+ /**
+ * Adds an element to the limiter, returning true if this results in the
+ * limiter being throttled.
+ *
+ * @param elem
+ * The element to add.
+ * @return True if this triggered throttling.
+ */
+ public boolean add(E elem);
+
+ public void remove(E elem);
+
+ public boolean getThrottled();
+
+ public void addUnThrottleListener(UnThrottleListener listener);
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public interface IFlowResource {
+
+ /**
+ * A listener for when new flows are opened and closed for this resource.
+ */
+ public interface FlowLifeCycleListener {
+ public void onFlowOpened(IFlowResource source, Flow flow);
+
+ public void onFlowClosed(IFlowResource source, Flow flow);
+ }
+
+ /**
+ * Adds a {@link FlowLifeCycleListener} for flows opened by the source.
+ *
+ * @param listener
+ * The listener.
+ */
+ public void addFlowLifeCycleListener(FlowLifeCycleListener listener);
+
+ /**
+ * Called to remove a previously added {@link FlowLifeCycleListener}.
+ *
+ * @param listener
+ * The listener.
+ */
+ public void removeFlowLifeCycleListener(FlowLifeCycleListener listener);
+
+ static final public AtomicLong RESOURCE_COUNTER = new AtomicLong();
+
+ /**
+ * Gets the unique resource id associated with this resource
+ *
+ * @retrun the unique resource id.
+ */
+ public long getResourceId();
+
+ public String getResourceName();
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+public interface IFlowSink<E> extends IFlowResource {
+ /**
+ * Adds an element to the sink. If limiter space in the sink is overflowed
+ * by the element then it will block the source controller.
+ *
+ * @param elem
+ * The element to add to the sink.
+ * @param source
+ * The source's flow controller.
+ */
+ public void add(E elem, ISourceController<E> source);
+
+ /**
+ * Offers an element to the sink. If there is no room available the source's
+ * controller will be blocked and the element will not be added.
+ *
+ * @param elem
+ * The element to offer
+ * @param source
+ * The source's controller.
+ * @return false if the element wasn't accepted.
+ */
+ public boolean offer(E elem, ISourceController<E> source);
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ *
+ */
+public interface IFlowSource<E> extends IFlowResource {
+
+ /**
+ * Gets the {@link ISourceController} for the specified flow.
+ *
+ * @param flow
+ * The flow.
+ * @return The flow controller for the specified flow.
+ */
+ public FlowController<E> getFlowController(Flow flow);
+
+ /**
+ * If set to true the source will automatically release limiter space
+ * associated with {@link IFlowElem}s as they are dispacthed. If set to
+ * false then the {@link IFlowDrain} must release space via a call to
+ * {@link ISourceController#elementDispatched(IFlowElem)}.
+ *
+ * @param autoRelease
+ * If the source should release limiter space for elements.
+ */
+ public void setAutoRelease(boolean autoRelease);
+
+/**
+ * Returns whether or not this {@link IFlowSource} is set to automatically
+ * release elements via {@link FlowController#elementDispatched(Object) during
+ * dispatch. When auto release is set the caller <i>must</i> not call
+ * {@link FlowController#elementDispatched(Object).
+ *
+ * @return true if auto release is set, false otherwise.
+ */
+ public boolean getAutoRelease();
+
+ /**
+ * Sets the default drain for elements from this flow source. It will be
+ * invoked to dispatch elements from the source.
+ *
+ * @param drain
+ * The drain.
+ */
+ public void setDrain(IFlowDrain<E> drain);
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+public interface ISinkController<E> {
+ /**
+ * Defines required attributes for an entity that can be flow controlled.
+ *
+ * @param <E>
+ */
+ public interface FlowControllable<E> {
+ public void flowElemAccepted(ISourceController<E> controller, E elem);
+
+ public IFlowSink<E> getFlowSink();
+
+ public IFlowSource<E> getFlowSource();
+ }
+
+ /**
+ * Used to get a notification when a blocked controller becomes unblocked
+ *
+ * @param <E>
+ */
+ public interface FlowUnblockListener<E> {
+ public void onFlowUnblocked(ISinkController<E> controller);
+ }
+
+ /**
+ * Offers an element to the sink associated with this resource if space is
+ * available. If no space is available false is returned. The element does
+ * not get added to the overflow list.
+ *
+ * @param elem
+ * The element to add.
+ * @param controller
+ * the source flow controller.
+ */
+ public boolean offer(E elem, ISourceController<E> sourceController);
+
+ /**
+ * Adds an element to the sink associated with this resource if space is
+ * available. If no space is available the source controller will be
+ * blocked, and the source is responsible for tracking the space until this
+ * controller resumes.
+ *
+ * @param elem
+ * The element to add.
+ * @param controller
+ * the source flow controller.
+ */
+ public void add(E elem, ISourceController<E> controller);
+
+ /**
+ * Called to check if this FlowController is currently being blocked
+ *
+ * @return True if the flow is blocked.
+ */
+ public boolean isSinkBlocked();
+
+ /**
+ * Waits for a flow to become unblocked.
+ *
+ * @param flow
+ * The flow.
+ * @throws InterruptedException
+ * If interrupted while waiting.
+ */
+ public void waitForFlowUnblock() throws InterruptedException;
+
+ /**
+ * Sets a callback for the listener if this controller is currently blocked.
+ *
+ * @param listener
+ * The listener.
+ * @return True if a listener was registered false otherwise.
+ */
+ public boolean addUnblockListener(FlowUnblockListener<E> listener);
+
+ public IFlowSink<E> getFlowSink();
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * The control interface to a source. Sinks typically call back to a
+ * ISourceController to suspend or resumed the dispatching of messages.
+ *
+ * @param <E>
+ */
+public interface ISourceController<E> {
+
+ /**
+ * Returns the source that this FlowController is controlling.
+ *
+ * @return The source that the flow controller is controlling.
+ */
+ public IFlowSource<E> getFlowSource();
+
+ /**
+ * Gets the flow that this controller is controlling.
+ *
+ * @return
+ */
+ public Flow getFlow();
+
+ /**
+ * This is called when a particular flow is blocked for a resource
+ *
+ * @param sink
+ * The sink blocking this source
+ */
+ public void onFlowBlock(ISinkController<E> sink);
+
+ /**
+ * Callback used with FlowControllers to get a notification that an
+ * IFlowController has been resumed.
+ *
+ * @param controller
+ * The IFlowController that was unblocked.
+ */
+ public void onFlowResume(ISinkController<E> sink);
+
+ public boolean isSourceBlocked();
+
+ /**
+ * Must be called once the elements have been sent to downstream sinks.
+ *
+ * @param elem
+ * The dispatched element.
+ * @return
+ */
+ public void elementDispatched(E elem);
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+public class NoOpFlowController<E> implements ISinkController<E> {
+ private final IFlowSource<E> source;
+ private final Flow flow;
+
+ public NoOpFlowController(IFlowSource<E> source, Flow flow) {
+ this.source = source;
+ this.flow = flow;
+ }
+
+ public IFlowSource<E> getFlowSource() {
+ return source;
+ }
+
+ public Flow getFlow() {
+ // TODO Auto-generated method stub
+ return flow;
+ }
+
+ public boolean isSinkBlocked() {
+ return false;
+ }
+
+ public void onFlowBlock(IFlowSink<E> sink) {
+ // Noop
+ }
+
+ public void onFlowResume(IFlowSink<E> sink) {
+ // Noop
+ }
+
+ /**
+ * Must be called once the elements have been sent to downstream sinks.
+ *
+ * @param elem
+ * The dispatched element.
+ * @return
+ */
+ public void elementDispatched(E elem) {
+ // No op for basic flow controller
+ }
+
+ public String toString() {
+ return "DISABLED Flow Controller for: " + source;
+ }
+
+ public boolean offer(E elem, ISourceController<E> sourceController) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void add(E elem, ISourceController<E> controller) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void waitForFlowUnblock() throws InterruptedException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * Sets a callback for the listener if this controller is currently blocked.
+ *
+ * @param listener
+ * The listener.
+ * @return True if a listener was registered false otherwise.
+ */
+ public boolean addUnblockListener(FlowUnblockListener<E> listener) {
+ return false;
+ }
+
+ public IFlowSink<E> getFlowSink() {
+ return null;
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import org.apache.activemq.queue.Mapper;
+
+public class PriorityFlowController<E> implements ISourceController<E>, ISinkController<E> {
+
+ private final Object mutex;
+ private final FlowController<E> controllers[];
+ private final PrioritySizeLimiter<E> limiter;
+
+ private Mapper<Integer, E> priorityMapper;
+
+ private final Flow flow;
+ private final FlowControllable<E> controllable;
+
+ public PriorityFlowController(int priorities, FlowControllable<E> controllable, Flow flow, Object mutex, int capacity, int resume) {
+ this.controllable = controllable;
+ this.flow = flow;
+ this.mutex = mutex;
+ this.limiter = new PrioritySizeLimiter<E>(capacity, resume, priorities);
+ this.limiter.setPriorityMapper(priorityMapper);
+ this.controllers = createControlerArray(priorities);
+ for (int i = 0; i < priorities; i++) {
+ this.controllers[i] = new FlowController<E>(controllable, flow, limiter.getPriorityLimter(i), mutex);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private FlowController<E>[] createControlerArray(int priorities) {
+ return new FlowController[priorities];
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // ISinkController interface impl.
+ // /////////////////////////////////////////////////////////////////
+
+ public boolean offer(E elem, ISourceController<E> controller) {
+ int prio = priorityMapper.map(elem);
+ return controllers[prio].offer(elem, controller);
+ }
+
+ public void add(E elem, ISourceController<E> controller) {
+ int prio = priorityMapper.map(elem);
+ controllers[prio].add(elem, controller);
+ }
+
+ public boolean isSinkBlocked() {
+ synchronized (mutex) {
+ return limiter.getThrottled();
+ }
+ }
+
+ public boolean addUnblockListener(org.apache.activemq.flow.ISinkController.FlowUnblockListener<E> listener) {
+ boolean rc = false;
+ for (int i = 0; i < controllers.length; i++) {
+ rc |= this.controllers[i].addUnblockListener(listener);
+ }
+ return rc;
+ }
+
+ public void waitForFlowUnblock() throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // ISourceController interface impl.
+ // /////////////////////////////////////////////////////////////////
+
+ public void elementDispatched(E elem) {
+ FlowController<E> controler = controllers[priorityMapper.map(elem)];
+ controler.elementDispatched(elem);
+ }
+
+ public Flow getFlow() {
+ return flow;
+ }
+
+ public IFlowSource<E> getFlowSource() {
+ return controllable.getFlowSource();
+ }
+
+ public void onFlowBlock(ISinkController<E> sink) {
+ for (int i = 0; i < controllers.length; i++) {
+ controllers[i].onFlowBlock(sink);
+ }
+ }
+
+ public void onFlowResume(ISinkController<E> sink) {
+ for (int i = 0; i < controllers.length; i++) {
+ controllers[i].onFlowBlock(sink);
+ }
+ }
+
+ public boolean isSourceBlocked() {
+ return false;
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Getters and Setters
+ // /////////////////////////////////////////////////////////////////
+
+ public Mapper<Integer, E> getPriorityMapper() {
+ return priorityMapper;
+ }
+
+ public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+ this.priorityMapper = priorityMapper;
+ limiter.setPriorityMapper(priorityMapper);
+ }
+
+ public IFlowSink<E> getFlowSink() {
+ return controllable.getFlowSink();
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.ArrayList;
+
+import org.apache.activemq.queue.Mapper;
+
+public class PrioritySizeLimiter<E> {
+
+ final private ArrayList<Priority> priorities = new ArrayList<Priority>();
+ final protected int capacity;
+ final protected int resumeThreshold;
+
+ private int totalSize;
+ private int throttledCount = 0;
+ private int highestPriority;
+
+ private Mapper<Integer, E> sizeMapper = new Mapper<Integer, E>() {
+ public Integer map(E element) {
+ return 1;
+ }
+ };
+
+ private Mapper<Integer, E> priorityMapper = sizeMapper;
+
+ private class Priority extends AbstractLimiter<E> {
+ final int priority;
+ int size;
+ int reserved;
+ private boolean throttled;
+
+ public Priority(int priority) {
+ this.priority = priority;
+ }
+
+ public boolean add(E elem) {
+ int elementSize = sizeMapper.map(elem);
+ totalSize += elementSize;
+ size += elementSize;
+ if (totalSize >= capacity) {
+ if (!throttled) {
+ throttled = true;
+ throttledCount++;
+ }
+ }
+ if (priority >= highestPriority) {
+ highestPriority = priority;
+ }
+ return throttled;
+ }
+
+ public boolean canAdd(E elem) {
+ if (throttled)
+ return false;
+
+ int prio = priorityMapper.map(elem);
+ if (prio < highestPriority) {
+ if (!throttled) {
+ throttled = true;
+ throttledCount++;
+ }
+ return false;
+ }
+
+ return true;
+ }
+
+ public boolean getThrottled() {
+ return throttled;
+ }
+
+ public void releaseReserved() {
+ if (reserved > 0) {
+ int res = reserved;
+ reserved = 0;
+ remove(res);
+ }
+ }
+
+ public void remove(E elem) {
+ int size = sizeMapper.map(elem);
+ remove(size);
+ }
+
+ protected void remove(int s) {
+ size -= s;
+ totalSize -= s;
+
+ assert size >= 0 : "Negative limiter size: " + size;
+ assert totalSize >= 0 : "Negative limiter total size:" + totalSize;
+
+ if (totalSize <= resumeThreshold) {
+ priorities.get(highestPriority).unThrottle();
+ }
+ }
+
+ public void unThrottle() {
+ if (throttled) {
+ throttled = false;
+ throttledCount--;
+ notifyUnThrottleListeners();
+
+ // Has the highest priority level emptied out?
+ if (size == 0 && priority == highestPriority) {
+ // Set highestPriority to the new highest priority level
+ highestPriority = 0;
+ for (int i = priority - 1; i >= 0; i--) {
+ Priority p = priorities.get(i);
+ if (p.size > 0 || p.throttled) {
+ highestPriority = i;
+ if (totalSize <= resumeThreshold) {
+ p.unThrottle();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void reserve(E elem) {
+ reserved += sizeMapper.map(elem);
+ }
+ }
+
+ public PrioritySizeLimiter(int capacity, int resumeThreshold, int priorities) {
+ this.capacity = capacity;
+ this.resumeThreshold = resumeThreshold;
+ for (int i = 0; i < priorities; i++) {
+ this.priorities.add(new Priority(i));
+ }
+ }
+
+ public IFlowLimiter<E> getPriorityLimter(int priority) {
+ return priorities.get(priority);
+ }
+
+ public Mapper<Integer, E> getSizeMapper() {
+ return sizeMapper;
+ }
+
+ public void setSizeMapper(Mapper<Integer, E> sizeMapper) {
+ this.sizeMapper = sizeMapper;
+ }
+
+ public Mapper<Integer, E> getPriorityMapper() {
+ return priorityMapper;
+ }
+
+ public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+ this.priorityMapper = priorityMapper;
+ }
+
+ public boolean getThrottled() {
+ return throttledCount > 0;
+ }
+
+ public int getPriorities() {
+ return priorities.size();
+ }
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+public class SizeLimiter<E> extends AbstractLimiter<E> {
+
+ protected int capacity;
+ protected int resumeThreshold;
+
+ private int size;
+ private boolean throttled;
+ private int reserved;
+
+ public SizeLimiter(int capacity, int resumeThreshold) {
+ this.capacity = capacity;
+ throttled = false;
+ this.resumeThreshold = resumeThreshold;
+ }
+
+ public final boolean add(E elem) {
+ this.size += getElementSize(elem);
+
+ if (this.size >= capacity) {
+ throttled = true;
+ }
+ return throttled;
+ }
+
+ public final void remove(E elem) {
+ remove(getElementSize(elem));
+ }
+
+ public void reserve(E elem) {
+ reserved += getElementSize(elem);
+ }
+
+ public void releaseReserved() {
+ if (reserved > 0) {
+ int res = reserved;
+ reserved = 0;
+ remove(res);
+ }
+ }
+
+ protected void remove(int s) {
+ this.size -= s;
+ if (size < 0) {
+ Exception ie = new IllegalStateException("Size Negative!" + size);
+ ie.printStackTrace();
+ }
+
+ if (throttled && this.size <= resumeThreshold) {
+ throttled = false;
+ notifyUnThrottleListeners();
+ }
+ }
+
+ public final void reset() {
+ size = 0;
+ notifyUnThrottleListeners();
+ }
+
+ /**
+ * Subclasses should override to return the size of an element
+ *
+ * @param elem
+ */
+ public int getElementSize(E elem) {
+ return 1;
+ }
+
+ public final boolean getThrottled() {
+ return throttled;
+ }
+
+ public boolean canAdd(E elem) {
+ return !throttled;
+ }
+
+ public int getCapacity() {
+ return capacity;
+ }
+
+ public int getResumeThreshold() {
+ return resumeThreshold;
+ }
+
+ public int getSize() {
+ return size;
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.metric;
+
+abstract public class Metric {
+
+ private String name;
+ private String unit = "items";
+
+ public Metric name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public abstract long counter();
+
+ public Metric unit(String unit) {
+ this.unit = unit;
+ return this;
+ }
+
+ public String getUnit() {
+ return unit;
+ }
+
+ public void setUnit(String unit) {
+ this.unit = unit;
+ }
+
+ public String getRateSummary(Period period) {
+ return String.format("%s: %(,.2f %s/s", name, period.rate(counter()), unit);
+ }
+
+ abstract public void reset();
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.metric;
+
+import java.util.ArrayList;
+
+public class MetricAggregator extends Metric {
+
+ ArrayList<Metric> metrics = new ArrayList<Metric>();
+
+ public MetricAggregator name(String name) {
+ return (MetricAggregator) super.name(name);
+ }
+
+ public MetricAggregator unit(String unit) {
+ return (MetricAggregator) super.unit(unit);
+ }
+
+ public void add(Metric metric) {
+ metrics.add(metric);
+ if (getUnit() != null) {
+ metric.setUnit(getUnit());
+ }
+ }
+
+ public boolean remove(Metric metric) {
+ return metrics.remove(metric);
+ }
+
+ public Float average() {
+ if (metrics.isEmpty()) {
+ return null;
+ }
+ long rc = 0;
+ int count = 0;
+ for (Metric metric : metrics) {
+ rc += metric.counter();
+ count++;
+ }
+ return rc * 1.0f / count;
+ }
+
+ public long total() {
+ long rc = 0;
+ for (Metric metric : metrics) {
+ rc += metric.counter();
+ }
+ return rc;
+ }
+
+ public Long min() {
+ if (metrics.isEmpty()) {
+ return null;
+ }
+ long rc = Long.MAX_VALUE;
+ for (Metric metric : metrics) {
+ long t = metric.counter();
+ if (t < rc) {
+ rc = t;
+ }
+ }
+ return rc;
+ }
+
+ public Long max() {
+ if (metrics.isEmpty()) {
+ return null;
+ }
+ long rc = Long.MIN_VALUE;
+ for (Metric metric : metrics) {
+ long t = metric.counter();
+ if (t > rc) {
+ rc = t;
+ }
+ }
+ return rc;
+ }
+
+ @Override
+ public long counter() {
+ return total();
+ }
+
+ public String getRateSummary(Period period) {
+ return String
+ .format("%s: total=%(,.2f, avg=%(,.2f, min=%(,.2f, max=%(,.2f in %s/s", getName(), period.rate(total()), period.rate(average()), period.rate(min()), period.rate(max()), getUnit());
+ }
+
+ public String getChildRateSummary(Period period) {
+ StringBuilder rc = new StringBuilder();
+ rc.append("{\n");
+ for (Metric metric : metrics) {
+ rc.append(" ");
+ rc.append(metric.getRateSummary(period));
+ rc.append("\n");
+ }
+ rc.append("}");
+ return rc.toString();
+ }
+
+ @Override
+ public void reset() {
+ for (Metric metric : metrics) {
+ metric.reset();
+ }
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.metric;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MetricCounter extends Metric {
+
+ AtomicLong counter = new AtomicLong();
+
+ public MetricCounter name(String name) {
+ return (MetricCounter) super.name(name);
+ }
+
+ public final long increment(long delta) {
+ return counter.addAndGet(delta);
+ }
+
+ public final long increment() {
+ return counter.incrementAndGet();
+ }
+
+ @Override
+ public final long counter() {
+ return counter.get();
+ }
+
+ @Override
+ public void reset() {
+ counter.set(0);
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.metric;
+
+public class Period {
+
+ long start = System.currentTimeMillis();
+ long end;
+
+ public long getStart() {
+ return start;
+ }
+
+ public void setStart(long start) {
+ this.start = start;
+ }
+
+ public long getEnd() {
+ if (end == 0) {
+ end = System.currentTimeMillis();
+ }
+ return end;
+ }
+
+ public void setEnd(long end) {
+ this.end = end;
+ }
+
+ public void reset() {
+ start = System.currentTimeMillis();
+ end = 0;
+ }
+
+ public long duration() {
+ return getEnd() - getStart();
+ }
+
+ public Float rate(Long counter) {
+ if (counter == null) {
+ return null;
+ }
+ return ((counter * 1000f) / duration());
+ }
+
+ public Float rate(Float counter) {
+ if (counter == null) {
+ return null;
+ }
+ return ((counter * 1000f) / duration());
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.AbstractLimitedFlowSource;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+
+/**
+ * Base class for a {@link Dispatchable} {@link FlowControllable}
+ * {@link IFlowQueue}.
+ *
+ * @param <E>
+ */
+public abstract class AbstractFlowQueue<E> extends AbstractLimitedFlowSource<E> implements FlowControllable<E>, IFlowQueue<E>, Dispatchable {
+
+ protected IDispatcher dispatcher;
+ protected DispatchContext dispatchContext;
+ protected final Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners = new ArrayList<IPollableFlowSource.FlowReadyListener<E>>();
+ private boolean notifyReady = false;
+ protected boolean dispatching = false;
+ protected int dispatchPriority = 0;
+
+ AbstractFlowQueue() {
+ super();
+ }
+
+ protected AbstractFlowQueue(String name) {
+ super(name);
+ }
+
+ public final boolean dispatch() {
+
+ while (pollingDispatch())
+ ;
+
+ return true;
+
+ // return !pollingDispatch();
+ }
+
+ public final IFlowSink<E> getFlowSink() {
+ // TODO Auto-generated method stub
+ return this;
+ }
+
+ public final IFlowSource<E> getFlowSource() {
+ // TODO Auto-generated method stub
+ return this;
+ }
+
+ protected final FlowControllable<E> getFlowControllableHook() {
+ return this;
+ }
+
+ /**
+ * Sets an asynchronous dispatcher for this source. As elements become
+ * available they will be dispatched to the worker pool.
+ *
+ * @param workers
+ * The executor thread pool.
+ * @param dispatcher
+ * The dispatcher to handle messages.
+ */
+ public synchronized void setDispatcher(IDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ dispatchContext = dispatcher.register(this, getResourceName());
+ dispatchContext.updatePriority(dispatchPriority);
+ }
+
+ public synchronized final void setDispatchPriority(int priority) {
+ dispatchPriority = priority;
+ if (dispatchContext != null) {
+ dispatchContext.updatePriority(priority);
+ }
+ }
+
+ public synchronized void addFlowReadyListener(IPollableFlowSource.FlowReadyListener<E> watcher) {
+
+ readyListeners.add(watcher);
+ if (isDispatchReady()) {
+ notifyReady();
+ }
+ }
+
+ /**
+ * Dispatches an element potentialy blocking until an element is available
+ * for dispatch.
+ */
+ public final void blockingDispatch() throws InterruptedException {
+
+ while (!pollingDispatch()) {
+ waitForDispatchReady();
+ }
+ }
+
+ /**
+ * Indicates that there are elements ready for dispatch.
+ */
+ protected void notifyReady() {
+ if (dispatchContext != null) {
+ dispatchContext.requestDispatch();
+ return;
+ }
+
+ synchronized (this) {
+ if (dispatchContext != null) {
+ if (!dispatching) {
+ dispatching = true;
+ dispatchContext.requestDispatch();
+ }
+ return;
+ }
+
+ if (notifyReady) {
+ notify();
+ }
+
+ if (!readyListeners.isEmpty()) {
+ for (FlowReadyListener<E> listener : readyListeners) {
+ listener.onFlowReady(this);
+ }
+ }
+
+ readyListeners.clear();
+ }
+ }
+
+ protected synchronized void waitForDispatchReady() throws InterruptedException {
+ while (!isDispatchReady()) {
+ notifyReady = true;
+ wait();
+ }
+ notifyReady = false;
+ }
+
+ public String toString() {
+ return getResourceName();
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.dispatch.PriorityLinkedList;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.PriorityFlowController;
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ */
+public class ExclusivePriorityQueue<E> extends AbstractFlowQueue<E> implements IFlowQueue<E> {
+
+ private final PriorityLinkedList<PriorityNode> queue;
+ private Mapper<Integer, E> priorityMapper;
+
+ private class PriorityNode extends LinkedNode<PriorityNode> {
+ E elem;
+ int prio;
+ }
+
+ private final PriorityFlowController<E> controller;
+
+ /**
+ * Creates a flow queue that can handle multiple flows.
+ *
+ * @param priority
+ * @param flow
+ * The {@link Flow}
+ * @param capacity
+ * @param resume
+ * @param controller
+ * The FlowController if this queue is flow controlled:
+ */
+ public ExclusivePriorityQueue(int priority, Flow flow, String name, int capacity, int resume) {
+ super(name);
+ this.queue = new PriorityLinkedList<PriorityNode>(10);
+ this.controller = new PriorityFlowController<E>(priority, getFlowControllableHook(), flow, this, capacity, resume);
+
+ }
+
+ public boolean offer(E elem, ISourceController<E> source) {
+ return controller.offer(elem, source);
+ }
+
+ /**
+ * Performs a limited add to the queue.
+ */
+ public final void add(E elem, ISourceController<E> source) {
+ controller.add(elem, source);
+ }
+
+ /**
+ * Called when the controller accepts a message for this queue.
+ */
+ public synchronized void flowElemAccepted(ISourceController<E> controller, E elem) {
+ PriorityNode node = new PriorityNode();
+ node.elem = elem;
+ node.prio = priorityMapper.map(elem);
+
+ queue.add(node, node.prio);
+ notifyReady();
+ }
+
+ public FlowController<E> getFlowController(Flow flow) {
+ // TODO:
+ return null;
+ }
+
+ public boolean isDispatchReady() {
+ return !queue.isEmpty();
+ }
+
+ public boolean pollingDispatch() {
+ PriorityNode node = null;
+ synchronized (this) {
+ node = queue.poll();
+ // FIXME the release should really be done after dispatch.
+ // doing it here saves us from having to resynchronize
+ // after dispatch, but release limiter space too soon.
+ if (autoRelease && node != null) {
+ controller.elementDispatched(node.elem);
+ }
+ }
+
+ if (node != null) {
+ drain.drain(node.elem, controller);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public Mapper<Integer, E> getPriorityMapper() {
+ return priorityMapper;
+ }
+
+ public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+ this.priorityMapper = priorityMapper;
+ controller.setPriorityMapper(priorityMapper);
+ }
+
+ @Override
+ public String toString() {
+ return getResourceName();
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.LinkedList;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.ISourceController;
+
+public class ExclusiveQueue<E> extends AbstractFlowQueue<E> {
+ private final LinkedList<E> queue = new LinkedList<E>();
+ private final FlowController<E> controller;
+
+ /**
+ * Creates a flow queue that can handle multiple flows.
+ *
+ * @param flow
+ * The {@link Flow}
+ * @param controller
+ * The FlowController if this queue is flow controlled:
+ */
+ public ExclusiveQueue(Flow flow, String name, IFlowLimiter<E> limiter) {
+ super(name);
+ this.controller = new FlowController<E>(getFlowControllableHook(), flow, limiter, this);
+ super.onFlowOpened(controller);
+ }
+
+ public boolean offer(E elem, ISourceController<E> source) {
+ return controller.offer(elem, source);
+ }
+
+ /**
+ * Performs a limited add to the queue.
+ */
+ public final void add(E elem, ISourceController<E> source) {
+ controller.add(elem, source);
+ }
+
+ /**
+ * Called when the controller accepts a message for this queue.
+ */
+ public synchronized void flowElemAccepted(ISourceController<E> controller, E elem) {
+ queue.add(elem);
+ notifyReady();
+ }
+
+ public FlowController<E> getFlowController(Flow flow) {
+ return controller;
+ }
+
+ public final boolean isDispatchReady() {
+ return !queue.isEmpty();
+ }
+
+ public final boolean pollingDispatch() {
+ E elem = null;
+ synchronized (this) {
+ elem = queue.poll();
+ // FIXME the release should really be done after dispatch.
+ // doing it here saves us from having to resynchronize
+ // after dispatch, but release limiter space too soon.
+ if (autoRelease && elem != null) {
+ controller.elementDispatched(elem);
+ }
+ }
+
+ if (elem != null) {
+ drain.drain(elem, controller);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SingleFlowQueue:" + getResourceName();
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.IFlowSource;
+
+public interface IAsynchronousFlowSource<E> extends IFlowSource<E> {
+
+ /**
+ * Sets an asynchronous dispatcher for this source. As elements become
+ * available they will be dispatched to the worker pool.
+ *
+ * @param workers
+ * The executor thread pool.
+ * @param dispatcher
+ * The dispatcher to handle messages.
+ * @param controller
+ * The controller for the flow to process in the worker pool.
+ */
+ public void setDispatcher(IDispatcher workers);
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.IFlowSource;
+
+public interface IBlockingFlowSource<E> extends IFlowSource<E> {
+
+ /**
+ * Dispatches the next available element to the source's dispatcher,
+ * blocking until an element is available.
+ *
+ * @throws InterruptedException
+ */
+ public void blockingDispatch() throws InterruptedException;
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.IFlowSink;
+
+public interface IFlowQueue<E> extends IBlockingFlowSource<E>, IPollableFlowSource<E>, IAsynchronousFlowSource<E>, IFlowSink<E> {
+
+ public void setDispatchPriority(int priority);
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.IFlowSource;
+
+public interface IPollableFlowSource<E> extends IFlowSource<E> {
+
+ /**
+ * Callback used to indicate that a PollableFlowSource is ready for
+ * dispatch.
+ *
+ * @param <E>
+ */
+ public interface FlowReadyListener<E> {
+ public void onFlowReady(IPollableFlowSource<E> source);
+ }
+
+ /**
+ * Sets a listener to indicate when there are elements available for
+ * dispatch from this source.
+ *
+ * @param listener
+ * The listener.
+ */
+ public void addFlowReadyListener(FlowReadyListener<E> listener);
+
+ /**
+ * Dispatches the next available element returning false if there were no
+ * elements available for dispatch.
+ *
+ * @return False if there were no elements to dispatch.
+ */
+ public boolean pollingDispatch();
+
+ /**
+ * @return True if there are elements ready to dispatch.
+ */
+ public boolean isDispatchReady();
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.IFlowSink;
+
+public interface IQueue<K, V> extends IFlowSink<V> {
+
+ public void addSubscription(Subscription<V> sub);
+
+ public boolean removeSubscription(Subscription<V> sub);
+
+ public boolean removeByValue(V value);
+
+ public boolean removeByKey(K key);
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+/**
+ */
+public class LoadBalancedFlowQueue<E> extends AbstractFlowQueue<E> {
+ private final LinkedList<E> queue = new LinkedList<E>();
+ private final LinkedNodeList<SinkNode> readyConsumers = new LinkedNodeList<SinkNode>();
+ private final HashMap<IFlowSink<E>, SinkNode> consumers = new HashMap<IFlowSink<E>, SinkNode>();
+
+ private boolean strcitDispatch = true;
+
+ private final FlowController<E> sinkController;
+
+ private final ISourceController<E> sourceControler = new ISourceController<E>() {
+
+ public Flow getFlow() {
+ return sinkController.getFlow();
+ }
+
+ public IFlowSource<E> getFlowSource() {
+ return LoadBalancedFlowQueue.this;
+ }
+
+ public void onFlowBlock(ISinkController<E> sink) {
+ synchronized (LoadBalancedFlowQueue.this) {
+ SinkNode node = consumers.get(sink);
+ if (node != null) {
+ node.unlink();
+ }
+ // controller.onFlowBlock(sink);
+ }
+
+ }
+
+ public void onFlowResume(ISinkController<E> sink) {
+ synchronized (LoadBalancedFlowQueue.this) {
+ SinkNode node = consumers.get(sink);
+ if (node != null) {
+ // controller.onFlowResume(sink);
+ // Add to ready list if not there:
+ if (!node.isLinked()) {
+ boolean notify = false;
+ if (readyConsumers.isEmpty()) {
+ notify = true;
+ }
+
+ readyConsumers.addLast(node);
+ if (notify && !queue.isEmpty()) {
+ notifyReady();
+ }
+ }
+ }
+ }
+ }
+
+ public void elementDispatched(E elem) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public boolean isSourceBlocked() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ };
+
+ /**
+ * Creates a flow queue that can handle multiple flows.
+ *
+ * @param flow
+ * The {@link Flow}
+ * @param controller
+ * The FlowController if this queue is flow controlled:
+ */
+ public LoadBalancedFlowQueue(Flow flow, String name, long resourceId, IFlowLimiter<E> limiter) {
+ super(name);
+ this.sinkController = new FlowController<E>(getFlowControllableHook(), flow, limiter, this);
+ super.onFlowOpened(sinkController);
+ }
+
+ public boolean offer(E elem, ISourceController<E> source) {
+ return sinkController.offer(elem, source);
+ }
+
+ /**
+ * Performs a limited add to the queue.
+ */
+ public final void add(E elem, ISourceController<E> source) {
+ sinkController.add(elem, source);
+ }
+
+ /**
+ * Called when the controller accepts a message for this queue.
+ */
+ public synchronized void flowElemAccepted(ISourceController<E> controller, E elem) {
+ queue.add(elem);
+ if (!readyConsumers.isEmpty()) {
+ notifyReady();
+ }
+ }
+
+ public FlowController<E> getFlowController(Flow flow) {
+ return sinkController;
+ }
+
+ public boolean isDispatchReady() {
+ return !queue.isEmpty() && !readyConsumers.isEmpty();
+ }
+
+ public boolean pollingDispatch() {
+ if (strcitDispatch) {
+ return strictPollingDispatch();
+ } else {
+ return loosePollingDispatch();
+ }
+ }
+
+ private boolean strictPollingDispatch() {
+
+ SinkNode node = null;
+ E elem = null;
+ synchronized (this) {
+ if (readyConsumers.isEmpty()) {
+ return false;
+ }
+ // Get the next elem:
+ elem = queue.peek();
+ if (elem == null) {
+ return false;
+ }
+
+ node = readyConsumers.getHead();
+ }
+
+ while (true) {
+
+ boolean accepted = node.sink.offer(elem, sourceControler);
+
+ synchronized (this) {
+ if (accepted) {
+ queue.remove();
+ if (autoRelease) {
+ sinkController.elementDispatched(elem);
+ }
+ if (!readyConsumers.isEmpty()) {
+ readyConsumers.rotate();
+ }
+ return true;
+ } else {
+ if (readyConsumers.isEmpty()) {
+ return false;
+ }
+ node = readyConsumers.getHead();
+ }
+ }
+ }
+ }
+
+ private boolean loosePollingDispatch() {
+ E elem = null;
+ IFlowSink<E> sink = null;
+ synchronized (this) {
+ if (readyConsumers.isEmpty()) {
+ return false;
+ }
+
+ // Get the next sink:
+ sink = readyConsumers.getHead().sink;
+
+ // Get the next elem:
+ elem = queue.poll();
+ if (elem == null) {
+ return false;
+ }
+
+ readyConsumers.rotate();
+
+ // FIXME the release should really be done after dispatch.
+ // doing it here saves us from having to resynchronize
+ // after dispatch, but releases limiter space too soon.
+ if (autoRelease) {
+ sinkController.elementDispatched(elem);
+ }
+
+ }
+
+ sink.add(elem, sourceControler);
+ return true;
+ }
+
+ public final void addSink(IFlowSink<E> sink) {
+ synchronized (this) {
+ SinkNode node = consumers.get(sink);
+ if (node == null) {
+ node = new SinkNode(sink);
+ consumers.put(sink, node);
+ readyConsumers.addLast(node);
+ if (!queue.isEmpty()) {
+ notifyReady();
+ }
+ }
+ }
+ }
+
+ private class SinkNode extends LinkedNode<SinkNode> {
+ public final IFlowSink<E> sink;
+
+ public SinkNode(IFlowSink<E> sink) {
+ this.sink = sink;
+ }
+
+ @Override
+ public String toString() {
+ return sink.toString();
+ }
+ }
+
+ public boolean isStrcitDispatch() {
+ return strcitDispatch;
+ }
+
+ public void setStrcitDispatch(boolean strcitDispatch) {
+ this.strcitDispatch = strcitDispatch;
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+public interface Mapper<K, V> {
+ K map(V element);
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+public class MemoryStore<K, V> implements Store<K, V> {
+
+ AtomicLong counter = new AtomicLong();
+
+ class MemoryStoreNode extends LinkedNode<MemoryStoreNode> implements StoreNode<K, V> {
+ private Subscription<V> owner;
+ private final K key;
+ private final V value;
+ private long id = counter.getAndIncrement();
+
+ public MemoryStoreNode(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public boolean acquire(Subscription<V> owner) {
+ if (this.owner == null) {
+ this.owner = owner;
+ }
+ return true;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "node:" + id;
+ }
+
+ public void unacquire() {
+ this.owner = null;
+ }
+
+ }
+
+ class MemoryStoreCursor implements StoreCursor<K, V> {
+ private MemoryStoreNode last;
+ private MemoryStoreNode next;
+
+ public MemoryStoreCursor() {
+ }
+
+ public MemoryStoreCursor(MemoryStoreNode next) {
+ this.next = next;
+ }
+
+ public void setNext(StoreNode<K, V> next) {
+ this.next = (MemoryStoreNode) next;
+ }
+
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ if (last == null || last.getNextCircular() == last) {
+ next = (MemoryStoreNode) elements.getHead();
+ return next != null;
+ }
+
+ while (true) {
+ MemoryStoreNode t = last.getNextCircular();
+ if (t.id > last.id) {
+ next = t;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ public StoreNode<K, V> peekNext() {
+ hasNext();
+ return next;
+ }
+
+ public StoreNode<K, V> next() {
+ try {
+ hasNext();
+ return next;
+ } finally {
+ last = next;
+ next = null;
+ }
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ protected HashMap<K, MemoryStoreNode> map = new HashMap<K, MemoryStoreNode>();
+ protected LinkedNodeList<MemoryStoreNode> elements = new LinkedNodeList<MemoryStoreNode>();
+
+ public StoreNode<K, V> add(K key, V value) {
+ MemoryStoreNode rc = new MemoryStoreNode(key, value);
+ map.put(key, rc);
+ elements.addLast(rc);
+ return rc;
+ }
+
+ public StoreNode<K, V> remove(K key) {
+ MemoryStoreNode node = (MemoryStoreNode) map.remove(key);
+ if (node != null) {
+ node.unlink();
+ }
+ return node;
+ }
+
+ public boolean isEmpty() {
+ return elements.isEmpty();
+ }
+
+ public org.apache.activemq.queue.Store.StoreCursor<K, V> openCursor() {
+ MemoryStoreCursor cursor = new MemoryStoreCursor();
+ return cursor;
+ }
+
+ public org.apache.activemq.queue.Store.StoreCursor<K, V> openCursorAt(org.apache.activemq.queue.Store.StoreNode<K, V> next) {
+ MemoryStoreCursor cursor = new MemoryStoreCursor((MemoryStoreNode) next);
+ return cursor;
+ }
+
+ public int size() {
+ return map.size();
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+public class MultiFlowQueue<E> extends AbstractFlowQueue<E> {
+ private final HashMap<Flow, SingleFlowQueue> flowQueues = new HashMap<Flow, SingleFlowQueue>();
+ private final LinkedNodeList<SingleFlowQueue> readyQueues = new LinkedNodeList<SingleFlowQueue>();
+
+ private final int perFlowWindow;
+ private final int resumeThreshold;
+
+ public MultiFlowQueue(String name, int perFlowWindow, int resumeThreshold) {
+ super(name);
+ this.perFlowWindow = perFlowWindow;
+ this.resumeThreshold = resumeThreshold;
+ }
+
+ public final void flowElemAccepted(ISourceController<E> controller, E elem) {
+ // We don't currently create a flow controller for this,
+ // so this shouldn't be called.
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean offer(E elem, ISourceController<E> source) {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
+ public synchronized void add(E elem, ISourceController<E> source) {
+ SingleFlowQueue queue = flowQueues.get(source.getFlow());
+ if (queue == null) {
+ queue = new SingleFlowQueue(source.getFlow(), new SizeLimiter<E>(perFlowWindow, resumeThreshold));
+ flowQueues.put(source.getFlow(), queue);
+ super.onFlowOpened(queue.controller);
+ }
+ queue.enqueue(elem, source);
+ }
+
+ public boolean pollingDispatch() {
+ SingleFlowQueue queue = null;
+ E elem = null;
+ synchronized (this) {
+ queue = peekReadyQueue();
+ if (queue == null) {
+ return false;
+ }
+
+ elem = queue.poll();
+ if (elem == null) {
+
+ unreadyQueue(queue);
+ return false;
+ }
+
+ // rotate to have fair dispatch.
+ queue.getList().rotate();
+ }
+
+ drain.drain(elem, queue.controller);
+ return true;
+ }
+
+ public final boolean isDispatchReady() {
+ return !readyQueues.isEmpty();
+ }
+
+ private SingleFlowQueue peekReadyQueue() {
+ if (readyQueues.isEmpty()) {
+ return null;
+ }
+ return readyQueues.getHead();
+ }
+
+ private void unreadyQueue(SingleFlowQueue node) {
+ node.unlink();
+ }
+
+ private void addReadyQueue(SingleFlowQueue node) {
+ readyQueues.addLast(node);
+ }
+
+ /**
+ * Limits a flow that has potentially multiple sources.
+ */
+ private class SingleFlowQueue extends LinkedNode<SingleFlowQueue> implements FlowController.FlowControllable<E> {
+ private final LinkedList<E> queue = new LinkedList<E>();
+ final FlowController<E> controller;
+ private boolean ready = false;
+
+ SingleFlowQueue(Flow flow, IFlowLimiter<E> limiter) {
+ this.controller = new FlowController<E>(this, flow, limiter, MultiFlowQueue.this);
+ }
+
+ final void enqueue(E elem, ISourceController<E> source) {
+ controller.add(elem, source);
+ }
+
+ public IFlowSource<E> getFlowSource() {
+ return MultiFlowQueue.this;
+ }
+
+ public IFlowSink<E> getFlowSink() {
+ return MultiFlowQueue.this;
+ }
+
+ public void flowElemAccepted(ISourceController<E> controller, E elem) {
+
+ synchronized (MultiFlowQueue.this) {
+ queue.add(elem);
+ if (!ready) {
+ addReadyQueue(this);
+ ready = true;
+ }
+ // Always request on new elements:
+ notifyReady();
+ }
+ }
+
+ private E poll() {
+ E e = queue.poll();
+ if (e == null) {
+ ready = false;
+ } else if (autoRelease) {
+ controller.elementDispatched(e);
+ }
+ return e;
+ }
+ }
+
+}