You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/04/27 21:35:21 UTC

svn commit: r769122 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/flow/ main/java/org/apache/activemq/queue/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/broker/

Author: chirino
Date: Mon Apr 27 19:35:21 2009
New Revision: 769122

URL: http://svn.apache.org/viewvc?rev=769122&view=rev
Log:
Adding files that I missed adding as part of that last patch.


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPartitionedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/Mapper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityLinkedList.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityMap.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedList.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedListNode.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.Store.QueueQueryResult;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.PrioritySizeLimiter;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.queue.IPartitionedQueue;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.PartitionedQueue;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.SharedPriorityQueue;
+import org.apache.activemq.queue.SharedQueue;
+import org.apache.activemq.queue.SharedQueueOld;
+import org.apache.activemq.util.Mapper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class BrokerQueueStore implements QueueStore<Long, MessageDelivery> {
+
+    private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class);
+    private static final boolean USE_OLD_QUEUE = false;
+    private static final boolean USE_PRIORITY_QUEUES = true;
+
+    private BrokerDatabase database;
+    private IDispatcher dispatcher;
+
+    private final short PARTITION_TYPE = 0;
+    private final short SHARED_QUEUE_TYPE = 1;
+    //private final short SUBSCRIBER_QUEUE_TYPE = 2;
+
+    private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
+    //private final HashMap<String, IFlowQueue<MessageDelivery>> subscriberQueues = new HashMap<String, IFlowQueue<MessageDelivery>>();
+
+    private Mapper<Integer, MessageDelivery> partitionMapper;
+
+    public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            return element.getPriority();
+        }
+    };
+
+    static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
+        public Long map(MessageDelivery element) {
+            return element.getStoreTracking();
+        }
+    };
+
+    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            // we modulo 10 to have at most 10 partitions which the producers
+            // gets split across.
+            return (int) (element.getProducerId().hashCode() % 10);
+        }
+    };
+
+    public void setDatabase(BrokerDatabase database) {
+        this.database = database;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public void loadQueues() throws Exception {
+
+        // Load shared queues
+        Iterator<QueueQueryResult> results = database.listQueues(SHARED_QUEUE_TYPE);
+        while (results.hasNext()) {
+            QueueQueryResult loaded = results.next();
+            IQueue<Long, MessageDelivery> queue = createRestoredQueue(null, loaded);
+            sharedQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
+            LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+        }
+    }
+
+    private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
+
+        IQueue<Long, MessageDelivery> queue;
+        if (parent != null) {
+            queue = parent.createPartition(loaded.getDescriptor().getPartitionKey());
+        } else {
+            queue = createSharedQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+        }
+
+        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+
+        // Creat the child queues
+        Collection<QueueQueryResult> children = loaded.getPartitions();
+        if (children != null) {
+            try {
+                IPartitionedQueue<Long, MessageDelivery> pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue;
+                for (QueueQueryResult child : children) {
+                    createRestoredQueue(pQueue, child);
+                }
+            } catch (ClassCastException cce) {
+                LOG.error("Loaded partition for unpartitionable queue: " + queue.getResourceName());
+                throw cce;
+            }
+        }
+
+        return queue;
+
+    }
+
+    public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
+        Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
+        ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
+        ret.addAll(c);
+        return ret;
+    }
+
+    public IQueue<Long, MessageDelivery> createSharedQueue(String name) {
+
+        IQueue<Long, MessageDelivery> queue = null;
+        synchronized (this) {
+            queue = sharedQueues.get(name);
+            if (queue == null) {
+                queue = createSharedQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+                queue.getDescriptor().setApplicationType(SHARED_QUEUE_TYPE);
+                queue.initialize(0, 0, 0, 0);
+                sharedQueues.put(name, queue);
+                addQueue(queue.getDescriptor());
+            }
+        }
+
+        return queue;
+    }
+
+    private IQueue<Long, MessageDelivery> createSharedQueueInternal(final String name, short type) {
+
+        IQueue<Long, MessageDelivery> ret;
+
+        switch (type) {
+        case QueueDescriptor.PARTITIONED: {
+            PartitionedQueue<Long, MessageDelivery> queue = new PartitionedQueue<Long, MessageDelivery>(name) {
+                @Override
+                public IQueue<Long, MessageDelivery> createPartition(int partitionKey) {
+                    IQueue<Long, MessageDelivery> queue = createSharedQueueInternal(name + "$" + partitionKey, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+                    queue.getDescriptor().setPartitionId(partitionKey);
+                    queue.getDescriptor().setParent(this.getDescriptor().getQueueName());
+                    return queue;
+                }
+
+            };
+            queue.setPartitionMapper(partitionMapper);
+
+            ret = queue;
+            break;
+        }
+        case QueueDescriptor.SHARED_PRIORITY: {
+            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(100, 1, MessageBroker.MAX_PRIORITY);
+            limiter.setPriorityMapper(PRIORITY_MAPPER);
+            SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
+            ret = queue;
+            queue.setKeyMapper(KEY_MAPPER);
+            queue.setAutoRelease(true);
+            break;
+        }
+        case QueueDescriptor.SHARED: {
+            SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 1);
+            if (!USE_OLD_QUEUE) {
+                SharedQueue<Long, MessageDelivery> sQueue = new SharedQueue<Long, MessageDelivery>(name, limiter);
+                sQueue.setKeyMapper(KEY_MAPPER);
+                sQueue.setAutoRelease(true);
+                ret = sQueue;
+            } else {
+                SharedQueueOld<Long, MessageDelivery> sQueue = new SharedQueueOld<Long, MessageDelivery>(name, limiter);
+                sQueue.setKeyMapper(KEY_MAPPER);
+                sQueue.setAutoRelease(true);
+                ret = sQueue;
+            }
+            break;
+        }
+        default: {
+            throw new IllegalArgumentException("Unknown queue type" + type);
+        }
+        }
+        ret.getDescriptor().setApplicationType(PARTITION_TYPE);
+        ret.setDispatcher(dispatcher);
+        ret.setStore(this);
+
+        return ret;
+    }
+
+    public final void deleteQueueElement(QueueStore.QueueDescriptor descriptor, MessageDelivery elem) {
+        elem.acknowledge(descriptor);
+    }
+
+    public final boolean isElemPersistent(MessageDelivery elem) {
+        return elem.isPersistent();
+    }
+
+    public final boolean isFromStore(MessageDelivery elem) {
+        return elem.isFromStore();
+    }
+
+    public final void persistQueueElement(QueueStore.QueueDescriptor descriptor, ISourceController<?> controller, MessageDelivery elem, long sequence, boolean delayable) throws Exception {
+        elem.persist(descriptor, controller, sequence, delayable);
+    }
+
+    public final void restoreQueueElements(QueueStore.QueueDescriptor queue, long firstSequence, long maxSequence, int maxCount,
+            org.apache.activemq.queue.QueueStore.RestoreListener<MessageDelivery> listener) {
+        database.restoreMessages(queue, firstSequence, maxSequence, maxCount, listener);
+    }
+
+    public final void addQueue(QueueStore.QueueDescriptor queue) {
+        database.addQueue(queue);
+    }
+
+    public final void deleteQueue(QueueStore.QueueDescriptor queue) {
+        database.deleteQueue(queue);
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
+
+public class MessageDeliveryWrapper implements MessageDelivery {
+
+    private final MessageDelivery delegate;
+    
+    public void acknowledge(QueueStore.QueueDescriptor queue) {
+        delegate.acknowledge(queue);
+    }
+
+    public <T> T asType(Class<T> type) {
+        return delegate.asType(type);
+    }
+
+    public MessageRecord createMessageRecord() throws IOException {
+        return delegate.createMessageRecord();
+    }
+
+    public Destination getDestination() {
+        return delegate.getDestination();
+    }
+
+    public int getFlowLimiterSize() {
+        return delegate.getFlowLimiterSize();
+    }
+
+    public AsciiBuffer getMsgId() {
+        return delegate.getMsgId();
+    }
+
+    public int getPriority() {
+        return delegate.getPriority();
+    }
+
+    public AsciiBuffer getProducerId() {
+        return delegate.getProducerId();
+    }
+
+    public long getStoreTracking() {
+        return delegate.getStoreTracking();
+    }
+
+    public Buffer getTransactionId() {
+        return delegate.getTransactionId();
+    }
+
+    public boolean isFromStore() {
+        return delegate.isFromStore();
+    }
+
+    public boolean isPersistent() {
+        return delegate.isPersistent();
+    }
+
+    public boolean isResponseRequired() {
+        return delegate.isResponseRequired();
+    }
+
+    public void onMessagePersisted() {
+        delegate.onMessagePersisted();
+    }
+
+    public void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long sequenceNumber, boolean delayable) throws IOException {
+        delegate.persist(queue, controller, sequenceNumber, delayable);
+    }
+
+    MessageDeliveryWrapper(MessageDelivery delivery) {
+        delegate = delivery;
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * Interface for SizeBasedLimiters
+ * @param <E>
+ */
+public interface IFlowSizeLimiter<E> extends IFlowLimiter<E> {
+
+    public boolean add(int count, long size);
+    
+    public long getCapacity();
+    
+    public long getSize();
+    
+    /**
+     * Returns the size of a limited element
+     */
+    public int getElementSize(E elem);
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPartitionedQueue.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPartitionedQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPartitionedQueue.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+public interface IPartitionedQueue<K, V> extends IQueue<K, V> {
+
+    public IQueue<K, V> createPartition(int partitionKey);
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.Collection;
+
+import org.apache.activemq.broker.store.BrokerDatabase.OperationContext;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public interface QueueStore<K, V> {
+
+    public interface SaveableQueueElement<V> {
+        /**
+         * Gets the element to save.
+         * @return
+         */
+        public V getElement();
+
+        /**
+         * Gets the sequence number of the element in the queue
+         * @return
+         */
+        public long getSequenceNumber();
+
+        /**
+         * @return a return value of true will cause {@link #notifySave()} to
+         *         called when this element is persisted
+         */
+        public boolean requestNotify();
+
+        /**
+         * Called when the element has been saved.
+         * 
+         * @return
+         */
+        public boolean notifySave();
+    }
+
+    /**
+     * A holder for queue elements loaded from the store.
+     * 
+     */
+    public interface RestoredElement<V> {
+        /**
+         * @return Gets the restored element
+         * @throws Exception
+         */
+        public V getElement() throws Exception;
+
+        /**
+         * Returns the sequence number of this element in the queue
+         * 
+         * @return the sequence number of this element
+         */
+        long getSequenceNumber();
+
+        /**
+         * Gets the tracking number of the stored message.
+         * 
+         * @return the next sequence number
+         */
+        long getStoreTracking();
+
+        /**
+         * Gets the next sequence number in the queue after this one or -1 if
+         * this is the last stored element
+         * 
+         * @return the next sequence number
+         */
+        long getNextSequenceNumber();
+    }
+
+    /**
+     * A callback to be used with {@link #elementsRestored(Collection)} to pass
+     * the results of a call to
+     * {@link QueueStore#restoreQueueElements(QueueDescriptor, long, long, int, RestoreListener)}
+     */
+    public interface RestoreListener<V> {
+
+        public void elementsRestored(Collection<RestoredElement<V>> restored);
+    }
+
+    public static class QueueDescriptor {
+
+        public static final short SHARED = 0;
+        public static final short SHARED_PRIORITY = 1;
+        public static final short PARTITIONED = 2;
+
+        AsciiBuffer queueName;
+        AsciiBuffer parent;
+        int partitionKey;
+        short applicationType;
+        short queueType = SHARED;
+
+        public QueueDescriptor() {
+        }
+
+        public QueueDescriptor(QueueDescriptor toCopy) {
+            if (toCopy == null) {
+                return;
+            }
+            queueName = toCopy.queueName;
+            applicationType = toCopy.applicationType;
+            queueType = toCopy.queueType;
+            partitionKey = toCopy.partitionKey;
+            parent = toCopy.parent;
+        }
+
+        public QueueDescriptor copy() {
+            return new QueueDescriptor(this);
+        }
+
+        public int getPartitionKey() {
+            return partitionKey;
+        }
+
+        public void setPartitionId(int key) {
+            this.partitionKey = key;
+        }
+
+        /**
+         * Sets the queue type which is useful for querying of queues. The value
+         * must not be less than 0.
+         * 
+         * @param type
+         *            The type of the queue.
+         */
+        public void setApplicationType(short type) {
+            if (type < 0) {
+                throw new IllegalArgumentException();
+            }
+            applicationType = type;
+        }
+
+        /**
+         * @param type
+         *            The type of the queue.
+         */
+        public short getApplicationType() {
+            return applicationType;
+        }
+
+        public short getQueueType() {
+            return queueType;
+        }
+
+        public void setQueueType(short type) {
+            queueType = type;
+        }
+
+        /**
+         * If this queue is a partition of a parent queue, this should be set to
+         * the parent queue's name.
+         * 
+         * @return The parent queue's name
+         */
+        public AsciiBuffer getParent() {
+            return parent;
+        }
+
+        /**
+         * If this queue is a partition of a parent queue, this should be set to
+         * the parent queue's name.
+         */
+        public void setParent(AsciiBuffer parent) {
+            this.parent = parent;
+        }
+
+        public AsciiBuffer getQueueName() {
+            return queueName;
+        }
+
+        public void setQueueName(AsciiBuffer queueName) {
+            this.queueName = queueName;
+        }
+
+        public int hashCode() {
+            return queueName.hashCode();
+        }
+
+        public boolean equals(Object o) {
+            if (o == null) {
+                return false;
+            }
+            if (o == this) {
+                return true;
+            }
+
+            if (o instanceof QueueDescriptor) {
+                return equals((QueueDescriptor) o);
+            } else {
+                return false;
+            }
+        }
+
+        public boolean equals(QueueDescriptor qd) {
+            if (qd.queueName.equals(queueName)) {
+                return true;
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Loads a batch of messages for the specified queue. The loaded messages
+     * are given the provided {@link MessageRestoreListener}.
+     * <p>
+     * <b><i>NOTE:</i></b> This method uses the queue sequence number for the
+     * message not the store tracking number.
+     * 
+     * @param queue
+     *            The queue for which to load messages
+     * @param firstSequence
+     *            The first queue sequence number to load (-1 starts at
+     *            beginning)
+     * @param maxSequence
+     *            The maximum sequence number to load (-1 if no limit)
+     * @param maxCount
+     *            The maximum number of messages to load (-1 if no limit)
+     * @param listener
+     *            The listener to which restored elements should be passed.
+     * @return The {@link OperationContext} associated with the operation
+     */
+    public void restoreQueueElements(QueueDescriptor queue, long firstSequence, long maxSequence, int maxCount, RestoreListener<V> listener);
+
+    /**
+     * Asynchronously deletes an element from the store.
+     * 
+     * @param descriptor
+     *            The queue descriptor
+     * @param element
+     *            The element to delete.
+     */
+    public void deleteQueueElement(QueueDescriptor descriptor, V element);
+
+    /**
+     * Asynchronously saves the given element to the store
+     * 
+     * @param descriptor
+     *            The descriptor for the queue.
+     * @param controller
+     *            A flow controller to use in the event that there isn't room in
+     *            the database.
+     * @param elem
+     *            The element to save
+     * @param sequence
+     *            The sequence number for the saved element
+     * @param delayable
+     *            Whether or not the save operation can be delayed.
+     * @throws Exception
+     *             If there is an error saving the element.
+     */
+    public void persistQueueElement(QueueDescriptor descriptor, ISourceController<?> controller, V elem, long sequence, boolean delayable) throws Exception;
+
+    /**
+     * Tests whether or not the given element is persistent. When a message is
+     * added to a persistent queue it should be saved via
+     * {@link #persistQueueElement(QueueDescriptor, Object, long, boolean)}
+     * 
+     * @param elem
+     *            The element to check.
+     * @return True if the element requires persistence.
+     */
+    public boolean isElemPersistent(V elem);
+
+    /**
+     * Tests whether or not the given element came from the store. If so, a
+     * queue must delete the element when it is finished with it
+     * 
+     * @param elem
+     *            The element to check.
+     * @return True if the element came from the store.
+     */
+    public boolean isFromStore(V elem);
+
+    /**
+     * Adds a queue to the store.
+     * 
+     * @param queue
+     */
+    public void addQueue(QueueDescriptor queue);
+
+    /**
+     * Deletes a queue from the store.
+     * 
+     * @param queue
+     */
+    public void deleteQueue(QueueDescriptor queue);
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,603 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowResource;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSizeLimiter;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.util.Mapper;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+/**
+ * IQueue which does direct dispatch whenever it can.
+ */
+public class SharedQueueOld<K, V> extends AbstractFlowQueue<V> implements IQueue<K, V> {
+
+    protected TreeMemoryStore store = new TreeMemoryStore();
+
+    private final LinkedNodeList<SubscriptionNode> unreadyDirectSubs = new LinkedNodeList<SubscriptionNode>();
+    private final LinkedNodeList<SubscriptionNode> readyDirectSubs = new LinkedNodeList<SubscriptionNode>();
+
+    private final LinkedNodeList<SubscriptionNode> unreadyPollingSubs = new LinkedNodeList<SubscriptionNode>();
+    private final LinkedNodeList<SubscriptionNode> readyPollingSubs = new LinkedNodeList<SubscriptionNode>();
+
+    private final HashMap<Subscription<V>, SubscriptionNode> subscriptions = new HashMap<Subscription<V>, SubscriptionNode>();
+    private final HashMap<IFlowResource, SubscriptionNode> sinks = new HashMap<IFlowResource, SubscriptionNode>();
+
+    private final FlowController<V> sinkController;
+    private final IFlowSizeLimiter<V> limiter;
+    private final Object mutex;
+
+    protected Mapper<K, V> keyMapper;
+    private long directs;
+
+    private boolean started = false;
+
+    private final ISourceController<V> sourceControler = new ISourceController<V>() {
+
+        public Flow getFlow() {
+            return sinkController.getFlow();
+        }
+
+        public void elementDispatched(V elem) {
+        }
+
+        public void onFlowBlock(ISinkController<?> sink) {
+        }
+
+        public void onFlowResume(ISinkController<?> sinkController) {
+            IFlowResource sink = sinkController.getFlowResource();
+            synchronized (mutex) {
+                SubscriptionNode node = sinks.get(sink);
+                if (node != null) {
+                    node.unlink();
+                    boolean notify = false;
+                    if (node.cursor == null) {
+                        readyDirectSubs.addLast(node);
+                        // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
+                    } else {
+                        if (readyPollingSubs.isEmpty()) {
+                            notify = !store.isEmpty();
+                        }
+                        readyPollingSubs.addLast(node);
+                        // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
+                    }
+
+                    if (notify) {
+                        notifyReady();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            return getResourceName();
+        }
+
+        public IFlowResource getFlowResource() {
+            return SharedQueueOld.this;
+        }
+
+    };
+
+    private QueueStore.QueueDescriptor queueDescriptor;
+
+    public SharedQueueOld(String name, IFlowSizeLimiter<V> limiter) {
+        this(name, limiter, new Object());
+        autoRelease = true;
+    }
+
+    /**
+     * Creates a flow queue that can handle multiple flows.
+     * 
+     * @param flow
+     *            The {@link Flow}
+     * @param controller
+     *            The FlowController if this queue is flow controlled:
+     */
+    public SharedQueueOld(String name, IFlowSizeLimiter<V> limiter, Object mutex) {
+        super(name);
+        queueDescriptor = new QueueStore.QueueDescriptor();
+        queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
+        this.mutex = mutex;
+        Flow flow = new Flow(name, false);
+        this.limiter = limiter;
+        this.sinkController = new FlowController<V>(getFlowControllableHook(), flow, limiter, mutex);
+        super.onFlowOpened(sinkController);
+    }
+
+
+    public int getEnqueuedCount() {
+        synchronized (mutex) {
+            return store.size();
+        }
+    }
+
+    public long getEnqueuedSize() {
+        synchronized (mutex) {
+            return limiter.getSize();
+        }
+    }
+    
+    public synchronized void start() {
+        if (!started) {
+            started = true;
+            if (isDispatchReady()) {
+                super.notifyReady();
+            }
+        }
+    }
+
+    public synchronized void stop() {
+        started = false;
+    }
+
+    public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
+        // TODO - this queue is not persistent, so we can ignore this.
+    }
+    
+    public void setStore(QueueStore<K, V> store) {
+        //No-op
+    }
+
+    protected final ISinkController<V> getSinkController(V elem, ISourceController<?> source) {
+        return sinkController;
+    }
+
+    /**
+     * Called when the controller accepts a message for this queue.
+     */
+    public void flowElemAccepted(ISourceController<V> controller, V value) {
+        synchronized (mutex) {
+
+            // Try to directly dispatch to one of the attached subscriptions
+            // sourceDispatch returns null on successful dispatch
+            ArrayList<SubscriptionNode> matches = directDispatch(value);
+            if (matches != null) {
+
+                if (directs != 0) {
+                    // System.out.println("could not directly dispatch.. had directly dispatched: "+directs);
+                    directs = 0;
+                }
+
+                K key = keyMapper.map(value);
+                StoreNode<K, V> node = store.add(key, value);
+
+                int matchCount = 0;
+                // Go through the un-ready direct subs and find out if any those
+                // would
+                // have matched the message, and if so then set it up to cursor
+                // from
+                // it.
+                SubscriptionNode sub = unreadyDirectSubs.getHead();
+                while (sub != null) {
+                    SubscriptionNode next = sub.getNext();
+                    if (sub.subscription.matches(value)) {
+                        sub.unlink();
+                        sub.resumeAt(node);
+                        unreadyPollingSubs.addLast(sub);
+                        matchCount++;
+                        // System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
+                    }
+                    sub = next;
+                }
+
+                // Also do it for all the ready nodes that matched... but which
+                // we
+                // could not enqueue to.
+                for (SubscriptionNode subNode : matches) {
+                    subNode.unlink();
+                    subNode.resumeAt(node);
+                    unreadyPollingSubs.addLast(subNode);
+                    // System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
+                }
+                matchCount += matches.size();
+
+                if (matchCount > 0) {
+                    // We have interested subscriptions for the message.. but
+                    // they are not ready to receive.
+                    // Would be cool if we could flow control the source.
+                }
+
+                if (!readyPollingSubs.isEmpty()) {
+                    notifyReady();
+                }
+            } else {
+                directs++;
+            }
+        }
+    }
+
+    public FlowController<V> getFlowController(Flow flow) {
+        return sinkController;
+    }
+
+    public boolean isDispatchReady() {
+        return started && !store.isEmpty() && !readyPollingSubs.isEmpty();
+    }
+
+    private ArrayList<SubscriptionNode> directDispatch(V elem) {
+        ArrayList<SubscriptionNode> matches = new ArrayList<SubscriptionNode>(readyDirectSubs.size());
+        boolean accepted = false;
+        SubscriptionNode next = null;
+        SubscriptionNode node = readyDirectSubs.getHead();
+        while (node != null) {
+            next = node.getNext();
+            if (node.subscription.matches(elem)) {
+                accepted = node.subscription.getSink().offer(elem, sourceControler);
+                if (accepted) {
+                    if (autoRelease) {
+                        sinkController.elementDispatched(elem);
+                    }
+                    break;
+                } else {
+                    matches.add(node);
+                }
+            }
+            node = next;
+        }
+        if (next != null) {
+            readyDirectSubs.rotateTo(next);
+        }
+        return accepted ? null : matches;
+    }
+
+    public QueueStore.QueueDescriptor getDescriptor() {
+        return queueDescriptor;
+    }
+    
+    public boolean pollingDispatch() {
+
+        // System.out.println("polling dispatch");
+
+        // Keep looping until we can find one subscription that we can
+        // dispatch a message to.
+        while (true) {
+
+            // Find a subscription that has a message available for dispatch.
+            SubscriptionNode subNode = null;
+            StoreNode<K, V> storeNode = null;
+            synchronized (mutex) {
+
+                if (readyPollingSubs.isEmpty()) {
+                    return false;
+                }
+
+                SubscriptionNode next = null;
+                subNode = readyPollingSubs.getHead();
+                while (subNode != null) {
+                    next = subNode.getNext();
+
+                    storeNode = subNode.cursorPeek();
+                    if (storeNode != null) {
+                        // Found a message..
+                        break;
+                    } else {
+                        // Cursor dried up... this subscriber can now be direct
+                        // dispatched.
+                        // System.out.println("Subscription state change: ready polling -> ready direct: "+subNode);
+                        subNode.unlink();
+                        readyDirectSubs.addLast(subNode);
+                    }
+                    subNode = next;
+                }
+
+                if (storeNode == null) {
+                    return false;
+                }
+
+                if (next != null) {
+                    readyPollingSubs.rotateTo(next);
+                }
+            }
+
+            // The subscription's sink may be full..
+            IFlowSink<V> sink = subNode.subscription.getSink();
+            boolean accepted = sink.offer(storeNode.getValue(), sourceControler);
+
+            synchronized (mutex) {
+                if (accepted) {
+                    subNode.cursorNext();
+                    if (subNode.subscription.isPreAcquired() && subNode.subscription.isRemoveOnDispatch()) {
+                        StoreNode<K, V> removed = store.remove(storeNode.getKey());
+                        assert removed != null : "Since the node was aquired.. it should not have been removed by anyone else.";
+                        sinkController.elementDispatched(storeNode.getValue());
+                    }
+                    return true;
+                } else {
+                    // System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
+                    // Subscription is no longer ready..
+                    subNode.cursorUnPeek(storeNode);
+                    subNode.unlink();
+                    unreadyPollingSubs.addLast(subNode);
+                }
+            }
+        }
+    }
+
+    public final V poll() {
+        throw new UnsupportedOperationException("Not supported");
+    }
+
+    public void addSubscription(Subscription<V> subscription) {
+        synchronized (mutex) {
+            SubscriptionNode node = subscriptions.get(subscription);
+            if (node == null) {
+                node = new SubscriptionNode(subscription);
+                subscriptions.put(subscription, node);
+                sinks.put(subscription.getSink(), node);
+                if (!store.isEmpty()) {
+                    readyPollingSubs.addLast(node);
+                    notifyReady();
+                } else {
+                    readyDirectSubs.addLast(node);
+                }
+            }
+        }
+    }
+
+    public boolean removeSubscription(Subscription<V> subscription) {
+        synchronized (mutex) {
+            SubscriptionNode node = subscriptions.remove(subscription);
+            if (node != null) {
+                sinks.remove(subscription.getSink());
+                node.unlink();
+                return true;
+            }
+            return false;
+        }
+    }
+
+    private class SubscriptionNode extends LinkedNode<SubscriptionNode> {
+        public final Subscription<V> subscription;
+        public StoreCursor<K, V> cursor;
+
+        public SubscriptionNode(Subscription<V> subscription) {
+            this.subscription = subscription;
+            this.cursor = store.openCursor();
+        }
+
+        public void resumeAt(StoreNode<K, V> node) {
+            this.cursor = store.openCursorAt(node);
+        }
+
+        public void cursorNext() {
+            cursor.next();
+        }
+
+        public StoreNode<K, V> cursorPeek() {
+            if (cursor == null) {
+                return null;
+            }
+            while (cursor.hasNext()) {
+                StoreNode<K, V> elemNode = cursor.peekNext();
+
+                // Skip over messages that are not a match.
+                if (!subscription.matches(elemNode.getValue())) {
+                    cursor.next();
+                    continue;
+                }
+
+                if (subscription.isPreAcquired()) {
+                    if (elemNode.acquire(subscription)) {
+                        return elemNode;
+                    } else {
+                        cursor.next();
+                        continue;
+                    }
+                }
+            }
+            cursor = null;
+            return null;
+        }
+
+        public void cursorUnPeek(StoreNode<K, V> node) {
+            if (subscription.isPreAcquired()) {
+                node.unacquire();
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "subscription from " + getResourceName() + " to " + subscription;
+        }
+    }
+
+    public Mapper<K, V> getKeyMapper() {
+        return keyMapper;
+    }
+
+    public void setKeyMapper(Mapper<K, V> keyMapper) {
+        this.keyMapper = keyMapper;
+    }
+
+    @Override
+    public String toString() {
+        return getResourceName();
+    }
+
+    public FlowController<V> getFlowControler() {
+        return this.sinkController;
+    }
+    
+    public interface StoreNode<K, V> {
+
+        public boolean acquire(Subscription<V> ownerId);
+
+        public void unacquire();
+
+        public V getValue();
+
+        public K getKey();
+
+    }
+
+    public interface StoreCursor<K, V> extends Iterator<StoreNode<K, V>> {
+        public StoreNode<K, V> peekNext();
+
+        public void setNext(StoreNode<K, V> node);
+
+    }
+    
+    private class TreeMemoryStore
+    {
+        AtomicLong counter = new AtomicLong();
+
+        class MemoryStoreNode implements StoreNode<K, V> {
+            private Subscription<V> owner;
+            private final K key;
+            private final V value;
+            private long id = counter.getAndIncrement();
+
+            public MemoryStoreNode(K key, V value) {
+                this.key = key;
+                this.value = value;
+            }
+
+            public boolean acquire(Subscription<V> owner) {
+                if (this.owner == null) {
+                    this.owner = owner;
+                    return true;
+                }
+                return false;
+            }
+
+            public K getKey() {
+                return key;
+            }
+
+            public V getValue() {
+                return value;
+            }
+
+            @Override
+            public String toString() {
+                return "node:" + id + ", owner=" + owner;
+            }
+
+            public void unacquire() {
+                this.owner = null;
+            }
+
+        }
+
+        class MemoryStoreCursor implements StoreCursor<K, V> {
+            private long last = -1;
+            private MemoryStoreNode next;
+
+            public MemoryStoreCursor() {
+            }
+
+            public MemoryStoreCursor(MemoryStoreNode next) {
+                this.next = next;
+            }
+
+            public void setNext(StoreNode<K, V> next) {
+                this.next = (MemoryStoreNode) next;
+            }
+
+            public boolean hasNext() {
+                if (next != null)
+                    return true;
+
+                SortedMap<Long, MemoryStoreNode> m = order.tailMap(last + 1);
+                if (m.isEmpty()) {
+                    next = null;
+                } else {
+                    next = m.get(m.firstKey());
+                }
+                return next != null;
+            }
+
+            public StoreNode<K, V> peekNext() {
+                hasNext();
+                return next;
+            }
+
+            public StoreNode<K, V> next() {
+                try {
+                    hasNext();
+                    return next;
+                } finally {
+                    last = next.id;
+                    next = null;
+                }
+            }
+
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+        }
+
+        protected HashMap<K, MemoryStoreNode> map = new HashMap<K, MemoryStoreNode>();
+        protected TreeMap<Long, MemoryStoreNode> order = new TreeMap<Long, MemoryStoreNode>();
+
+        public StoreNode<K, V> add(K key, V value) {
+            MemoryStoreNode rc = new MemoryStoreNode(key, value);
+            MemoryStoreNode oldNode = map.put(key, rc);
+            if (oldNode != null) {
+                map.put(key, oldNode);
+                throw new IllegalArgumentException("Duplicate key violation");
+            }
+            order.put(rc.id, rc);
+            return rc;
+        }
+
+        public StoreNode<K, V> remove(K key) {
+            MemoryStoreNode node = (MemoryStoreNode) map.remove(key);
+            if (node != null) {
+                order.remove(node.id);
+            }
+            return node;
+        }
+
+        public boolean isEmpty() {
+            return map.isEmpty();
+        }
+
+        public StoreCursor<K, V> openCursor() {
+            MemoryStoreCursor cursor = new MemoryStoreCursor();
+            return cursor;
+        }
+
+        public StoreCursor<K, V> openCursorAt(StoreNode<K, V> next) {
+            MemoryStoreCursor cursor = new MemoryStoreCursor((MemoryStoreNode) next);
+            return cursor;
+        }
+
+        public int size() {
+            return map.size();
+        }
+
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/Mapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/Mapper.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/Mapper.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/Mapper.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+public interface Mapper<K, V> {
+    K map(V element);
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityLinkedList.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityLinkedList.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityLinkedList.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.ArrayList;
+
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+public class PriorityLinkedList<E extends LinkedNode<E>> {
+
+    private Mapper<Integer, E> priorityMapper;
+    private final ArrayList<LinkedNodeList<E>> priorityLists;
+    private int highesPriority = 0;
+
+    public PriorityLinkedList(int numPriorities) {
+        this(numPriorities, null);
+    }
+
+    public PriorityLinkedList(int numPriorities, Mapper<Integer, E> priorityMapper) {
+        this.priorityMapper = priorityMapper;
+        priorityLists = new ArrayList<LinkedNodeList<E>>();
+        for (int i = 0; i <= numPriorities; i++) {
+            priorityLists.add(new LinkedNodeList<E>());
+        }
+    }
+
+    public final int getHighestPriority() {
+        return highesPriority;
+    }
+
+    /**
+     * Gets the element at the front of the list:
+     * 
+     * @return
+     */
+    public final E poll() {
+        LinkedNodeList<E> ll = getHighestPriorityList();
+        if (ll == null) {
+            return null;
+        }
+        E node = ll.getHead();
+        node.unlink();
+
+        return node;
+    }
+
+    public final boolean isEmpty() {
+        return peek() != null;
+    }
+
+    /**
+     * Gets the element at the front of the list:
+     * 
+     * @return
+     */
+    public final E peek() {
+        LinkedNodeList<E> ll = getHighestPriorityList();
+        if (ll == null) {
+            return null;
+        }
+
+        return ll.getHead();
+    }
+
+    public final void add(E element) {
+        int prio = priorityMapper.map(element);
+        add(element, prio);
+    }
+
+    public final void add(E element, int prio) {
+        LinkedNodeList<E> ll = priorityLists.get(prio);
+        ll.addLast(element);
+        if (prio > highesPriority) {
+            highesPriority = prio;
+        }
+    }
+
+    private final LinkedNodeList<E> getHighestPriorityList() {
+        LinkedNodeList<E> ll = priorityLists.get(highesPriority);
+        while (ll.isEmpty()) {
+            if (highesPriority == 0) {
+                return null;
+            }
+            highesPriority--;
+            ll = priorityLists.get(highesPriority);
+        }
+
+        return ll;
+    }
+
+    public Mapper<Integer, E> getPriorityMapper() {
+        return priorityMapper;
+    }
+
+    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+        this.priorityMapper = priorityMapper;
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityMap.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityMap.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityMap.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityMap.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.Arrays;
+
+public class PriorityMap<E> {
+
+    int first;
+    int base;
+    int size;
+
+    Object elements[] = new Object[1];
+
+    public E put(int key, E value) {
+        E rc = null;
+        if (isEmpty()) {
+            // This will be the first base prioritu..
+            base = key;
+            elements[0] = value;
+            first = 0;
+        } else {
+            if (key > base) {
+                // New priority is after the current base, we may need to
+                // expaned the
+                // priority array to fit this new one in.
+                int index = key - base;
+                if (elements.length <= index) {
+                    // The funky thing is if the original base was removed,
+                    // resizing
+                    // will rebase the at the first.
+                    resize(index + 1, 0);
+                }
+                if (index < first) {
+                    first = index;
+                }
+                rc = element(index);
+                elements[index] = value;
+            } else {
+                // Ok this element is before the current base so we need to
+                // resize/rebase
+                // using this element as the base.
+                int oldLastIndex = indexOfLast();
+                int newLastIndex = (base + oldLastIndex) - key;
+                resize(newLastIndex + 1, first + (base - key), (oldLastIndex - first) + 1);
+                elements[0] = value;
+                first = 0;
+            }
+        }
+        if (rc == null) {
+            size++;
+        }
+        return rc;
+    }
+
+    private int indexOfLast() {
+        int i = elements.length - 1;
+        while (i >= 0) {
+            if (elements[i] != null) {
+                return i;
+            }
+            i--;
+        }
+        return -1;
+    }
+
+    private void resize(int newSize, int firstOffset) {
+        int count = Math.min(elements.length - first, newSize);
+        resize(newSize, firstOffset, count);
+    }
+
+    private void resize(int newSize, int firstOffset, int copyCount) {
+        Object t[];
+        if (elements.length == newSize) {
+            t = elements;
+            System.arraycopy(elements, first, t, firstOffset, copyCount);
+            Arrays.fill(t, 0, firstOffset, null);
+        } else {
+            t = new Object[newSize];
+            System.arraycopy(elements, first, t, firstOffset, copyCount);
+        }
+        base += (first - firstOffset);
+        elements = t;
+    }
+
+    public E get(int priority) {
+        int index = priority - base;
+        if (index < 0 || index >= elements.length) {
+            return null;
+        }
+        return element(index);
+    }
+
+    @SuppressWarnings("unchecked")
+    private E element(int index) {
+        return (E) elements[index];
+    }
+
+    public E remove(int priority) {
+        int index = priority - base;
+        if (index < 0 || index >= elements.length) {
+            return null;
+        }
+        E rc = element(index);
+        elements[index] = null;
+        if (rc != null) {
+            size--;
+        }
+        return rc;
+    }
+
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+    public E firstValue() {
+        if (size == 0) {
+            return null;
+        }
+        E rc = element(first);
+        while (rc == null) {
+            // The first element may have been removed so we need to find it...
+            first++;
+            rc = element(first);
+        }
+        return (E) rc;
+    }
+
+    public Integer firstKey() {
+        if (size == 0) {
+            return null;
+        }
+        E rc = element(first);
+        while (rc == null) {
+            // The first element may have been removed so we need to find it...
+            first++;
+            rc = element(first);
+        }
+        return first;
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedList.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedList.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedList.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.ArrayList;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+public class SortedLinkedList<T extends SortedLinkedListNode<T>> {
+
+    protected final TreeMap<Long, T> index;
+    T head;
+    int size;
+
+    public SortedLinkedList() {
+        index = new TreeMap<Long, T>();
+    }
+
+    public boolean isEmpty() {
+        return head == null;
+    }
+
+    public void add(T node) {
+        T prev = null;
+        if (head != null) {
+            // Optimize for case where this is at tail:
+            if (head.prev.getSequence() < node.getSequence()) {
+                prev = head.prev;
+            } else {
+                Entry<Long, T> entry = index.lowerEntry(node.getSequence());
+                if (entry != null) {
+                    prev = entry.getValue();
+                }
+            }
+        }
+        // T prev = index.lower(node);
+        // If this the lowest then the new head is this.
+        if (prev == null) {
+            node.linkToHead(this);
+        } else {
+            prev.linkAfter(node);
+        }
+    }
+
+    public T lower(long sequence, boolean inclusive) {
+        Entry<Long, T> lower = index.floorEntry(sequence);
+        if (lower == null) {
+            return null;
+        }
+
+        if (inclusive) {
+            return lower.getValue();
+        }
+
+        if (lower.getKey() == sequence) {
+            return lower.getValue().prev;
+        } else {
+            return lower.getValue();
+        }
+
+    }
+
+    public T upper(long sequence, boolean inclusive) {
+        if (head == null || head.prev.getSequence() < sequence) {
+            return null;
+        }
+
+        Entry<Long, T> upper = index.ceilingEntry(sequence);
+        if (upper == null) {
+            return null;
+        }
+
+        if (inclusive) {
+            return upper.getValue();
+        }
+
+        if (upper.getKey() == sequence) {
+            return upper.getValue().next;
+        } else {
+            return upper.getValue();
+        }
+    }
+
+    public void remove(T node) {
+        if (node.list == this) {
+            node.unlink();
+        }
+    }
+
+    public T getHead() {
+        return head;
+    }
+
+    public T getTail() {
+        return head.prev;
+    }
+
+    public void clear() {
+        while (head != null) {
+            head.unlink();
+        }
+        index.clear();
+    }
+
+    public int size() {
+        return size;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        boolean first = true;
+        T cur = getHead();
+        while (cur != null) {
+            if (!first) {
+                sb.append(", ");
+            }
+            sb.append(cur);
+            first = false;
+            cur = cur.getNext();
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    /**
+     * Copies the nodes of the LinkedNodeList to an ArrayList.
+     * 
+     * @return
+     */
+    public ArrayList<T> toArrayList() {
+        ArrayList<T> rc = new ArrayList<T>(size);
+        T cur = head;
+        while (cur != null) {
+            rc.add(cur);
+            cur = cur.getNext();
+        }
+        return rc;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedListNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedListNode.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedListNode.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedListNode.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+public abstract class SortedLinkedListNode<T extends SortedLinkedListNode<T>> {
+
+    protected SortedLinkedList<T> list;
+    protected T next;
+    protected T prev;
+
+    public SortedLinkedListNode() {
+    }
+
+    @SuppressWarnings("unchecked")
+    private T getThis() {
+        return (T) this;
+    }
+
+    public T getHeadNode() {
+        return list.head;
+    }
+
+    public T getTailNode() {
+        return list.head.prev;
+    }
+
+    public T getNext() {
+        return isTailNode() ? null : next;
+    }
+
+    public T getPrevious() {
+        return isHeadNode() ? null : prev;
+    }
+
+    public T getNextCircular() {
+        return next;
+    }
+
+    public T getPreviousCircular() {
+        return prev;
+    }
+
+    public boolean isHeadNode() {
+        return list.head == this;
+    }
+
+    public boolean isTailNode() {
+        return list.head.prev == this;
+    }
+
+    /**
+     * Removes this node out of the linked list it is chained in.
+     */
+    public boolean unlink() {
+
+        // If we are already unlinked...
+        if (list == null) {
+            return false;
+        }
+
+        if (getThis() == prev) {
+            // We are the only item in the list
+            list.head = null;
+        } else {
+            // given we linked prev<->this<->next
+            next.prev = prev; // prev<-next
+            prev.next = next; // prev->next
+
+            if (isHeadNode()) {
+                list.head = next;
+            }
+        }
+        list.index.remove(this.getSequence());
+        list.size--;
+        list = null;
+        return true;
+    }
+
+    private void addToIndex(T node, SortedLinkedList<T> list) {
+        if (node.list != null) {
+            throw new IllegalArgumentException("You only insert nodes that are not in a list");
+        }
+
+        T old = list.index.put(node.getSequence(), node);
+        if(old != null)
+        {
+            list.index.put(old.getSequence(), old);
+            throw new IllegalArgumentException("A node with this key is already in the list");
+        }
+
+        node.list = list;
+        list.size++;
+    }
+
+    private final void checkLinkOk(T toLink) {
+        if (toLink == this) {
+            throw new IllegalArgumentException("You cannot link to yourself");
+        }
+
+        if (list == null) {
+            throw new IllegalArgumentException("This node is not yet in a list");
+        }
+    }
+
+    /**
+     * Adds the specified node to the head of the list.
+     * 
+     * @param sub
+     *            The sub list
+     */
+    protected void linkToHead(SortedLinkedList<T> target) {
+
+        if (target.head == null) {
+            addToIndex(getThis(), target);
+            next = prev = target.head = getThis();
+        } else {
+            target.head.linkBefore(getThis());
+        }
+    }
+
+    /**
+     * @param node
+     *            the node to link after this node.
+     * @return this
+     */
+    protected void linkAfter(T node) {
+        checkLinkOk(node);
+        addToIndex(node, list);
+
+        // given we linked this<->next and are inserting node in between
+        node.prev = getThis(); // link this<-node
+        node.next = next; // link node->next
+        next.prev = node; // link node<-next
+        next = node; // this->node
+    }
+
+    /**
+     * @param node
+     *            the node to link after this node.
+     * @return
+     * @return this
+     */
+    protected void linkBefore(T node) {
+        checkLinkOk(node);
+        addToIndex(node, list);
+
+        // given we linked prev<->this and are inserting node in between
+        node.next = getThis(); // node->this
+        node.prev = prev; // prev<-node
+        prev.next = node; // prev->node
+        prev = node; // node<-this
+
+        if (this == list.head) {
+            list.head = node;
+        }
+    }
+
+    public abstract long getSequence(); 
+
+    public boolean isLinked() {
+        return list != null;
+    }
+
+    public SortedLinkedList<T> getList() {
+        return list;
+    }
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
+import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StoreFactory;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowRelay;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.queue.Subscription;
+
+import junit.framework.TestCase;
+
+public class SharedQueuePerfTest extends TestCase {
+
+    private static int PERFORMANCE_SAMPLES = 5;
+
+    IDispatcher dispatcher;
+    BrokerDatabase database;
+    BrokerQueueStore queueStore;
+    private static final boolean USE_KAHA_DB = true;
+    private static final boolean PERSISTENT = true;
+    private static final boolean PURGE_STORE = false;
+
+    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+
+    protected ArrayList<Consumer> consumers = new ArrayList<Consumer>();
+    protected ArrayList<Producer> producers = new ArrayList<Producer>();
+    protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
+
+    protected IDispatcher createDispatcher() {
+        return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", MessageBroker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+    }
+
+    protected void startServices() throws Exception {
+        dispatcher = createDispatcher();
+        dispatcher.start();
+        database = new BrokerDatabase(createStore(), dispatcher);
+        database.start();
+        queueStore = new BrokerQueueStore();
+        queueStore.setDatabase(database);
+        queueStore.setDispatcher(dispatcher);
+        queueStore.loadQueues();
+    }
+
+    protected void stopServices() throws Exception {
+        dispatcher.shutdown();
+        database.stop();
+        dispatcher.shutdown();
+        consumers.clear();
+        producers.clear();
+        queues.clear();
+    }
+
+    protected Store createStore() throws Exception {
+        Store store = null;
+        if (USE_KAHA_DB) {
+            store = StoreFactory.createStore("kaha-db");
+        } else {
+            store = StoreFactory.createStore("memory");
+        }
+
+        store.setStoreDirectory(new File("test-data/shared-message-queue-test/"));
+        store.setDeleteAllMessages(PURGE_STORE);
+        return store;
+    }
+
+    protected void cleanup() throws Exception {
+        consumers.clear();
+        producers.clear();
+        queues.clear();
+        stopServices();
+    }
+
+    
+    public void testSharedQueue_1_1_1() throws Exception {
+        startServices();
+        try {
+            createQueues(1);
+            createProducers(1);
+            createConsumers(1);
+            doTest();
+
+        } finally {
+            cleanup();
+        }
+    }
+
+    public void testSharedQueue_10_10_10() throws Exception {
+        startServices();
+        try {
+            createQueues(10);
+            createProducers(10);
+            createConsumers(10);
+            doTest();
+
+        } finally {
+            cleanup();
+        }
+    }
+    
+    public void testSharedQueue_10_1_10() throws Exception {
+        startServices();
+        try {
+            createQueues(1);
+            createProducers(10);
+            createConsumers(10);
+            doTest();
+
+        } finally {
+            cleanup();
+        }
+    }
+    
+    
+    public void testSharedQueue_10_1_1() throws Exception {
+        startServices();
+        try {
+            createQueues(10);
+            createProducers(10);
+            createConsumers(10);
+            doTest();
+
+        } finally {
+            cleanup();
+        }
+    }
+    
+    public void testSharedQueue_1_1_10() throws Exception {
+        startServices();
+        try {
+            createQueues(10);
+            createProducers(10);
+            createConsumers(10);
+            doTest();
+
+        } finally {
+            cleanup();
+        }
+    }
+
+    private void doTest() throws Exception {
+        
+        try
+        {
+            // Start queues:
+            for (IQueue<Long, MessageDelivery> queue : queues) {
+                queue.start();
+            }
+    
+            // Start consumers:
+            for (Consumer consumer : consumers) {
+                consumer.start();
+            }
+    
+            // Start producers:
+            for (Producer producer : producers) {
+                producer.start();
+            }
+            reportRates();
+        }
+        finally
+        {
+            // Stop producers:
+            for (Producer producer : producers) {
+                producer.stop();
+            }
+            
+            // Stop consumers:
+            for (Consumer consumer : consumers) {
+                consumer.stop();
+            }
+            
+            // Stop queues:
+            for (IQueue<Long, MessageDelivery> queue : queues) {
+                queue.stop();
+            }        
+        }
+    }
+
+    private final void createQueues(int count) {
+        for (int i = 0; i < count; i++) {
+            IQueue<Long, MessageDelivery> queue = queueStore.createSharedQueue("queue-" + (i + 1));
+            queues.add(queue);
+        }
+    }
+
+    private final void createProducers(int count) {
+        for (int i = 0; i < count; i++) {
+            Producer producer = new Producer("producer" + (i + 1), queues.get(i % queues.size()));
+            producers.add(producer);
+        }
+    }
+
+    private final void createConsumers(int count) {
+        for (int i = 0; i < count; i++) {
+            Consumer consumer = new Consumer("consumer" + (i + 1), queues.get(i % queues.size()));
+            consumers.add(consumer);
+        }
+    }
+
+    private void reportRates() throws InterruptedException {
+        System.out.println("Checking rates for test: " + super.getName());
+        for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+            Period p = new Period();
+            Thread.sleep(5000);
+            System.out.println(totalProducerRate.getRateSummary(p));
+            System.out.println(totalConsumerRate.getRateSummary(p));
+            /*
+             * if (includeDetailedRates) {
+             * System.out.println(totalProducerRate.getChildRateSummary(p));
+             * System.out.println(totalConsumerRate.getChildRateSummary(p)); }
+             */
+            totalProducerRate.reset();
+            totalConsumerRate.reset();
+        }
+    }
+
+    class Producer implements Dispatchable, FlowUnblockListener<OpenWireMessageDelivery> {
+        private AtomicBoolean stopped = new AtomicBoolean(false);
+        private String name;
+        protected final MetricCounter rate = new MetricCounter();
+        private final DispatchContext dispatchContext;
+
+        protected IFlowController<OpenWireMessageDelivery> outboundController;
+        protected final IFlowRelay<OpenWireMessageDelivery> outboundQueue;
+        protected OpenWireMessageDelivery next;
+        private int priority;
+        private final byte[] payload;
+        private int sequenceNumber;
+        private final ActiveMQDestination destination;
+        private final IQueue<Long, MessageDelivery> targetQueue;
+
+        private final ProducerId producerId;
+        private final OpenWireFormat wireFormat;
+
+        public Producer(String name, IQueue<Long, MessageDelivery> targetQueue) {
+            this.name = name;
+            rate.name("Producer " + name + " Rate");
+            totalProducerRate.add(rate);
+            dispatchContext = dispatcher.register(this, name);
+            payload = new byte[1024];
+            producerId = new ProducerId(name);
+            wireFormat = new OpenWireFormat();
+            wireFormat.setCacheEnabled(false);
+            wireFormat.setSizePrefixDisabled(false);
+            wireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION);
+
+            SizeLimiter<OpenWireMessageDelivery> limiter = new SizeLimiter<OpenWireMessageDelivery>(1000, 500);
+            Flow flow = new Flow(name, true);
+            outboundQueue = new SingleFlowRelay<OpenWireMessageDelivery>(flow, name, limiter);
+            outboundQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+            outboundQueue.setDrain(new IFlowDrain<OpenWireMessageDelivery>() {
+
+                public void drain(OpenWireMessageDelivery elem, ISourceController<OpenWireMessageDelivery> controller) {
+
+                    next.setStoreWireFormat(wireFormat);
+                    next.beginDispatch(database);
+                    Producer.this.targetQueue.add(elem, controller);
+                    // Saves the message to the database:
+                    try {
+                        elem.finishDispatch(controller);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        stop();
+                    } finally {
+                        controller.elementDispatched(elem);
+                    }
+                }
+            });
+            outboundController = outboundQueue.getFlowController(flow);
+            this.targetQueue = targetQueue;
+            this.destination = new ActiveMQQueue(targetQueue.getResourceName());
+        }
+
+        public void start() {
+            dispatchContext.requestDispatch();
+        }
+
+        public void stop() {
+            stopped.set(true);
+        }
+
+        public boolean dispatch() {
+            if (next == null) {
+                try {
+                    createNextMessage();
+                } catch (JMSException e) {
+                    // TODO Auto-generated catch restoreBlock
+                    e.printStackTrace();
+                    stopped.set(true);
+                    return true;
+                }
+            }
+
+            // If flow controlled stop until flow control is lifted.
+            if (outboundController.isSinkBlocked()) {
+                if (outboundController.addUnblockListener(this)) {
+                    return true;
+                }
+            }
+
+            outboundQueue.add(next, null);
+            rate.increment();
+            next = null;
+            return stopped.get();
+        }
+
+        private void createNextMessage() throws JMSException {
+            ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+            message.setJMSPriority(priority);
+            message.setProducerId(producerId);
+            message.setMessageId(new MessageId(name, ++sequenceNumber));
+            message.setDestination(destination);
+            message.setPersistent(PERSISTENT);
+            if (payload != null) {
+                message.writeBytes(payload);
+            }
+            next = new OpenWireMessageDelivery(message);
+        }
+
+        public void onFlowUnblocked(ISinkController<OpenWireMessageDelivery> controller) {
+            dispatchContext.requestDispatch();
+        }
+        
+        public String toString()
+        {
+            return name + " on " + targetQueue.getResourceName();
+        }
+    }
+
+    class Consumer implements DeliveryTarget {
+        private final HashMap<IQueue<Long, MessageDelivery>, Subscription<MessageDelivery>> subscriptions = new HashMap<IQueue<Long, MessageDelivery>, Subscription<MessageDelivery>>();
+        private AtomicBoolean stopped = new AtomicBoolean(true);
+        protected final MetricCounter rate = new MetricCounter();
+        private final String name;
+        private final SizeLimiter<MessageDelivery> limiter;
+        private final ExclusiveQueue<MessageDelivery> queue;
+        private final IQueue<Long, MessageDelivery> sourceQueue;
+        private final QueueStore.QueueDescriptor queueDescriptor;
+
+        public Consumer(String name, IQueue<Long, MessageDelivery> sourceQueue) {
+            this.sourceQueue = sourceQueue;
+            this.name = name;
+            Flow flow = new Flow(name + "-outbound", false);
+            limiter = new SizeLimiter<MessageDelivery>(1024, 512) {
+                public int getElementSize(MessageDelivery m) {
+                    return m.getFlowLimiterSize();
+                }
+            };
+
+            queue = new ExclusiveQueue<MessageDelivery>(flow, flow.getFlowName(), limiter);
+            queue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+            queue.setDispatcher(dispatcher);
+            queue.setAutoRelease(true);
+
+            queueDescriptor = new QueueStore.QueueDescriptor();
+            queueDescriptor.setQueueName(new AsciiBuffer(queue.getResourceName()));
+            queueDescriptor.setParent(null);
+
+            queue.setDrain(new IFlowDrain<MessageDelivery>() {
+
+                public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
+                    elem.acknowledge(queueDescriptor);
+                    rate.increment();
+                }
+            });
+
+            rate.name("Consumer " + name + " Rate");
+            totalConsumerRate.add(rate);
+        }
+
+        public void start() {
+            stopped.set(false);
+            subscribe(sourceQueue);
+        }
+
+        private void subscribe(IQueue<Long, MessageDelivery> source) {
+            Subscription<MessageDelivery> subscription = subscriptions.get(sourceQueue);
+
+            subscriptions.get(sourceQueue);
+            if (subscription == null) {
+                subscription = new Queue.QueueSubscription(this);
+                subscriptions.put(sourceQueue, subscription);
+            }
+            source.addSubscription(subscription);
+        }
+
+        public void stop() throws InterruptedException {
+            sourceQueue.removeSubscription(subscriptions.get(sourceQueue));
+            stopped.set(true);
+        }
+
+        public void deliver(MessageDelivery delivery, ISourceController<?> source) {
+            queue.add(delivery, source);
+        }
+
+        public IFlowSink<MessageDelivery> getSink() {
+            return queue;
+        }
+
+        public boolean isDurable() {
+            return false;
+        }
+
+        public boolean hasSelector() {
+            return false;
+        }
+
+        public boolean match(MessageDelivery message) {
+            return true;
+        }
+        
+        public String toString()
+        {
+            return name + " on " + sourceQueue.getResourceName();
+        }
+    }
+}