You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/04/27 21:35:21 UTC
svn commit: r769122 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/flow/
main/java/org/apache/activemq/queue/ main/java/org/apache/activemq/util/
test/java/org/apache/activemq/broker/
Author: chirino
Date: Mon Apr 27 19:35:21 2009
New Revision: 769122
URL: http://svn.apache.org/viewvc?rev=769122&view=rev
Log:
Adding files that I missed adding as part of that last patch.
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPartitionedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/Mapper.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityLinkedList.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityMap.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedList.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedListNode.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.Store.QueueQueryResult;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.PrioritySizeLimiter;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.queue.IPartitionedQueue;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.PartitionedQueue;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.SharedPriorityQueue;
+import org.apache.activemq.queue.SharedQueue;
+import org.apache.activemq.queue.SharedQueueOld;
+import org.apache.activemq.util.Mapper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class BrokerQueueStore implements QueueStore<Long, MessageDelivery> {
+
+ private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class);
+ private static final boolean USE_OLD_QUEUE = false;
+ private static final boolean USE_PRIORITY_QUEUES = true;
+
+ private BrokerDatabase database;
+ private IDispatcher dispatcher;
+
+ private final short PARTITION_TYPE = 0;
+ private final short SHARED_QUEUE_TYPE = 1;
+ //private final short SUBSCRIBER_QUEUE_TYPE = 2;
+
+ private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
+ //private final HashMap<String, IFlowQueue<MessageDelivery>> subscriberQueues = new HashMap<String, IFlowQueue<MessageDelivery>>();
+
+ private Mapper<Integer, MessageDelivery> partitionMapper;
+
+ public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
+ public Integer map(MessageDelivery element) {
+ return element.getPriority();
+ }
+ };
+
+ static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
+ public Long map(MessageDelivery element) {
+ return element.getStoreTracking();
+ }
+ };
+
+ static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+ public Integer map(MessageDelivery element) {
+ // we modulo 10 to have at most 10 partitions which the producers
+ // gets split across.
+ return (int) (element.getProducerId().hashCode() % 10);
+ }
+ };
+
+ public void setDatabase(BrokerDatabase database) {
+ this.database = database;
+ }
+
+ public void setDispatcher(IDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public void loadQueues() throws Exception {
+
+ // Load shared queues
+ Iterator<QueueQueryResult> results = database.listQueues(SHARED_QUEUE_TYPE);
+ while (results.hasNext()) {
+ QueueQueryResult loaded = results.next();
+ IQueue<Long, MessageDelivery> queue = createRestoredQueue(null, loaded);
+ sharedQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
+ LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+ }
+ }
+
+ private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
+
+ IQueue<Long, MessageDelivery> queue;
+ if (parent != null) {
+ queue = parent.createPartition(loaded.getDescriptor().getPartitionKey());
+ } else {
+ queue = createSharedQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+ }
+
+ queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+
+ // Creat the child queues
+ Collection<QueueQueryResult> children = loaded.getPartitions();
+ if (children != null) {
+ try {
+ IPartitionedQueue<Long, MessageDelivery> pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue;
+ for (QueueQueryResult child : children) {
+ createRestoredQueue(pQueue, child);
+ }
+ } catch (ClassCastException cce) {
+ LOG.error("Loaded partition for unpartitionable queue: " + queue.getResourceName());
+ throw cce;
+ }
+ }
+
+ return queue;
+
+ }
+
+ public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
+ Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
+ ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
+ ret.addAll(c);
+ return ret;
+ }
+
+ public IQueue<Long, MessageDelivery> createSharedQueue(String name) {
+
+ IQueue<Long, MessageDelivery> queue = null;
+ synchronized (this) {
+ queue = sharedQueues.get(name);
+ if (queue == null) {
+ queue = createSharedQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+ queue.getDescriptor().setApplicationType(SHARED_QUEUE_TYPE);
+ queue.initialize(0, 0, 0, 0);
+ sharedQueues.put(name, queue);
+ addQueue(queue.getDescriptor());
+ }
+ }
+
+ return queue;
+ }
+
+ private IQueue<Long, MessageDelivery> createSharedQueueInternal(final String name, short type) {
+
+ IQueue<Long, MessageDelivery> ret;
+
+ switch (type) {
+ case QueueDescriptor.PARTITIONED: {
+ PartitionedQueue<Long, MessageDelivery> queue = new PartitionedQueue<Long, MessageDelivery>(name) {
+ @Override
+ public IQueue<Long, MessageDelivery> createPartition(int partitionKey) {
+ IQueue<Long, MessageDelivery> queue = createSharedQueueInternal(name + "$" + partitionKey, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+ queue.getDescriptor().setPartitionId(partitionKey);
+ queue.getDescriptor().setParent(this.getDescriptor().getQueueName());
+ return queue;
+ }
+
+ };
+ queue.setPartitionMapper(partitionMapper);
+
+ ret = queue;
+ break;
+ }
+ case QueueDescriptor.SHARED_PRIORITY: {
+ PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(100, 1, MessageBroker.MAX_PRIORITY);
+ limiter.setPriorityMapper(PRIORITY_MAPPER);
+ SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
+ ret = queue;
+ queue.setKeyMapper(KEY_MAPPER);
+ queue.setAutoRelease(true);
+ break;
+ }
+ case QueueDescriptor.SHARED: {
+ SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 1);
+ if (!USE_OLD_QUEUE) {
+ SharedQueue<Long, MessageDelivery> sQueue = new SharedQueue<Long, MessageDelivery>(name, limiter);
+ sQueue.setKeyMapper(KEY_MAPPER);
+ sQueue.setAutoRelease(true);
+ ret = sQueue;
+ } else {
+ SharedQueueOld<Long, MessageDelivery> sQueue = new SharedQueueOld<Long, MessageDelivery>(name, limiter);
+ sQueue.setKeyMapper(KEY_MAPPER);
+ sQueue.setAutoRelease(true);
+ ret = sQueue;
+ }
+ break;
+ }
+ default: {
+ throw new IllegalArgumentException("Unknown queue type" + type);
+ }
+ }
+ ret.getDescriptor().setApplicationType(PARTITION_TYPE);
+ ret.setDispatcher(dispatcher);
+ ret.setStore(this);
+
+ return ret;
+ }
+
+ public final void deleteQueueElement(QueueStore.QueueDescriptor descriptor, MessageDelivery elem) {
+ elem.acknowledge(descriptor);
+ }
+
+ public final boolean isElemPersistent(MessageDelivery elem) {
+ return elem.isPersistent();
+ }
+
+ public final boolean isFromStore(MessageDelivery elem) {
+ return elem.isFromStore();
+ }
+
+ public final void persistQueueElement(QueueStore.QueueDescriptor descriptor, ISourceController<?> controller, MessageDelivery elem, long sequence, boolean delayable) throws Exception {
+ elem.persist(descriptor, controller, sequence, delayable);
+ }
+
+ public final void restoreQueueElements(QueueStore.QueueDescriptor queue, long firstSequence, long maxSequence, int maxCount,
+ org.apache.activemq.queue.QueueStore.RestoreListener<MessageDelivery> listener) {
+ database.restoreMessages(queue, firstSequence, maxSequence, maxCount, listener);
+ }
+
+ public final void addQueue(QueueStore.QueueDescriptor queue) {
+ database.addQueue(queue);
+ }
+
+ public final void deleteQueue(QueueStore.QueueDescriptor queue) {
+ database.deleteQueue(queue);
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
+
+public class MessageDeliveryWrapper implements MessageDelivery {
+
+ private final MessageDelivery delegate;
+
+ public void acknowledge(QueueStore.QueueDescriptor queue) {
+ delegate.acknowledge(queue);
+ }
+
+ public <T> T asType(Class<T> type) {
+ return delegate.asType(type);
+ }
+
+ public MessageRecord createMessageRecord() throws IOException {
+ return delegate.createMessageRecord();
+ }
+
+ public Destination getDestination() {
+ return delegate.getDestination();
+ }
+
+ public int getFlowLimiterSize() {
+ return delegate.getFlowLimiterSize();
+ }
+
+ public AsciiBuffer getMsgId() {
+ return delegate.getMsgId();
+ }
+
+ public int getPriority() {
+ return delegate.getPriority();
+ }
+
+ public AsciiBuffer getProducerId() {
+ return delegate.getProducerId();
+ }
+
+ public long getStoreTracking() {
+ return delegate.getStoreTracking();
+ }
+
+ public Buffer getTransactionId() {
+ return delegate.getTransactionId();
+ }
+
+ public boolean isFromStore() {
+ return delegate.isFromStore();
+ }
+
+ public boolean isPersistent() {
+ return delegate.isPersistent();
+ }
+
+ public boolean isResponseRequired() {
+ return delegate.isResponseRequired();
+ }
+
+ public void onMessagePersisted() {
+ delegate.onMessagePersisted();
+ }
+
+ public void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long sequenceNumber, boolean delayable) throws IOException {
+ delegate.persist(queue, controller, sequenceNumber, delayable);
+ }
+
+ MessageDeliveryWrapper(MessageDelivery delivery) {
+ delegate = delivery;
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * Interface for SizeBasedLimiters
+ * @param <E>
+ */
+public interface IFlowSizeLimiter<E> extends IFlowLimiter<E> {
+
+ public boolean add(int count, long size);
+
+ public long getCapacity();
+
+ public long getSize();
+
+ /**
+ * Returns the size of a limited element
+ */
+ public int getElementSize(E elem);
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPartitionedQueue.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPartitionedQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPartitionedQueue.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+public interface IPartitionedQueue<K, V> extends IQueue<K, V> {
+
+ public IQueue<K, V> createPartition(int partitionKey);
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.Collection;
+
+import org.apache.activemq.broker.store.BrokerDatabase.OperationContext;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public interface QueueStore<K, V> {
+
+ public interface SaveableQueueElement<V> {
+ /**
+ * Gets the element to save.
+ * @return
+ */
+ public V getElement();
+
+ /**
+ * Gets the sequence number of the element in the queue
+ * @return
+ */
+ public long getSequenceNumber();
+
+ /**
+ * @return a return value of true will cause {@link #notifySave()} to
+ * called when this element is persisted
+ */
+ public boolean requestNotify();
+
+ /**
+ * Called when the element has been saved.
+ *
+ * @return
+ */
+ public boolean notifySave();
+ }
+
+ /**
+ * A holder for queue elements loaded from the store.
+ *
+ */
+ public interface RestoredElement<V> {
+ /**
+ * @return Gets the restored element
+ * @throws Exception
+ */
+ public V getElement() throws Exception;
+
+ /**
+ * Returns the sequence number of this element in the queue
+ *
+ * @return the sequence number of this element
+ */
+ long getSequenceNumber();
+
+ /**
+ * Gets the tracking number of the stored message.
+ *
+ * @return the next sequence number
+ */
+ long getStoreTracking();
+
+ /**
+ * Gets the next sequence number in the queue after this one or -1 if
+ * this is the last stored element
+ *
+ * @return the next sequence number
+ */
+ long getNextSequenceNumber();
+ }
+
+ /**
+ * A callback to be used with {@link #elementsRestored(Collection)} to pass
+ * the results of a call to
+ * {@link QueueStore#restoreQueueElements(QueueDescriptor, long, long, int, RestoreListener)}
+ */
+ public interface RestoreListener<V> {
+
+ public void elementsRestored(Collection<RestoredElement<V>> restored);
+ }
+
+ public static class QueueDescriptor {
+
+ public static final short SHARED = 0;
+ public static final short SHARED_PRIORITY = 1;
+ public static final short PARTITIONED = 2;
+
+ AsciiBuffer queueName;
+ AsciiBuffer parent;
+ int partitionKey;
+ short applicationType;
+ short queueType = SHARED;
+
+ public QueueDescriptor() {
+ }
+
+ public QueueDescriptor(QueueDescriptor toCopy) {
+ if (toCopy == null) {
+ return;
+ }
+ queueName = toCopy.queueName;
+ applicationType = toCopy.applicationType;
+ queueType = toCopy.queueType;
+ partitionKey = toCopy.partitionKey;
+ parent = toCopy.parent;
+ }
+
+ public QueueDescriptor copy() {
+ return new QueueDescriptor(this);
+ }
+
+ public int getPartitionKey() {
+ return partitionKey;
+ }
+
+ public void setPartitionId(int key) {
+ this.partitionKey = key;
+ }
+
+ /**
+ * Sets the queue type which is useful for querying of queues. The value
+ * must not be less than 0.
+ *
+ * @param type
+ * The type of the queue.
+ */
+ public void setApplicationType(short type) {
+ if (type < 0) {
+ throw new IllegalArgumentException();
+ }
+ applicationType = type;
+ }
+
+ /**
+ * @param type
+ * The type of the queue.
+ */
+ public short getApplicationType() {
+ return applicationType;
+ }
+
+ public short getQueueType() {
+ return queueType;
+ }
+
+ public void setQueueType(short type) {
+ queueType = type;
+ }
+
+ /**
+ * If this queue is a partition of a parent queue, this should be set to
+ * the parent queue's name.
+ *
+ * @return The parent queue's name
+ */
+ public AsciiBuffer getParent() {
+ return parent;
+ }
+
+ /**
+ * If this queue is a partition of a parent queue, this should be set to
+ * the parent queue's name.
+ */
+ public void setParent(AsciiBuffer parent) {
+ this.parent = parent;
+ }
+
+ public AsciiBuffer getQueueName() {
+ return queueName;
+ }
+
+ public void setQueueName(AsciiBuffer queueName) {
+ this.queueName = queueName;
+ }
+
+ public int hashCode() {
+ return queueName.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ if (o == null) {
+ return false;
+ }
+ if (o == this) {
+ return true;
+ }
+
+ if (o instanceof QueueDescriptor) {
+ return equals((QueueDescriptor) o);
+ } else {
+ return false;
+ }
+ }
+
+ public boolean equals(QueueDescriptor qd) {
+ if (qd.queueName.equals(queueName)) {
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Loads a batch of messages for the specified queue. The loaded messages
+ * are given the provided {@link MessageRestoreListener}.
+ * <p>
+ * <b><i>NOTE:</i></b> This method uses the queue sequence number for the
+ * message not the store tracking number.
+ *
+ * @param queue
+ * The queue for which to load messages
+ * @param firstSequence
+ * The first queue sequence number to load (-1 starts at
+ * beginning)
+ * @param maxSequence
+ * The maximum sequence number to load (-1 if no limit)
+ * @param maxCount
+ * The maximum number of messages to load (-1 if no limit)
+ * @param listener
+ * The listener to which restored elements should be passed.
+ * @return The {@link OperationContext} associated with the operation
+ */
+ public void restoreQueueElements(QueueDescriptor queue, long firstSequence, long maxSequence, int maxCount, RestoreListener<V> listener);
+
+ /**
+ * Asynchronously deletes an element from the store.
+ *
+ * @param descriptor
+ * The queue descriptor
+ * @param element
+ * The element to delete.
+ */
+ public void deleteQueueElement(QueueDescriptor descriptor, V element);
+
+ /**
+ * Asynchronously saves the given element to the store
+ *
+ * @param descriptor
+ * The descriptor for the queue.
+ * @param controller
+ * A flow controller to use in the event that there isn't room in
+ * the database.
+ * @param elem
+ * The element to save
+ * @param sequence
+ * The sequence number for the saved element
+ * @param delayable
+ * Whether or not the save operation can be delayed.
+ * @throws Exception
+ * If there is an error saving the element.
+ */
+ public void persistQueueElement(QueueDescriptor descriptor, ISourceController<?> controller, V elem, long sequence, boolean delayable) throws Exception;
+
+ /**
+ * Tests whether or not the given element is persistent. When a message is
+ * added to a persistent queue it should be saved via
+ * {@link #persistQueueElement(QueueDescriptor, Object, long, boolean)}
+ *
+ * @param elem
+ * The element to check.
+ * @return True if the element requires persistence.
+ */
+ public boolean isElemPersistent(V elem);
+
+ /**
+ * Tests whether or not the given element came from the store. If so, a
+ * queue must delete the element when it is finished with it
+ *
+ * @param elem
+ * The element to check.
+ * @return True if the element came from the store.
+ */
+ public boolean isFromStore(V elem);
+
+ /**
+ * Adds a queue to the store.
+ *
+ * @param queue
+ */
+ public void addQueue(QueueDescriptor queue);
+
+ /**
+ * Deletes a queue from the store.
+ *
+ * @param queue
+ */
+ public void deleteQueue(QueueDescriptor queue);
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,603 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowResource;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSizeLimiter;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.util.Mapper;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+/**
+ * IQueue which does direct dispatch whenever it can.
+ */
+public class SharedQueueOld<K, V> extends AbstractFlowQueue<V> implements IQueue<K, V> {
+
+ protected TreeMemoryStore store = new TreeMemoryStore();
+
+ private final LinkedNodeList<SubscriptionNode> unreadyDirectSubs = new LinkedNodeList<SubscriptionNode>();
+ private final LinkedNodeList<SubscriptionNode> readyDirectSubs = new LinkedNodeList<SubscriptionNode>();
+
+ private final LinkedNodeList<SubscriptionNode> unreadyPollingSubs = new LinkedNodeList<SubscriptionNode>();
+ private final LinkedNodeList<SubscriptionNode> readyPollingSubs = new LinkedNodeList<SubscriptionNode>();
+
+ private final HashMap<Subscription<V>, SubscriptionNode> subscriptions = new HashMap<Subscription<V>, SubscriptionNode>();
+ private final HashMap<IFlowResource, SubscriptionNode> sinks = new HashMap<IFlowResource, SubscriptionNode>();
+
+ private final FlowController<V> sinkController;
+ private final IFlowSizeLimiter<V> limiter;
+ private final Object mutex;
+
+ protected Mapper<K, V> keyMapper;
+ private long directs;
+
+ private boolean started = false;
+
+ private final ISourceController<V> sourceControler = new ISourceController<V>() {
+
+ public Flow getFlow() {
+ return sinkController.getFlow();
+ }
+
+ public void elementDispatched(V elem) {
+ }
+
+ public void onFlowBlock(ISinkController<?> sink) {
+ }
+
+ public void onFlowResume(ISinkController<?> sinkController) {
+ IFlowResource sink = sinkController.getFlowResource();
+ synchronized (mutex) {
+ SubscriptionNode node = sinks.get(sink);
+ if (node != null) {
+ node.unlink();
+ boolean notify = false;
+ if (node.cursor == null) {
+ readyDirectSubs.addLast(node);
+ // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
+ } else {
+ if (readyPollingSubs.isEmpty()) {
+ notify = !store.isEmpty();
+ }
+ readyPollingSubs.addLast(node);
+ // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
+ }
+
+ if (notify) {
+ notifyReady();
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getResourceName();
+ }
+
+ public IFlowResource getFlowResource() {
+ return SharedQueueOld.this;
+ }
+
+ };
+
+ private QueueStore.QueueDescriptor queueDescriptor;
+
+ public SharedQueueOld(String name, IFlowSizeLimiter<V> limiter) {
+ this(name, limiter, new Object());
+ autoRelease = true;
+ }
+
+ /**
+ * Creates a flow queue that can handle multiple flows.
+ *
+ * @param flow
+ * The {@link Flow}
+ * @param controller
+ * The FlowController if this queue is flow controlled:
+ */
+ public SharedQueueOld(String name, IFlowSizeLimiter<V> limiter, Object mutex) {
+ super(name);
+ queueDescriptor = new QueueStore.QueueDescriptor();
+ queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
+ this.mutex = mutex;
+ Flow flow = new Flow(name, false);
+ this.limiter = limiter;
+ this.sinkController = new FlowController<V>(getFlowControllableHook(), flow, limiter, mutex);
+ super.onFlowOpened(sinkController);
+ }
+
+
+ public int getEnqueuedCount() {
+ synchronized (mutex) {
+ return store.size();
+ }
+ }
+
+ public long getEnqueuedSize() {
+ synchronized (mutex) {
+ return limiter.getSize();
+ }
+ }
+
+ public synchronized void start() {
+ if (!started) {
+ started = true;
+ if (isDispatchReady()) {
+ super.notifyReady();
+ }
+ }
+ }
+
+ public synchronized void stop() {
+ started = false;
+ }
+
+ public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
+ // TODO - this queue is not persistent, so we can ignore this.
+ }
+
+ public void setStore(QueueStore<K, V> store) {
+ //No-op
+ }
+
+ protected final ISinkController<V> getSinkController(V elem, ISourceController<?> source) {
+ return sinkController;
+ }
+
+ /**
+ * Called when the controller accepts a message for this queue.
+ */
+ public void flowElemAccepted(ISourceController<V> controller, V value) {
+ synchronized (mutex) {
+
+ // Try to directly dispatch to one of the attached subscriptions
+ // sourceDispatch returns null on successful dispatch
+ ArrayList<SubscriptionNode> matches = directDispatch(value);
+ if (matches != null) {
+
+ if (directs != 0) {
+ // System.out.println("could not directly dispatch.. had directly dispatched: "+directs);
+ directs = 0;
+ }
+
+ K key = keyMapper.map(value);
+ StoreNode<K, V> node = store.add(key, value);
+
+ int matchCount = 0;
+ // Go through the un-ready direct subs and find out if any those
+ // would
+ // have matched the message, and if so then set it up to cursor
+ // from
+ // it.
+ SubscriptionNode sub = unreadyDirectSubs.getHead();
+ while (sub != null) {
+ SubscriptionNode next = sub.getNext();
+ if (sub.subscription.matches(value)) {
+ sub.unlink();
+ sub.resumeAt(node);
+ unreadyPollingSubs.addLast(sub);
+ matchCount++;
+ // System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
+ }
+ sub = next;
+ }
+
+ // Also do it for all the ready nodes that matched... but which
+ // we
+ // could not enqueue to.
+ for (SubscriptionNode subNode : matches) {
+ subNode.unlink();
+ subNode.resumeAt(node);
+ unreadyPollingSubs.addLast(subNode);
+ // System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
+ }
+ matchCount += matches.size();
+
+ if (matchCount > 0) {
+ // We have interested subscriptions for the message.. but
+ // they are not ready to receive.
+ // Would be cool if we could flow control the source.
+ }
+
+ if (!readyPollingSubs.isEmpty()) {
+ notifyReady();
+ }
+ } else {
+ directs++;
+ }
+ }
+ }
+
+ public FlowController<V> getFlowController(Flow flow) {
+ return sinkController;
+ }
+
+ public boolean isDispatchReady() {
+ return started && !store.isEmpty() && !readyPollingSubs.isEmpty();
+ }
+
+ private ArrayList<SubscriptionNode> directDispatch(V elem) {
+ ArrayList<SubscriptionNode> matches = new ArrayList<SubscriptionNode>(readyDirectSubs.size());
+ boolean accepted = false;
+ SubscriptionNode next = null;
+ SubscriptionNode node = readyDirectSubs.getHead();
+ while (node != null) {
+ next = node.getNext();
+ if (node.subscription.matches(elem)) {
+ accepted = node.subscription.getSink().offer(elem, sourceControler);
+ if (accepted) {
+ if (autoRelease) {
+ sinkController.elementDispatched(elem);
+ }
+ break;
+ } else {
+ matches.add(node);
+ }
+ }
+ node = next;
+ }
+ if (next != null) {
+ readyDirectSubs.rotateTo(next);
+ }
+ return accepted ? null : matches;
+ }
+
+ public QueueStore.QueueDescriptor getDescriptor() {
+ return queueDescriptor;
+ }
+
+ public boolean pollingDispatch() {
+
+ // System.out.println("polling dispatch");
+
+ // Keep looping until we can find one subscription that we can
+ // dispatch a message to.
+ while (true) {
+
+ // Find a subscription that has a message available for dispatch.
+ SubscriptionNode subNode = null;
+ StoreNode<K, V> storeNode = null;
+ synchronized (mutex) {
+
+ if (readyPollingSubs.isEmpty()) {
+ return false;
+ }
+
+ SubscriptionNode next = null;
+ subNode = readyPollingSubs.getHead();
+ while (subNode != null) {
+ next = subNode.getNext();
+
+ storeNode = subNode.cursorPeek();
+ if (storeNode != null) {
+ // Found a message..
+ break;
+ } else {
+ // Cursor dried up... this subscriber can now be direct
+ // dispatched.
+ // System.out.println("Subscription state change: ready polling -> ready direct: "+subNode);
+ subNode.unlink();
+ readyDirectSubs.addLast(subNode);
+ }
+ subNode = next;
+ }
+
+ if (storeNode == null) {
+ return false;
+ }
+
+ if (next != null) {
+ readyPollingSubs.rotateTo(next);
+ }
+ }
+
+ // The subscription's sink may be full..
+ IFlowSink<V> sink = subNode.subscription.getSink();
+ boolean accepted = sink.offer(storeNode.getValue(), sourceControler);
+
+ synchronized (mutex) {
+ if (accepted) {
+ subNode.cursorNext();
+ if (subNode.subscription.isPreAcquired() && subNode.subscription.isRemoveOnDispatch()) {
+ StoreNode<K, V> removed = store.remove(storeNode.getKey());
+ assert removed != null : "Since the node was aquired.. it should not have been removed by anyone else.";
+ sinkController.elementDispatched(storeNode.getValue());
+ }
+ return true;
+ } else {
+ // System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
+ // Subscription is no longer ready..
+ subNode.cursorUnPeek(storeNode);
+ subNode.unlink();
+ unreadyPollingSubs.addLast(subNode);
+ }
+ }
+ }
+ }
+
+ public final V poll() {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ public void addSubscription(Subscription<V> subscription) {
+ synchronized (mutex) {
+ SubscriptionNode node = subscriptions.get(subscription);
+ if (node == null) {
+ node = new SubscriptionNode(subscription);
+ subscriptions.put(subscription, node);
+ sinks.put(subscription.getSink(), node);
+ if (!store.isEmpty()) {
+ readyPollingSubs.addLast(node);
+ notifyReady();
+ } else {
+ readyDirectSubs.addLast(node);
+ }
+ }
+ }
+ }
+
+ public boolean removeSubscription(Subscription<V> subscription) {
+ synchronized (mutex) {
+ SubscriptionNode node = subscriptions.remove(subscription);
+ if (node != null) {
+ sinks.remove(subscription.getSink());
+ node.unlink();
+ return true;
+ }
+ return false;
+ }
+ }
+
+ private class SubscriptionNode extends LinkedNode<SubscriptionNode> {
+ public final Subscription<V> subscription;
+ public StoreCursor<K, V> cursor;
+
+ public SubscriptionNode(Subscription<V> subscription) {
+ this.subscription = subscription;
+ this.cursor = store.openCursor();
+ }
+
+ public void resumeAt(StoreNode<K, V> node) {
+ this.cursor = store.openCursorAt(node);
+ }
+
+ public void cursorNext() {
+ cursor.next();
+ }
+
+ public StoreNode<K, V> cursorPeek() {
+ if (cursor == null) {
+ return null;
+ }
+ while (cursor.hasNext()) {
+ StoreNode<K, V> elemNode = cursor.peekNext();
+
+ // Skip over messages that are not a match.
+ if (!subscription.matches(elemNode.getValue())) {
+ cursor.next();
+ continue;
+ }
+
+ if (subscription.isPreAcquired()) {
+ if (elemNode.acquire(subscription)) {
+ return elemNode;
+ } else {
+ cursor.next();
+ continue;
+ }
+ }
+ }
+ cursor = null;
+ return null;
+ }
+
+ public void cursorUnPeek(StoreNode<K, V> node) {
+ if (subscription.isPreAcquired()) {
+ node.unacquire();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "subscription from " + getResourceName() + " to " + subscription;
+ }
+ }
+
+ public Mapper<K, V> getKeyMapper() {
+ return keyMapper;
+ }
+
+ public void setKeyMapper(Mapper<K, V> keyMapper) {
+ this.keyMapper = keyMapper;
+ }
+
+ @Override
+ public String toString() {
+ return getResourceName();
+ }
+
+ public FlowController<V> getFlowControler() {
+ return this.sinkController;
+ }
+
+ public interface StoreNode<K, V> {
+
+ public boolean acquire(Subscription<V> ownerId);
+
+ public void unacquire();
+
+ public V getValue();
+
+ public K getKey();
+
+ }
+
+ public interface StoreCursor<K, V> extends Iterator<StoreNode<K, V>> {
+ public StoreNode<K, V> peekNext();
+
+ public void setNext(StoreNode<K, V> node);
+
+ }
+
+ private class TreeMemoryStore
+ {
+ AtomicLong counter = new AtomicLong();
+
+ class MemoryStoreNode implements StoreNode<K, V> {
+ private Subscription<V> owner;
+ private final K key;
+ private final V value;
+ private long id = counter.getAndIncrement();
+
+ public MemoryStoreNode(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public boolean acquire(Subscription<V> owner) {
+ if (this.owner == null) {
+ this.owner = owner;
+ return true;
+ }
+ return false;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "node:" + id + ", owner=" + owner;
+ }
+
+ public void unacquire() {
+ this.owner = null;
+ }
+
+ }
+
+ class MemoryStoreCursor implements StoreCursor<K, V> {
+ private long last = -1;
+ private MemoryStoreNode next;
+
+ public MemoryStoreCursor() {
+ }
+
+ public MemoryStoreCursor(MemoryStoreNode next) {
+ this.next = next;
+ }
+
+ public void setNext(StoreNode<K, V> next) {
+ this.next = (MemoryStoreNode) next;
+ }
+
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ SortedMap<Long, MemoryStoreNode> m = order.tailMap(last + 1);
+ if (m.isEmpty()) {
+ next = null;
+ } else {
+ next = m.get(m.firstKey());
+ }
+ return next != null;
+ }
+
+ public StoreNode<K, V> peekNext() {
+ hasNext();
+ return next;
+ }
+
+ public StoreNode<K, V> next() {
+ try {
+ hasNext();
+ return next;
+ } finally {
+ last = next.id;
+ next = null;
+ }
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ protected HashMap<K, MemoryStoreNode> map = new HashMap<K, MemoryStoreNode>();
+ protected TreeMap<Long, MemoryStoreNode> order = new TreeMap<Long, MemoryStoreNode>();
+
+ public StoreNode<K, V> add(K key, V value) {
+ MemoryStoreNode rc = new MemoryStoreNode(key, value);
+ MemoryStoreNode oldNode = map.put(key, rc);
+ if (oldNode != null) {
+ map.put(key, oldNode);
+ throw new IllegalArgumentException("Duplicate key violation");
+ }
+ order.put(rc.id, rc);
+ return rc;
+ }
+
+ public StoreNode<K, V> remove(K key) {
+ MemoryStoreNode node = (MemoryStoreNode) map.remove(key);
+ if (node != null) {
+ order.remove(node.id);
+ }
+ return node;
+ }
+
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ public StoreCursor<K, V> openCursor() {
+ MemoryStoreCursor cursor = new MemoryStoreCursor();
+ return cursor;
+ }
+
+ public StoreCursor<K, V> openCursorAt(StoreNode<K, V> next) {
+ MemoryStoreCursor cursor = new MemoryStoreCursor((MemoryStoreNode) next);
+ return cursor;
+ }
+
+ public int size() {
+ return map.size();
+ }
+
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/Mapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/Mapper.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/Mapper.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/Mapper.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+public interface Mapper<K, V> {
+ K map(V element);
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityLinkedList.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityLinkedList.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityLinkedList.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.ArrayList;
+
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+public class PriorityLinkedList<E extends LinkedNode<E>> {
+
+ private Mapper<Integer, E> priorityMapper;
+ private final ArrayList<LinkedNodeList<E>> priorityLists;
+ private int highesPriority = 0;
+
+ public PriorityLinkedList(int numPriorities) {
+ this(numPriorities, null);
+ }
+
+ public PriorityLinkedList(int numPriorities, Mapper<Integer, E> priorityMapper) {
+ this.priorityMapper = priorityMapper;
+ priorityLists = new ArrayList<LinkedNodeList<E>>();
+ for (int i = 0; i <= numPriorities; i++) {
+ priorityLists.add(new LinkedNodeList<E>());
+ }
+ }
+
+ public final int getHighestPriority() {
+ return highesPriority;
+ }
+
+ /**
+ * Gets the element at the front of the list:
+ *
+ * @return
+ */
+ public final E poll() {
+ LinkedNodeList<E> ll = getHighestPriorityList();
+ if (ll == null) {
+ return null;
+ }
+ E node = ll.getHead();
+ node.unlink();
+
+ return node;
+ }
+
+ public final boolean isEmpty() {
+ return peek() != null;
+ }
+
+ /**
+ * Gets the element at the front of the list:
+ *
+ * @return
+ */
+ public final E peek() {
+ LinkedNodeList<E> ll = getHighestPriorityList();
+ if (ll == null) {
+ return null;
+ }
+
+ return ll.getHead();
+ }
+
+ public final void add(E element) {
+ int prio = priorityMapper.map(element);
+ add(element, prio);
+ }
+
+ public final void add(E element, int prio) {
+ LinkedNodeList<E> ll = priorityLists.get(prio);
+ ll.addLast(element);
+ if (prio > highesPriority) {
+ highesPriority = prio;
+ }
+ }
+
+ private final LinkedNodeList<E> getHighestPriorityList() {
+ LinkedNodeList<E> ll = priorityLists.get(highesPriority);
+ while (ll.isEmpty()) {
+ if (highesPriority == 0) {
+ return null;
+ }
+ highesPriority--;
+ ll = priorityLists.get(highesPriority);
+ }
+
+ return ll;
+ }
+
+ public Mapper<Integer, E> getPriorityMapper() {
+ return priorityMapper;
+ }
+
+ public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+ this.priorityMapper = priorityMapper;
+ }
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityMap.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityMap.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityMap.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/PriorityMap.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.Arrays;
+
+public class PriorityMap<E> {
+
+ int first;
+ int base;
+ int size;
+
+ Object elements[] = new Object[1];
+
+ public E put(int key, E value) {
+ E rc = null;
+ if (isEmpty()) {
+ // This will be the first base prioritu..
+ base = key;
+ elements[0] = value;
+ first = 0;
+ } else {
+ if (key > base) {
+ // New priority is after the current base, we may need to
+ // expaned the
+ // priority array to fit this new one in.
+ int index = key - base;
+ if (elements.length <= index) {
+ // The funky thing is if the original base was removed,
+ // resizing
+ // will rebase the at the first.
+ resize(index + 1, 0);
+ }
+ if (index < first) {
+ first = index;
+ }
+ rc = element(index);
+ elements[index] = value;
+ } else {
+ // Ok this element is before the current base so we need to
+ // resize/rebase
+ // using this element as the base.
+ int oldLastIndex = indexOfLast();
+ int newLastIndex = (base + oldLastIndex) - key;
+ resize(newLastIndex + 1, first + (base - key), (oldLastIndex - first) + 1);
+ elements[0] = value;
+ first = 0;
+ }
+ }
+ if (rc == null) {
+ size++;
+ }
+ return rc;
+ }
+
+ private int indexOfLast() {
+ int i = elements.length - 1;
+ while (i >= 0) {
+ if (elements[i] != null) {
+ return i;
+ }
+ i--;
+ }
+ return -1;
+ }
+
+ private void resize(int newSize, int firstOffset) {
+ int count = Math.min(elements.length - first, newSize);
+ resize(newSize, firstOffset, count);
+ }
+
+ private void resize(int newSize, int firstOffset, int copyCount) {
+ Object t[];
+ if (elements.length == newSize) {
+ t = elements;
+ System.arraycopy(elements, first, t, firstOffset, copyCount);
+ Arrays.fill(t, 0, firstOffset, null);
+ } else {
+ t = new Object[newSize];
+ System.arraycopy(elements, first, t, firstOffset, copyCount);
+ }
+ base += (first - firstOffset);
+ elements = t;
+ }
+
+ public E get(int priority) {
+ int index = priority - base;
+ if (index < 0 || index >= elements.length) {
+ return null;
+ }
+ return element(index);
+ }
+
+ @SuppressWarnings("unchecked")
+ private E element(int index) {
+ return (E) elements[index];
+ }
+
+ public E remove(int priority) {
+ int index = priority - base;
+ if (index < 0 || index >= elements.length) {
+ return null;
+ }
+ E rc = element(index);
+ elements[index] = null;
+ if (rc != null) {
+ size--;
+ }
+ return rc;
+ }
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ public E firstValue() {
+ if (size == 0) {
+ return null;
+ }
+ E rc = element(first);
+ while (rc == null) {
+ // The first element may have been removed so we need to find it...
+ first++;
+ rc = element(first);
+ }
+ return (E) rc;
+ }
+
+ public Integer firstKey() {
+ if (size == 0) {
+ return null;
+ }
+ E rc = element(first);
+ while (rc == null) {
+ // The first element may have been removed so we need to find it...
+ first++;
+ rc = element(first);
+ }
+ return first;
+ }
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedList.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedList.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedList.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.ArrayList;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+public class SortedLinkedList<T extends SortedLinkedListNode<T>> {
+
+ protected final TreeMap<Long, T> index;
+ T head;
+ int size;
+
+ public SortedLinkedList() {
+ index = new TreeMap<Long, T>();
+ }
+
+ public boolean isEmpty() {
+ return head == null;
+ }
+
+ public void add(T node) {
+ T prev = null;
+ if (head != null) {
+ // Optimize for case where this is at tail:
+ if (head.prev.getSequence() < node.getSequence()) {
+ prev = head.prev;
+ } else {
+ Entry<Long, T> entry = index.lowerEntry(node.getSequence());
+ if (entry != null) {
+ prev = entry.getValue();
+ }
+ }
+ }
+ // T prev = index.lower(node);
+ // If this the lowest then the new head is this.
+ if (prev == null) {
+ node.linkToHead(this);
+ } else {
+ prev.linkAfter(node);
+ }
+ }
+
+ public T lower(long sequence, boolean inclusive) {
+ Entry<Long, T> lower = index.floorEntry(sequence);
+ if (lower == null) {
+ return null;
+ }
+
+ if (inclusive) {
+ return lower.getValue();
+ }
+
+ if (lower.getKey() == sequence) {
+ return lower.getValue().prev;
+ } else {
+ return lower.getValue();
+ }
+
+ }
+
+ public T upper(long sequence, boolean inclusive) {
+ if (head == null || head.prev.getSequence() < sequence) {
+ return null;
+ }
+
+ Entry<Long, T> upper = index.ceilingEntry(sequence);
+ if (upper == null) {
+ return null;
+ }
+
+ if (inclusive) {
+ return upper.getValue();
+ }
+
+ if (upper.getKey() == sequence) {
+ return upper.getValue().next;
+ } else {
+ return upper.getValue();
+ }
+ }
+
+ public void remove(T node) {
+ if (node.list == this) {
+ node.unlink();
+ }
+ }
+
+ public T getHead() {
+ return head;
+ }
+
+ public T getTail() {
+ return head.prev;
+ }
+
+ public void clear() {
+ while (head != null) {
+ head.unlink();
+ }
+ index.clear();
+ }
+
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ boolean first = true;
+ T cur = getHead();
+ while (cur != null) {
+ if (!first) {
+ sb.append(", ");
+ }
+ sb.append(cur);
+ first = false;
+ cur = cur.getNext();
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ /**
+ * Copies the nodes of the LinkedNodeList to an ArrayList.
+ *
+ * @return
+ */
+ public ArrayList<T> toArrayList() {
+ ArrayList<T> rc = new ArrayList<T>(size);
+ T cur = head;
+ while (cur != null) {
+ rc.add(cur);
+ cur = cur.getNext();
+ }
+ return rc;
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedListNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedListNode.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedListNode.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/SortedLinkedListNode.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+public abstract class SortedLinkedListNode<T extends SortedLinkedListNode<T>> {
+
+ protected SortedLinkedList<T> list;
+ protected T next;
+ protected T prev;
+
+ public SortedLinkedListNode() {
+ }
+
+ @SuppressWarnings("unchecked")
+ private T getThis() {
+ return (T) this;
+ }
+
+ public T getHeadNode() {
+ return list.head;
+ }
+
+ public T getTailNode() {
+ return list.head.prev;
+ }
+
+ public T getNext() {
+ return isTailNode() ? null : next;
+ }
+
+ public T getPrevious() {
+ return isHeadNode() ? null : prev;
+ }
+
+ public T getNextCircular() {
+ return next;
+ }
+
+ public T getPreviousCircular() {
+ return prev;
+ }
+
+ public boolean isHeadNode() {
+ return list.head == this;
+ }
+
+ public boolean isTailNode() {
+ return list.head.prev == this;
+ }
+
+ /**
+ * Removes this node out of the linked list it is chained in.
+ */
+ public boolean unlink() {
+
+ // If we are already unlinked...
+ if (list == null) {
+ return false;
+ }
+
+ if (getThis() == prev) {
+ // We are the only item in the list
+ list.head = null;
+ } else {
+ // given we linked prev<->this<->next
+ next.prev = prev; // prev<-next
+ prev.next = next; // prev->next
+
+ if (isHeadNode()) {
+ list.head = next;
+ }
+ }
+ list.index.remove(this.getSequence());
+ list.size--;
+ list = null;
+ return true;
+ }
+
+ private void addToIndex(T node, SortedLinkedList<T> list) {
+ if (node.list != null) {
+ throw new IllegalArgumentException("You only insert nodes that are not in a list");
+ }
+
+ T old = list.index.put(node.getSequence(), node);
+ if(old != null)
+ {
+ list.index.put(old.getSequence(), old);
+ throw new IllegalArgumentException("A node with this key is already in the list");
+ }
+
+ node.list = list;
+ list.size++;
+ }
+
+ private final void checkLinkOk(T toLink) {
+ if (toLink == this) {
+ throw new IllegalArgumentException("You cannot link to yourself");
+ }
+
+ if (list == null) {
+ throw new IllegalArgumentException("This node is not yet in a list");
+ }
+ }
+
+ /**
+ * Adds the specified node to the head of the list.
+ *
+ * @param sub
+ * The sub list
+ */
+ protected void linkToHead(SortedLinkedList<T> target) {
+
+ if (target.head == null) {
+ addToIndex(getThis(), target);
+ next = prev = target.head = getThis();
+ } else {
+ target.head.linkBefore(getThis());
+ }
+ }
+
+ /**
+ * @param node
+ * the node to link after this node.
+ * @return this
+ */
+ protected void linkAfter(T node) {
+ checkLinkOk(node);
+ addToIndex(node, list);
+
+ // given we linked this<->next and are inserting node in between
+ node.prev = getThis(); // link this<-node
+ node.next = next; // link node->next
+ next.prev = node; // link node<-next
+ next = node; // this->node
+ }
+
+ /**
+ * @param node
+ * the node to link after this node.
+ * @return
+ * @return this
+ */
+ protected void linkBefore(T node) {
+ checkLinkOk(node);
+ addToIndex(node, list);
+
+ // given we linked prev<->this and are inserting node in between
+ node.next = getThis(); // node->this
+ node.prev = prev; // prev<-node
+ prev.next = node; // prev->node
+ prev = node; // node<-this
+
+ if (this == list.head) {
+ list.head = node;
+ }
+ }
+
+ public abstract long getSequence();
+
+ public boolean isLinked() {
+ return list != null;
+ }
+
+ public SortedLinkedList<T> getList() {
+ return list;
+ }
+}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=769122&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java Mon Apr 27 19:35:21 2009
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
+import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StoreFactory;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowRelay;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.queue.Subscription;
+
+import junit.framework.TestCase;
+
+public class SharedQueuePerfTest extends TestCase {
+
+ private static int PERFORMANCE_SAMPLES = 5;
+
+ IDispatcher dispatcher;
+ BrokerDatabase database;
+ BrokerQueueStore queueStore;
+ private static final boolean USE_KAHA_DB = true;
+ private static final boolean PERSISTENT = true;
+ private static final boolean PURGE_STORE = false;
+
+ protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+ protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+
+ protected ArrayList<Consumer> consumers = new ArrayList<Consumer>();
+ protected ArrayList<Producer> producers = new ArrayList<Producer>();
+ protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
+
+ protected IDispatcher createDispatcher() {
+ return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", MessageBroker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+ }
+
+ protected void startServices() throws Exception {
+ dispatcher = createDispatcher();
+ dispatcher.start();
+ database = new BrokerDatabase(createStore(), dispatcher);
+ database.start();
+ queueStore = new BrokerQueueStore();
+ queueStore.setDatabase(database);
+ queueStore.setDispatcher(dispatcher);
+ queueStore.loadQueues();
+ }
+
+ protected void stopServices() throws Exception {
+ dispatcher.shutdown();
+ database.stop();
+ dispatcher.shutdown();
+ consumers.clear();
+ producers.clear();
+ queues.clear();
+ }
+
+ protected Store createStore() throws Exception {
+ Store store = null;
+ if (USE_KAHA_DB) {
+ store = StoreFactory.createStore("kaha-db");
+ } else {
+ store = StoreFactory.createStore("memory");
+ }
+
+ store.setStoreDirectory(new File("test-data/shared-message-queue-test/"));
+ store.setDeleteAllMessages(PURGE_STORE);
+ return store;
+ }
+
+ protected void cleanup() throws Exception {
+ consumers.clear();
+ producers.clear();
+ queues.clear();
+ stopServices();
+ }
+
+
+ public void testSharedQueue_1_1_1() throws Exception {
+ startServices();
+ try {
+ createQueues(1);
+ createProducers(1);
+ createConsumers(1);
+ doTest();
+
+ } finally {
+ cleanup();
+ }
+ }
+
+ public void testSharedQueue_10_10_10() throws Exception {
+ startServices();
+ try {
+ createQueues(10);
+ createProducers(10);
+ createConsumers(10);
+ doTest();
+
+ } finally {
+ cleanup();
+ }
+ }
+
+ public void testSharedQueue_10_1_10() throws Exception {
+ startServices();
+ try {
+ createQueues(1);
+ createProducers(10);
+ createConsumers(10);
+ doTest();
+
+ } finally {
+ cleanup();
+ }
+ }
+
+
+ public void testSharedQueue_10_1_1() throws Exception {
+ startServices();
+ try {
+ createQueues(10);
+ createProducers(10);
+ createConsumers(10);
+ doTest();
+
+ } finally {
+ cleanup();
+ }
+ }
+
+ public void testSharedQueue_1_1_10() throws Exception {
+ startServices();
+ try {
+ createQueues(10);
+ createProducers(10);
+ createConsumers(10);
+ doTest();
+
+ } finally {
+ cleanup();
+ }
+ }
+
+ private void doTest() throws Exception {
+
+ try
+ {
+ // Start queues:
+ for (IQueue<Long, MessageDelivery> queue : queues) {
+ queue.start();
+ }
+
+ // Start consumers:
+ for (Consumer consumer : consumers) {
+ consumer.start();
+ }
+
+ // Start producers:
+ for (Producer producer : producers) {
+ producer.start();
+ }
+ reportRates();
+ }
+ finally
+ {
+ // Stop producers:
+ for (Producer producer : producers) {
+ producer.stop();
+ }
+
+ // Stop consumers:
+ for (Consumer consumer : consumers) {
+ consumer.stop();
+ }
+
+ // Stop queues:
+ for (IQueue<Long, MessageDelivery> queue : queues) {
+ queue.stop();
+ }
+ }
+ }
+
+ private final void createQueues(int count) {
+ for (int i = 0; i < count; i++) {
+ IQueue<Long, MessageDelivery> queue = queueStore.createSharedQueue("queue-" + (i + 1));
+ queues.add(queue);
+ }
+ }
+
+ private final void createProducers(int count) {
+ for (int i = 0; i < count; i++) {
+ Producer producer = new Producer("producer" + (i + 1), queues.get(i % queues.size()));
+ producers.add(producer);
+ }
+ }
+
+ private final void createConsumers(int count) {
+ for (int i = 0; i < count; i++) {
+ Consumer consumer = new Consumer("consumer" + (i + 1), queues.get(i % queues.size()));
+ consumers.add(consumer);
+ }
+ }
+
+ private void reportRates() throws InterruptedException {
+ System.out.println("Checking rates for test: " + super.getName());
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(5000);
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ /*
+ * if (includeDetailedRates) {
+ * System.out.println(totalProducerRate.getChildRateSummary(p));
+ * System.out.println(totalConsumerRate.getChildRateSummary(p)); }
+ */
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+ }
+
+ class Producer implements Dispatchable, FlowUnblockListener<OpenWireMessageDelivery> {
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+ private String name;
+ protected final MetricCounter rate = new MetricCounter();
+ private final DispatchContext dispatchContext;
+
+ protected IFlowController<OpenWireMessageDelivery> outboundController;
+ protected final IFlowRelay<OpenWireMessageDelivery> outboundQueue;
+ protected OpenWireMessageDelivery next;
+ private int priority;
+ private final byte[] payload;
+ private int sequenceNumber;
+ private final ActiveMQDestination destination;
+ private final IQueue<Long, MessageDelivery> targetQueue;
+
+ private final ProducerId producerId;
+ private final OpenWireFormat wireFormat;
+
+ public Producer(String name, IQueue<Long, MessageDelivery> targetQueue) {
+ this.name = name;
+ rate.name("Producer " + name + " Rate");
+ totalProducerRate.add(rate);
+ dispatchContext = dispatcher.register(this, name);
+ payload = new byte[1024];
+ producerId = new ProducerId(name);
+ wireFormat = new OpenWireFormat();
+ wireFormat.setCacheEnabled(false);
+ wireFormat.setSizePrefixDisabled(false);
+ wireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION);
+
+ SizeLimiter<OpenWireMessageDelivery> limiter = new SizeLimiter<OpenWireMessageDelivery>(1000, 500);
+ Flow flow = new Flow(name, true);
+ outboundQueue = new SingleFlowRelay<OpenWireMessageDelivery>(flow, name, limiter);
+ outboundQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+ outboundQueue.setDrain(new IFlowDrain<OpenWireMessageDelivery>() {
+
+ public void drain(OpenWireMessageDelivery elem, ISourceController<OpenWireMessageDelivery> controller) {
+
+ next.setStoreWireFormat(wireFormat);
+ next.beginDispatch(database);
+ Producer.this.targetQueue.add(elem, controller);
+ // Saves the message to the database:
+ try {
+ elem.finishDispatch(controller);
+ } catch (IOException e) {
+ e.printStackTrace();
+ stop();
+ } finally {
+ controller.elementDispatched(elem);
+ }
+ }
+ });
+ outboundController = outboundQueue.getFlowController(flow);
+ this.targetQueue = targetQueue;
+ this.destination = new ActiveMQQueue(targetQueue.getResourceName());
+ }
+
+ public void start() {
+ dispatchContext.requestDispatch();
+ }
+
+ public void stop() {
+ stopped.set(true);
+ }
+
+ public boolean dispatch() {
+ if (next == null) {
+ try {
+ createNextMessage();
+ } catch (JMSException e) {
+ // TODO Auto-generated catch restoreBlock
+ e.printStackTrace();
+ stopped.set(true);
+ return true;
+ }
+ }
+
+ // If flow controlled stop until flow control is lifted.
+ if (outboundController.isSinkBlocked()) {
+ if (outboundController.addUnblockListener(this)) {
+ return true;
+ }
+ }
+
+ outboundQueue.add(next, null);
+ rate.increment();
+ next = null;
+ return stopped.get();
+ }
+
+ private void createNextMessage() throws JMSException {
+ ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+ message.setJMSPriority(priority);
+ message.setProducerId(producerId);
+ message.setMessageId(new MessageId(name, ++sequenceNumber));
+ message.setDestination(destination);
+ message.setPersistent(PERSISTENT);
+ if (payload != null) {
+ message.writeBytes(payload);
+ }
+ next = new OpenWireMessageDelivery(message);
+ }
+
+ public void onFlowUnblocked(ISinkController<OpenWireMessageDelivery> controller) {
+ dispatchContext.requestDispatch();
+ }
+
+ public String toString()
+ {
+ return name + " on " + targetQueue.getResourceName();
+ }
+ }
+
+ class Consumer implements DeliveryTarget {
+ private final HashMap<IQueue<Long, MessageDelivery>, Subscription<MessageDelivery>> subscriptions = new HashMap<IQueue<Long, MessageDelivery>, Subscription<MessageDelivery>>();
+ private AtomicBoolean stopped = new AtomicBoolean(true);
+ protected final MetricCounter rate = new MetricCounter();
+ private final String name;
+ private final SizeLimiter<MessageDelivery> limiter;
+ private final ExclusiveQueue<MessageDelivery> queue;
+ private final IQueue<Long, MessageDelivery> sourceQueue;
+ private final QueueStore.QueueDescriptor queueDescriptor;
+
+ public Consumer(String name, IQueue<Long, MessageDelivery> sourceQueue) {
+ this.sourceQueue = sourceQueue;
+ this.name = name;
+ Flow flow = new Flow(name + "-outbound", false);
+ limiter = new SizeLimiter<MessageDelivery>(1024, 512) {
+ public int getElementSize(MessageDelivery m) {
+ return m.getFlowLimiterSize();
+ }
+ };
+
+ queue = new ExclusiveQueue<MessageDelivery>(flow, flow.getFlowName(), limiter);
+ queue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+ queue.setDispatcher(dispatcher);
+ queue.setAutoRelease(true);
+
+ queueDescriptor = new QueueStore.QueueDescriptor();
+ queueDescriptor.setQueueName(new AsciiBuffer(queue.getResourceName()));
+ queueDescriptor.setParent(null);
+
+ queue.setDrain(new IFlowDrain<MessageDelivery>() {
+
+ public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
+ elem.acknowledge(queueDescriptor);
+ rate.increment();
+ }
+ });
+
+ rate.name("Consumer " + name + " Rate");
+ totalConsumerRate.add(rate);
+ }
+
+ public void start() {
+ stopped.set(false);
+ subscribe(sourceQueue);
+ }
+
+ private void subscribe(IQueue<Long, MessageDelivery> source) {
+ Subscription<MessageDelivery> subscription = subscriptions.get(sourceQueue);
+
+ subscriptions.get(sourceQueue);
+ if (subscription == null) {
+ subscription = new Queue.QueueSubscription(this);
+ subscriptions.put(sourceQueue, subscription);
+ }
+ source.addSubscription(subscription);
+ }
+
+ public void stop() throws InterruptedException {
+ sourceQueue.removeSubscription(subscriptions.get(sourceQueue));
+ stopped.set(true);
+ }
+
+ public void deliver(MessageDelivery delivery, ISourceController<?> source) {
+ queue.add(delivery, source);
+ }
+
+ public IFlowSink<MessageDelivery> getSink() {
+ return queue;
+ }
+
+ public boolean isDurable() {
+ return false;
+ }
+
+ public boolean hasSelector() {
+ return false;
+ }
+
+ public boolean match(MessageDelivery message) {
+ return true;
+ }
+
+ public String toString()
+ {
+ return name + " on " + sourceQueue.getResourceName();
+ }
+ }
+}