You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/10/27 17:27:30 UTC
svn commit: r468414 [6/7] - in
/incubator/activemq/sandbox/qpid/src/main/java/org/apache: activemq/amqp/
activemq/amqp/broker/ activemq/amqp/command/ activemq/amqp/transport/
activemq/amqp/wireformat/ activemq/amqp/wireformat/v8_0/
activemq/qpid/broker...
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,378 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.AMQException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Combines the information that make up a deliverable message into a more manageable form.
+ */
+public class AMQMessage
+{
+ private final Set<Object> _tokens = new HashSet<Object>();
+
+ private AMQProtocolSession _publisher;
+
+ private final BasicPublishBody _publishBody;
+
+ private ContentHeaderBody _contentHeaderBody;
+
+ private List<ContentBody> _contentBodies;
+
+ private boolean _redelivered;
+
+ private final long _messageId;
+
+ private final AtomicInteger _referenceCount = new AtomicInteger(1);
+
+ /**
+ * Keeps a track of how many bytes we have received in body frames
+ */
+ private long _bodyLengthReceived = 0;
+
+ /**
+ * The message store in which this message is contained.
+ */
+ private transient final MessageStore _store;
+
+ /**
+ * For non transactional publishes, a message can be stored as
+ * soon as it is complete. For transactional messages it doesnt
+ * need to be stored until the transaction is committed.
+ */
+ private boolean _storeWhenComplete;
+
+ /**
+ * TxnBuffer for transactionally published messages
+ */
+ private TxnBuffer _txnBuffer;
+
+ /**
+ * Flag to indicate whether message has been delivered to a
+ * consumer. Used in implementing return functionality for
+ * messages published with the 'immediate' flag.
+ */
+ private boolean _deliveredToConsumer;
+
+
+ public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
+ {
+ this(messageStore, publishBody, true);
+ }
+
+ public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete)
+ {
+ _messageId = messageStore.getNewMessageId();
+ _publishBody = publishBody;
+ _store = messageStore;
+ _contentBodies = new LinkedList<ContentBody>();
+ _storeWhenComplete = storeWhenComplete;
+ }
+
+ public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
+ ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
+ throws AMQException
+
+ {
+ _publishBody = publishBody;
+ _contentHeaderBody = contentHeaderBody;
+ _contentBodies = contentBodies;
+ _messageId = messageId;
+ _store = store;
+ storeMessage();
+ }
+
+ public AMQMessage(MessageStore store, BasicPublishBody publishBody,
+ ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
+ throws AMQException
+ {
+ this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
+ }
+
+ protected AMQMessage(AMQMessage msg) throws AMQException
+ {
+ this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies);
+ }
+
+ public void storeMessage() throws AMQException
+ {
+ if (isPersistent())
+ {
+ _store.put(this);
+ }
+ }
+
+ public CompositeAMQDataBlock getDataBlock(BasicDeliverBody deliver, int channel)
+ {
+ AMQDataBlock[] allFrames = new AMQDataBlock[2 + _contentBodies.size()];
+
+ allFrames[0] = deliver;
+ allFrames[1] = _contentHeaderBody;
+ allFrames[1].channel = channel;
+ for (int i = 2; i < allFrames.length; i++)
+ {
+ allFrames[i] = _contentBodies.get(i - 2);
+ allFrames[i].setChannel(channel);
+ }
+ return new CompositeAMQDataBlock(allFrames);
+ }
+
+
+ public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
+ {
+ AMQDataBlock[] allFrames = new AMQDataBlock[2 + _contentBodies.size()];
+
+ BasicDeliverBody basicDeliverBody = new BasicDeliverBody();
+ basicDeliverBody.setChannel(channel);
+ basicDeliverBody.setConsumerTag(consumerTag);
+ basicDeliverBody.setDeliveryTag(deliveryTag);
+ basicDeliverBody.setRedelivered(_redelivered);
+ basicDeliverBody.setExchange(getExchangeName());
+ basicDeliverBody.setRoutingKey(getRoutingKey());
+ allFrames[0] = basicDeliverBody;
+
+ _contentHeaderBody.setChannel(channel);
+ allFrames[1] = _contentHeaderBody;
+
+ for (int i = 2; i < allFrames.length; i++)
+ {
+ allFrames[i] = _contentBodies.get(i - 2);
+ allFrames[i].setChannel(channel);
+ }
+ return new CompositeAMQDataBlock(allFrames);
+ }
+
+ public List<AMQDataBlock> getPayload()
+ {
+ List<AMQDataBlock> payload = new ArrayList<AMQDataBlock>(2 + _contentBodies.size());
+ payload.add(_publishBody);
+ payload.add(_contentHeaderBody);
+ payload.addAll(_contentBodies);
+ return payload;
+ }
+
+ public BasicPublishBody getPublishBody()
+ {
+ return _publishBody;
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
+ {
+ _contentHeaderBody = contentHeaderBody;
+ if (_storeWhenComplete && isAllContentReceived())
+ {
+ storeMessage();
+ }
+ }
+
+ public List<ContentBody> getContentBodies()
+ {
+ return _contentBodies;
+ }
+
+ public void setContentBodies(List<ContentBody> contentBodies)
+ {
+ _contentBodies = contentBodies;
+ }
+
+ public void addContentBodyFrame(ContentBody contentBody) throws AMQException
+ {
+ _contentBodies.add(contentBody);
+ _bodyLengthReceived += contentBody.getSize();
+ if (_storeWhenComplete && isAllContentReceived())
+ {
+ storeMessage();
+ }
+ }
+
+ public boolean isAllContentReceived()
+ {
+ return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ }
+
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
+ String getExchangeName()
+ {
+ return _publishBody.exchange;
+ }
+
+ String getRoutingKey()
+ {
+ return _publishBody.routingKey;
+ }
+
+ boolean isImmediate()
+ {
+ return _publishBody.immediate;
+ }
+
+ NoConsumersException getNoConsumersException(String queue)
+ {
+ return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies);
+ }
+
+ void setRedelivered(boolean redelivered)
+ {
+ _redelivered = redelivered;
+ }
+
+ public long getMessageId()
+ {
+ return _messageId;
+ }
+
+ /**
+ * Threadsafe. Increment the reference count on the message.
+ */
+ public void incrementReference()
+ {
+ _referenceCount.incrementAndGet();
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ */
+ public void decrementReference() throws MessageCleanupException
+ {
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (_referenceCount.decrementAndGet() == 0)
+ {
+ try
+ {
+ _store.removeMessage(_messageId);
+ }
+ catch(AMQException e)
+ {
+ //to maintain consistency, we revert the count
+ incrementReference();
+ throw new MessageCleanupException(_messageId, e);
+ }
+ }
+ }
+
+ public void setPublisher(AMQProtocolSession publisher)
+ {
+ _publisher = publisher;
+ }
+
+ public AMQProtocolSession getPublisher()
+ {
+ return _publisher;
+ }
+
+ public boolean checkToken(Object token)
+ {
+ if(_tokens.contains(token))
+ {
+ return true;
+ }
+ else
+ {
+ _tokens.add(token);
+ return false;
+ }
+ }
+
+ public void enqueue(AMQQueue queue) throws AMQException
+ {
+ //if the message is not persistent or the queue is not durable
+ //we will not need to recover the association and so do not
+ //need to record it
+ if(isPersistent() && queue.isDurable())
+ {
+ _store.enqueueMessage(queue.getName(), _messageId);
+ }
+ }
+
+ public void dequeue(AMQQueue queue) throws AMQException
+ {
+ //only record associations where both queue and message will survive
+ //a restart, so only need to remove association if this is the case
+ if(isPersistent() && queue.isDurable())
+ {
+ _store.dequeueMessage(queue.getName(), _messageId);
+ }
+ }
+
+ public boolean isPersistent() throws AMQException
+ {
+ if(_contentHeaderBody == null)
+ {
+ throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
+ }
+
+ //todo remove literal values to a constant file such as AMQConstants in common
+ return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
+ &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ }
+
+ public void setTxnBuffer(TxnBuffer buffer)
+ {
+ _txnBuffer = buffer;
+ }
+
+ public TxnBuffer getTxnBuffer()
+ {
+ return _txnBuffer;
+ }
+
+ /**
+ * Called to enforce the 'immediate' flag.
+ * @throws NoConsumersException if the message is marked for
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
+ */
+ public void checkDeliveredToConsumer() throws NoConsumersException{
+ if(isImmediate() && !_deliveredToConsumer)
+ {
+ throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
+ }
+ }
+
+ /**
+ * Called when this message is delivered to a consumer. (used to
+ * implement the 'immediate' flag functionality).
+ */
+ public void setDeliveredToConsumer(){
+ _deliveredToConsumer = true;
+ }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,827 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.txn.TxnOp;
+
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotCompliantMBeanException;
+import javax.management.Notification;
+import javax.management.monitor.MonitorNotification;
+import javax.management.openmbean.*;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
+ * that. It is described fully in RFC 006.
+ */
+public class AMQQueue implements Managable
+{
+ private static final Logger _logger = Logger.getLogger(AMQQueue.class);
+
+ private final String _name;
+
+ /**
+ * null means shared
+ */
+ private final String _owner;
+
+ private final boolean _durable;
+
+ /**
+ * If true, this queue is deleted when the last subscriber is removed
+ */
+ private final boolean _autoDelete;
+
+ /**
+ * Holds subscribers to the queue.
+ */
+ private final SubscriptionSet _subscribers;
+
+ private final SubscriptionFactory _subscriptionFactory;
+
+ /**
+ * Manages message delivery.
+ */
+ private final DeliveryManager _deliveryMgr;
+
+ /**
+ * The queue registry with which this queue is registered.
+ */
+ private final QueueRegistry _queueRegistry;
+
+ /**
+ * Used to track bindings to exchanges so that on deletion they can easily
+ * be cancelled.
+ */
+ private final ExchangeBindings _bindings = new ExchangeBindings(this);
+
+ /**
+ * Executor on which asynchronous delivery will be carriedout where required
+ */
+ private final Executor _asyncDelivery;
+
+ private final AMQQueueMBean _managedObject;
+
+ /**
+ * max allowed size of a single message(in KBytes).
+ */
+ private long _maxAllowedMessageSize = 10000; // 10 MB
+
+ /**
+ * max allowed number of messages on a queue.
+ */
+ private Integer _maxAllowedMessageCount = 10000;
+
+ /**
+ * max allowed size in KBytes for all the messages combined together in a queue.
+ */
+ private long _queueDepth = 10000000; // 10 GB
+
+ /**
+ * total messages received by the queue since startup.
+ */
+ private long _totalMessagesReceived = 0;
+
+ /**
+ * MBean class for AMQQueue. It implements all the management features exposed
+ * for an AMQQueue.
+ */
+ @MBeanDescription("Management Interface for AMQQueue")
+ private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+ {
+ private String _queueName = null;
+ //private MBeanInfo _mbeanInfo;
+
+ // AMQ message attribute names exposed.
+ private String[] _msgAttributeNames = {"MessageId",
+ "Redelivered",
+ "Content's size",
+ "Contents"};
+ // AMQ Message attribute descriptions.
+ private String[] _msgAttributeDescriptions = {"Message Id",
+ "Redelivered",
+ "Message content's size in bytes",
+ "Message content bodies"};
+ // AMQ message attribute types.
+ private OpenType[] _msgAttributeTypes = new OpenType[4];
+ // Messages will be indexed according to the messageId.
+ private String[] _msgAttributeIndex = {"MessageId"};
+ // Composite type for representing AMQ Message data.
+ private CompositeType _messageDataType = null;
+ // Datatype for representing AMQ messages list.
+ private TabularType _messagelistDataType = null;
+
+ private String[] _contentNames = {"SerialNumber", "ContentBody"};
+ private String[] _contentDesc = {"Serial Number", "Content Body"};
+ private String[] _contentIndex = {"SerialNumber"};
+ private OpenType[] _contentType = new OpenType[2];
+ private CompositeType _contentBodyType = null;
+ private TabularType _contentBodyListType = null;
+
+ @MBeanConstructor("Creates an MBean exposing an AMQQueue.")
+ public AMQQueueMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedQueue.class, ManagedQueue.TYPE);
+ init();
+ }
+
+ private void init()
+ {
+ _queueName = jmxEncode(new StringBuffer(_name), 0).toString();
+ try
+ {
+ _contentType[0] = SimpleType.INTEGER;
+ _contentType[1] = new ArrayType(1, SimpleType.BYTE);
+ _contentBodyType = new CompositeType("Content",
+ "Content",
+ _contentNames,
+ _contentDesc,
+ _contentType);
+ _contentBodyListType = new TabularType("MessageContents",
+ "Message Contents",
+ _contentBodyType,
+ _contentIndex);
+
+ _msgAttributeTypes[0] = SimpleType.LONG;
+ _msgAttributeTypes[1] = SimpleType.BOOLEAN;
+ _msgAttributeTypes[2] = SimpleType.LONG;
+ _msgAttributeTypes[3] = _contentBodyListType;
+
+ _messageDataType = new CompositeType("Message",
+ "AMQ Message",
+ _msgAttributeNames,
+ _msgAttributeDescriptions,
+ _msgAttributeTypes);
+ _messagelistDataType = new TabularType("Messages",
+ "List of messages",
+ _messageDataType,
+ _msgAttributeIndex);
+ }
+ catch (OpenDataException ex)
+ {
+ _logger.error("OpenDataTypes could not be created.", ex);
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _queueName;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public boolean isDurable()
+ {
+ return _durable;
+ }
+
+ public String getOwner()
+ {
+ return _owner;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _autoDelete;
+ }
+
+ public Integer getMessageCount()
+ {
+ return _deliveryMgr.getQueueMessageCount();
+ }
+
+ public Long getMaximumMessageSize()
+ {
+ return _maxAllowedMessageSize;
+ }
+
+ public void setMaximumMessageSize(Long value)
+ {
+ _maxAllowedMessageSize = value;
+ }
+
+ public Integer getConsumerCount()
+ {
+ return _subscribers.size();
+ }
+
+ public Integer getActiveConsumerCount()
+ {
+ return _subscribers.getWeight();
+ }
+
+ public Long getReceivedMessageCount()
+ {
+ return _totalMessagesReceived;
+ }
+
+ public Integer getMaximumMessageCount()
+ {
+ return _maxAllowedMessageCount;
+ }
+
+ public void setMaximumMessageCount(Integer value)
+ {
+ _maxAllowedMessageCount = value;
+ }
+
+ public Long getQueueDepth()
+ {
+ return _queueDepth;
+ }
+
+ // Sets the queue depth, the max queue size
+ public void setQueueDepth(Long value)
+ {
+ _queueDepth = value;
+ }
+
+ // Returns the size of messages in the queue
+ public Long getQueueSize()
+ {
+ List<AMQMessage> list = _deliveryMgr.getMessages();
+ if (list.size() == 0)
+ {
+ return 0l;
+ }
+
+ long queueSize = 0;
+ for (AMQMessage message : list)
+ {
+ queueSize = queueSize + getMessageSize(message);
+ }
+ return new Long(Math.round(queueSize / 100));
+ }
+ // Operations
+
+ // calculates the size of an AMQMessage
+
+ private long getMessageSize(AMQMessage msg)
+ {
+ if (msg == null)
+ {
+ return 0l;
+ }
+
+ List<ContentBody> cBodies = msg.getContentBodies();
+ long messageSize = 0;
+ for (ContentBody body : cBodies)
+ {
+ if (body != null)
+ {
+ messageSize = messageSize + body.getSize();
+ }
+ }
+ return messageSize;
+ }
+
+ // Checks if there is any notification to be send to the listeners
+ private void checkForNotification(AMQMessage msg)
+ {
+ // Check for message count
+ Integer msgCount = getMessageCount();
+ if (msgCount >= getMaximumMessageCount())
+ {
+ notifyClients("MessageCount = " + msgCount + ", Queue has reached its size limit and is now full.");
+ }
+
+ // Check for received message size
+ long messageSize = getMessageSize(msg);
+ if (messageSize >= getMaximumMessageSize())
+ {
+ notifyClients("MessageSize = " + messageSize + ", Message size (MessageID=" + msg.getMessageId() +
+ ")is higher than the threshold value");
+ }
+
+ // Check for queue size in bytes
+ long queueSize = getQueueSize();
+ if (queueSize >= getQueueDepth())
+ {
+ notifyClients("QueueSize = " + queueSize + ", Queue size has reached the threshold value");
+ }
+ }
+
+ // Send the notification to the listeners
+ private void notifyClients(String notificationMsg)
+ {
+ Notification n = new Notification(
+ MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
+ this,
+ ++_notificationSequenceNumber,
+ System.currentTimeMillis(),
+ notificationMsg);
+
+ _broadcaster.sendNotification(n);
+ }
+
+ public void deleteMessageFromTop() throws JMException
+ {
+ try
+ {
+ _deliveryMgr.removeAMessageFromTop();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ public void clearQueue() throws JMException
+ {
+ try
+ {
+ _deliveryMgr.clearAllMessages();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * Returns the messages stored in this queue in tabular form.
+ *
+ * @param beginIndex
+ * @param endIndex
+ * @return AMQ messages in tabular form.
+ * @throws JMException
+ */
+ public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
+ {
+ if ((beginIndex > endIndex) || (beginIndex < 1))
+ {
+ throw new JMException("FromIndex = " + beginIndex + ", ToIndex = " + endIndex +
+ "\nFromIndex should be greater than 0 and less than ToIndex");
+ }
+
+ List<AMQMessage> list = _deliveryMgr.getMessages();
+
+ if (beginIndex > list.size())
+ {
+ throw new JMException("FromIndex = " + beginIndex + ". There are only " + list.size() + " messages in the queue");
+ }
+
+ endIndex = endIndex < list.size() ? endIndex : list.size();
+ TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
+
+ for (int i = beginIndex; i <= endIndex; i++)
+ {
+ AMQMessage msg = list.get(i - 1);
+ long msgId = msg.getMessageId();
+
+ List<ContentBody> cBodies = msg.getContentBodies();
+
+ TabularDataSupport _contentList = new TabularDataSupport(_contentBodyListType);
+ int contentSerialNo = 1;
+ long size = 0;
+
+ for (ContentBody body : cBodies)
+ {
+ if (body.getSize() != 0)
+ {
+ Byte[] byteArray = getByteArray(body.payload);
+ size = size + byteArray.length;
+
+ Object[] contentValues = {contentSerialNo, byteArray};
+ CompositeData contentData = new CompositeDataSupport(_contentBodyType,
+ _contentNames,
+ contentValues);
+
+ _contentList.put(contentData);
+ }
+ }
+
+ Object[] itemValues = {msgId, true, size, _contentList};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType,
+ _msgAttributeNames,
+ itemValues);
+ _messageList.put(messageData);
+ }
+
+ return _messageList;
+ }
+
+ /**
+ * A utility to convert byte[] to Byte[]. Required to create composite
+ * type for message contents.
+ *
+ * @param sequence message content as byte[]
+ * @return Byte[]
+ */
+ private Byte[] getByteArray(ByteSequence sequence)
+ {
+ List<Byte> list = new ArrayList<Byte>();
+
+ for (int i = 0; i < sequence.length; i++)
+ {
+ list.add(sequence.data[i+sequence.offset]);
+ }
+
+ return list.toArray(new Byte[0]);
+ }
+
+ /**
+ * Creates all the notifications this MBean can send.
+ *
+ * @return Notifications broadcasted by this MBean.
+ */
+ @Override
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ String[] notificationTypes = new String[]
+ {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String name = MonitorNotification.class.getName();
+ String description = "An attribute of this MBean has reached threshold value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes,
+ name,
+ description);
+
+ return new MBeanNotificationInfo[]{info1};
+ }
+
+ } // End of AMQMBean class
+
+ public AMQQueue(String name, boolean durable, String owner,
+ boolean autoDelete, QueueRegistry queueRegistry)
+ throws AMQException
+ {
+ this(name, durable, owner, autoDelete, queueRegistry,
+ Executors.newCachedThreadPool(), new SubscriptionImpl.Factory());
+ }
+
+ public AMQQueue(String name, boolean durable, String owner,
+ boolean autoDelete, QueueRegistry queueRegistry, SubscriptionFactory subscriptionFactory)
+ throws AMQException
+ {
+ this(name, durable, owner, autoDelete, queueRegistry,
+ Executors.newCachedThreadPool(), subscriptionFactory);
+ }
+
+ public AMQQueue(String name, boolean durable, String owner,
+ boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery,
+ SubscriptionFactory subscriptionFactory)
+ throws AMQException
+ {
+
+ this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), subscriptionFactory);
+ }
+
+ public AMQQueue(String name, boolean durable, String owner,
+ boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
+ throws AMQException
+ {
+
+ this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(),
+ new SubscriptionImpl.Factory());
+ }
+
+ protected AMQQueue(String name, boolean durable, String owner,
+ boolean autoDelete, QueueRegistry queueRegistry,
+ SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
+ throws AMQException
+ {
+ this(name, durable, owner, autoDelete, queueRegistry,
+ Executors.newCachedThreadPool(), subscribers, subscriptionFactory);
+ }
+
+ protected AMQQueue(String name, boolean durable, String owner,
+ boolean autoDelete, QueueRegistry queueRegistry,
+ SubscriptionSet subscribers)
+ throws AMQException
+ {
+ this(name, durable, owner, autoDelete, queueRegistry,
+ Executors.newCachedThreadPool(), subscribers, new SubscriptionImpl.Factory());
+ }
+
+ protected AMQQueue(String name, boolean durable, String owner,
+ boolean autoDelete, QueueRegistry queueRegistry,
+ Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
+ throws AMQException
+ {
+ if (name == null)
+ {
+ throw new IllegalArgumentException("Queue name must not be null");
+ }
+ if (queueRegistry == null)
+ {
+ throw new IllegalArgumentException("Queue registry must not be null");
+ }
+ _name = name;
+ _durable = durable;
+ _owner = owner;
+ _autoDelete = autoDelete;
+ _queueRegistry = queueRegistry;
+ _asyncDelivery = asyncDelivery;
+ _managedObject = createMBean();
+ _managedObject.register();
+ _subscribers = subscribers;
+ _subscriptionFactory = subscriptionFactory;
+ _deliveryMgr = new DeliveryManager(_subscribers, this);
+ }
+
+ private AMQQueueMBean createMBean() throws AMQException
+ {
+ try
+ {
+ return new AMQQueueMBean();
+ }
+ catch (NotCompliantMBeanException ex)
+ {
+ throw new AMQException("AMQQueue MBean creation has failed.", ex);
+ }
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public boolean isShared()
+ {
+ return _owner == null;
+ }
+
+ public boolean isDurable()
+ {
+ return _durable;
+ }
+
+ public String getOwner()
+ {
+ return _owner;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _autoDelete;
+ }
+
+ public int getMessageCount()
+ {
+ return _deliveryMgr.getQueueMessageCount();
+ }
+
+ public ManagedObject getManagedObject()
+ {
+ return _managedObject;
+ }
+
+ public void bind(String routingKey, Exchange exchange)
+ {
+ _bindings.addBinding(routingKey, exchange);
+ }
+
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+ throws AMQException
+ {
+ debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
+
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+ _subscribers.addSubscriber(subscription);
+ }
+
+ public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException
+ {
+ debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
+ this);
+
+ Subscription removedSubscription;
+ if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
+ ps,
+ consumerTag)))
+ == null)
+ {
+ throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag +
+ " and protocol session key " + ps.getKey() + " not registered with queue " + this);
+ }
+
+ // if we are eligible for auto deletion, unregister from the queue registry
+ if (_autoDelete && _subscribers.isEmpty())
+ {
+ autodelete();
+ // we need to manually fire the event to the removed subscription (which was the last one left for this
+ // queue. This is because the delete method uses the subscription set which has just been cleared
+ removedSubscription.queueDeleted(this);
+ }
+ }
+
+ public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
+ {
+ if (checkUnused && !_subscribers.isEmpty())
+ {
+ _logger.info("Will not delete " + this + " as it is in use.");
+ return 0;
+ }
+ else if (checkEmpty && _deliveryMgr.getQueueMessageCount() > 0)
+ {
+ _logger.info("Will not delete " + this + " as it is not empty.");
+ return 0;
+ }
+ else
+ {
+ delete();
+ return _deliveryMgr.getQueueMessageCount();
+ }
+ }
+
+ public void delete() throws AMQException
+ {
+ _subscribers.queueDeleted(this);
+ _bindings.deregister();
+ _queueRegistry.unregisterQueue(_name);
+ _managedObject.unregister();
+ }
+
+ protected void autodelete() throws AMQException
+ {
+ debug("autodeleting {0}", this);
+ delete();
+ }
+
+ public void deliver(AMQMessage msg) throws AMQException
+ {
+ TxnBuffer buffer = msg.getTxnBuffer();
+ if (buffer == null)
+ {
+ //non-transactional
+ record(msg);
+ process(msg);
+ }
+ else
+ {
+ buffer.enlist(new Deliver(msg));
+ }
+ }
+
+ private void record(AMQMessage msg) throws AMQException
+ {
+ msg.enqueue(this);
+ msg.incrementReference();
+ }
+
+ private void process(AMQMessage msg) throws FailedDequeueException
+ {
+ _deliveryMgr.deliver(getName(), msg);
+ try
+ {
+ msg.checkDeliveredToConsumer();
+ updateReceivedMessageCount(msg);
+ }
+ catch (NoConsumersException e)
+ {
+ // as this message will be returned, it should be removed
+ // from the queue:
+ dequeue(msg);
+ }
+ }
+
+ void dequeue(AMQMessage msg) throws FailedDequeueException
+ {
+ try
+ {
+ msg.dequeue(this);
+ msg.decrementReference();
+ }
+ catch (MessageCleanupException e)
+ {
+ //Message was dequeued, but could notthen be deleted
+ //though it is no longer referenced. This should be very
+ //rare and can be detected and cleaned up on recovery or
+ //done through some form of manual intervention.
+ _logger.error(e, e);
+ }
+ catch (AMQException e)
+ {
+ throw new FailedDequeueException(_name, e);
+ }
+ }
+
+ public void deliverAsync()
+ {
+ _deliveryMgr.processAsync(_asyncDelivery);
+ }
+
+ protected SubscriptionManager getSubscribers()
+ {
+ return _subscribers;
+ }
+
+ protected void updateReceivedMessageCount(AMQMessage msg)
+ {
+ _totalMessagesReceived++;
+ _managedObject.checkForNotification(msg);
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final AMQQueue amqQueue = (AMQQueue) o;
+
+ return (_name.equals(amqQueue._name));
+ }
+
+ public int hashCode()
+ {
+ return _name.hashCode();
+ }
+
+ public String toString()
+ {
+ return "Queue(" + _name + ")@" + System.identityHashCode(this);
+ }
+
+ private void debug(String msg, Object... args)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format(msg, args));
+ }
+ }
+
+ private class Deliver implements TxnOp
+ {
+ private final AMQMessage _msg;
+
+ Deliver(AMQMessage msg)
+ {
+ _msg = msg;
+ }
+
+ public void prepare() throws AMQException
+ {
+ //do the persistent part of the record()
+ _msg.enqueue(AMQQueue.this);
+ }
+
+ public void undoPrepare()
+ {
+ }
+
+ public void commit()
+ {
+ //do the memeory part of the record()
+ _msg.incrementReference();
+ //then process the message
+ try
+ {
+ process(_msg);
+ }
+ catch (FailedDequeueException e)
+ {
+ //TODO: is there anything else we can do here? I think not...
+ _logger.error("Error during commit of a queue delivery: " + e, e);
+ }
+ }
+
+ public void rollback()
+ {
+ }
+ }
+
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DefaultQueueRegistry implements QueueRegistry
+{
+ private ConcurrentMap<String, AMQQueue> _queueMap = new ConcurrentHashMap<String, AMQQueue>();
+
+ public DefaultQueueRegistry()
+ {
+ }
+
+ public void registerQueue(AMQQueue queue) throws AMQException
+ {
+ _queueMap.put(queue.getName(), queue);
+ }
+
+ public void unregisterQueue(String name) throws AMQException
+ {
+ _queueMap.remove(name);
+ }
+
+ public AMQQueue getQueue(String name)
+ {
+ return _queueMap.get(name);
+ }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,328 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.activemq.amqp.command.ContentBody;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.util.ConcurrentLinkedQueueNoSize;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Manages delivery of messages on behalf of a queue
+ */
+public class DeliveryManager
+{
+ private static final Logger _log = Logger.getLogger(DeliveryManager.class);
+
+ public boolean compressBufferOnQueue;
+ /**
+ * Holds any queued messages
+ */
+ private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueNoSize<AMQMessage>();
+ //private int _messageCount;
+ /**
+ * Ensures that only one asynchronous task is running for this manager at
+ * any time.
+ */
+ private final AtomicBoolean _processing = new AtomicBoolean();
+ /**
+ * The subscriptions on the queue to whom messages are delivered
+ */
+ private final SubscriptionManager _subscriptions;
+
+ /**
+ * A reference to the queue we are delivering messages for. We need this to be able
+ * to pass the code that handles acknowledgements a handle on the queue.
+ */
+ private final AMQQueue _queue;
+
+
+ private volatile int _queueSize = 0;
+
+ DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
+ {
+
+ if (compressBufferOnQueue)
+ {
+ _log.info("Compressing Buffers on queue.");
+ }
+
+ _subscriptions = subscriptions;
+ _queue = queue;
+ }
+
+ /**
+ * @return boolean if we are queueing
+ */
+ private boolean queueing()
+ {
+ return getMessageCount() != 0;
+ }
+
+
+ /**
+ * @param msg to enqueue
+ * @return true if we are queue this message
+ */
+ private boolean enqueue(AMQMessage msg)
+ {
+ if (msg.isImmediate())
+ {
+ return false;
+ }
+ else
+ {
+ if (queueing())
+ {
+ return addMessageToQueue(msg);
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+
+ private void startQueueing(AMQMessage msg)
+ {
+ if (!msg.isImmediate())
+ {
+ addMessageToQueue(msg);
+ }
+ }
+
+ private boolean addMessageToQueue(AMQMessage msg)
+ {
+ // Shrink the ContentBodies to their actual size to save memory.
+ // synchronize to ensure this msg is the next one to get added.
+ if (compressBufferOnQueue)
+ {
+ synchronized(_messages)
+ {
+ Iterator it = msg.getContentBodies().iterator();
+ while (it.hasNext())
+ {
+ ContentBody cb = (ContentBody) it.next();
+ cb.reduceBufferToFit();
+ }
+
+ _messages.offer(msg);
+ _queueSize++;
+ }
+ }
+ else
+ {
+ _messages.offer(msg);
+ _queueSize++;
+ }
+ return true;
+ }
+
+
+ /**
+ * Determines whether there are queued messages. Sets _queueing to false if
+ * there are no queued messages. This needs to be atomic.
+ *
+ * @return true if there are queued messages
+ */
+ private boolean hasQueuedMessages()
+ {
+ return getMessageCount() != 0;
+ }
+
+ public int getQueueMessageCount()
+ {
+ return getMessageCount();
+ }
+
+ /**
+ * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
+ *
+ * @return int the number of messages in the delivery queue.
+ */
+
+ private int getMessageCount()
+ {
+ return _messages.size();
+ }
+
+
+ protected synchronized List<AMQMessage> getMessages()
+ {
+ return new ArrayList<AMQMessage>(_messages);
+ }
+
+ protected synchronized void removeAMessageFromTop() throws AMQException
+ {
+ AMQMessage msg = poll();
+ if (msg != null)
+ {
+ msg.dequeue(_queue);
+ }
+ }
+
+ protected synchronized void clearAllMessages() throws AMQException
+ {
+ AMQMessage msg = poll();
+ while (msg != null)
+ {
+ msg.dequeue(_queue);
+ msg = poll();
+ }
+ }
+
+ /**
+ * Only one thread should ever execute this method concurrently, but
+ * it can do so while other threads invoke deliver().
+ */
+ private void processQueue()
+ {
+ try
+ {
+ boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+ while (hasQueuedMessages() && hasSubscribers)
+ {
+ // _log.debug("Have messages(" + _messages.size() + ") and subscribers");
+ Subscription next = _subscriptions.nextSubscriber(peek());
+
+ //We don't synchronize access to subscribers so need to re-check
+ if (next != null)
+ {
+ next.send(poll(), _queue);
+ }
+ else
+ {
+ hasSubscribers = false;
+ }
+ }
+ }
+ catch (FailedDequeueException e)
+ {
+ _log.error("Unable to deliver message as dequeue failed: " + e, e);
+ }
+ finally
+ {
+ _log.debug("End of processQueue: (" + _queueSize + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers());
+ _processing.set(false);
+ }
+ }
+
+ private AMQMessage peek()
+ {
+ return _messages.peek();
+ }
+
+ private AMQMessage poll()
+ {
+ _queueSize--;
+ return _messages.poll();
+ }
+
+ Runner asyncDelivery = new Runner();
+
+ /**
+ * Requests that the delivery manager start processing the queue asynchronously
+ * if there is work that can be done (i.e. there are messages queued up and
+ * subscribers that can receive them.
+ * <p/>
+ * This should be called when subscribers are added, but only after the consume-ok
+ * message has been returned as message delivery may start immediately. It should also
+ * be called after unsuspending a client.
+ * <p/>
+ *
+ * @param executor the executor on which the delivery should take place
+ */
+ void processAsync(Executor executor)
+ {
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + _queueSize + ")" +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get());
+
+ if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+ {
+ //are we already running? if so, don't re-run
+ if (_processing.compareAndSet(false, true))
+ {
+ executor.execute(asyncDelivery);
+ }
+ }
+ }
+
+ /**
+ * Handles message delivery. The delivery manager is always in one of two modes;
+ * it is either queueing messages for asynchronous delivery or delivering
+ * directly.
+ *
+ * @param name the name of the entity on whose behalf we are delivering the message
+ * @param msg the message to deliver
+ * @throws FailedDequeueException if the message could not be dequeued
+ */
+ void deliver(String name, AMQMessage msg) throws FailedDequeueException
+ {
+ // first check whether we are queueing, and enqueue if we are
+ if (!enqueue(msg))
+ {
+ // not queueing so deliver message to 'next' subscriber
+ Subscription s = _subscriptions.nextSubscriber(msg);
+ if (s == null)
+ {
+ if (!msg.isImmediate())
+ {
+ if (_subscriptions instanceof SubscriptionSet)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Start Queueing messages Active Subs:" + _subscriptions.hasActiveSubscribers()
+ + " Size :" + ((SubscriptionSet) _subscriptions).size()
+ + " Empty :" + ((SubscriptionSet) _subscriptions).isEmpty());
+ }
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Start Queueing messages Active Subs:" + _subscriptions.hasActiveSubscribers());
+ }
+ }
+ // no subscribers yet so enter 'queueing' mode and queue this message
+ startQueueing(msg);
+ }
+ }
+ else
+ {
+ s.send(msg, _queue);
+ msg.setDeliveredToConsumer();
+ }
+ }
+ }
+
+ private class Runner implements Runnable
+ {
+ public void run()
+ {
+ processQueue();
+ }
+ }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.HashSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * When a queue is deleted, it should be deregistered from any
+ * exchange it has been bound to. This class assists in this task,
+ * by keeping track of all bindings for a given queue.
+ */
+class ExchangeBindings
+{
+ static class ExchangeBinding
+ {
+ private final Exchange exchange;
+ private final String routingKey;
+
+ ExchangeBinding(String routingKey, Exchange exchange)
+ {
+ this.routingKey = routingKey;
+ this.exchange = exchange;
+ }
+
+ void unbind(AMQQueue queue) throws AMQException
+ {
+ exchange.deregisterQueue(routingKey, queue);
+ }
+
+ public Exchange getExchange()
+ {
+ return exchange;
+ }
+
+ public String getRoutingKey()
+ {
+ return routingKey;
+ }
+
+ public int hashCode()
+ {
+ return exchange.hashCode() + routingKey.hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof ExchangeBinding)) return false;
+ ExchangeBinding eb = (ExchangeBinding) o;
+ return exchange.equals(eb.exchange) && routingKey.equals(eb.routingKey);
+ }
+ }
+
+ private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>();
+ private final AMQQueue _queue;
+
+ ExchangeBindings(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ /**
+ * Adds the specified binding to those being tracked.
+ * @param routingKey the routing key with which the queue whose bindings
+ * are being tracked by the instance has been bound to the exchange
+ * @param exchange the exchange bound to
+ */
+ void addBinding(String routingKey, Exchange exchange)
+ {
+ _bindings.add(new ExchangeBinding(routingKey, exchange));
+ }
+
+ /**
+ * Deregisters this queue from any exchange it has been bound to
+ */
+ void deregister() throws AMQException
+ {
+ //remove duplicates at this point
+ HashSet<ExchangeBinding> copy = new HashSet<ExchangeBinding>(_bindings);
+ for (ExchangeBinding b : copy)
+ {
+ b.unbind(_queue);
+ }
+ }
+
+ List<ExchangeBinding> getExchangeBindings()
+ {
+ return _bindings;
+ }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Signals that the dequeue of a message from a queue failed
+ */
+public class FailedDequeueException extends AMQException
+{
+ public FailedDequeueException(String queue)
+ {
+ super("Failed to dequeue message from " + queue);
+ }
+
+ public FailedDequeueException(String queue, AMQException e)
+ {
+ super("Failed to dequeue message from " + queue, e);
+ }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,212 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.management.MBeanAttribute;
+import org.apache.qpid.server.management.MBeanOperation;
+import org.apache.qpid.server.management.MBeanOperationParameter;
+
+import javax.management.JMException;
+import javax.management.MBeanOperationInfo;
+import javax.management.openmbean.TabularData;
+import java.io.IOException;
+
+/**
+ * The management interface exposed to allow management of a queue.
+ * @author Robert J. Greig
+ * @author Bhupendra Bhardwaj
+ * @version 0.1
+ */
+public interface ManagedQueue
+{
+ static final String TYPE = "Queue";
+
+ /**
+ * Returns the Name of the ManagedQueue.
+ * @return the name of the managedQueue.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="Name", description = "Name of the " + TYPE)
+ String getName() throws IOException;
+
+ /**
+ * Tells whether this ManagedQueue is durable or not.
+ * @return true if this ManagedQueue is a durable queue.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable")
+ boolean isDurable() throws IOException;
+
+ /**
+ * Tells the Owner of the ManagedQueue.
+ * @return the owner's name.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="Owner", description = "Owner")
+ String getOwner() throws IOException;
+
+ /**
+ * Tells if the ManagedQueue is set to AutoDelete.
+ * @return true if the ManagedQueue is set to AutoDelete.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete")
+ boolean isAutoDelete() throws IOException;
+
+ /**
+ * Total number of messages on the queue, which are yet to be delivered to the consumer(s).
+ * @return number of undelivered message in the Queue.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="MessageCount",
+ description = "Total number of undelivered messages on the queue")
+ Integer getMessageCount() throws IOException;
+
+ /**
+ * Returns the maximum size of a message (in kbytes) allowed to be accepted by the
+ * ManagedQueue. This is useful in setting notifications or taking
+ * appropriate action, if the size of the message received is more than
+ * the allowed size.
+ * @return the maximum size of a message allowed to be aceepted by the
+ * ManagedQueue.
+ * @throws IOException
+ */
+ Long getMaximumMessageSize() throws IOException;
+
+ /**
+ * Sets the maximum size of the message (in kbytes) that is allowed to be
+ * accepted by the Queue.
+ * @param size maximum size of message.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="MaximumMessageSize",
+ description="Maximum size(KB) of a message allowed for this Queue")
+ void setMaximumMessageSize(Long size) throws IOException;
+
+ /**
+ * Returns the total number of subscribers to the queue.
+ * @return the number of subscribers.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue")
+ Integer getConsumerCount() throws IOException;
+
+ /**
+ * Returns the total number of active subscribers to the queue.
+ * @return the number of active subscribers
+ * @throws IOException
+ */
+ @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue")
+ Integer getActiveConsumerCount() throws IOException;
+
+ /**
+ * Tells the total number of messages receieved by the queue since startup.
+ * @return total number of messages received.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="ReceivedMessageCount",
+ description="The total number of messages receieved by the queue since startup")
+ Long getReceivedMessageCount() throws IOException;
+
+ /**
+ * Tells the maximum number of messages that can be stored in the queue.
+ * This is useful in setting the notifications or taking required
+ * action is the number of message increase this limit.
+ * @return maximum muber of message allowed to be stored in the queue.
+ * @throws IOException
+ */
+ Integer getMaximumMessageCount() throws IOException;
+
+ /**
+ * Sets the maximum number of messages allowed to be stored in the queue.
+ * @param value the maximum number of messages allowed to be stored in the queue.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="MaximumMessageCount",
+ description="The maximum number of messages allowed to be stored in the queue")
+ void setMaximumMessageCount(Integer value) throws IOException;
+
+ /**
+ * Size of messages in the queue
+ * @return
+ * @throws IOException
+ */
+ @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue")
+ Long getQueueSize() throws IOException;
+
+ /**
+ * Tells the maximum size of all the messages combined together,
+ * that can be stored in the queue. This is useful for setting notifications
+ * or taking required action if the size of messages stored in the queue
+ * increases over this limit.
+ * @return maximum size of the all the messages allowed for the queue.
+ * @throws IOException
+ */
+ Long getQueueDepth() throws IOException;
+
+ /**
+ * Sets the maximum size of all the messages together, that can be stored
+ * in the queue.
+ * @param value
+ * @throws IOException
+ */
+ @MBeanAttribute(name="QueueDepth",
+ description="The size(KB) of all the messages together, that can be stored in the queue")
+ void setQueueDepth(Long value) throws IOException;
+
+
+
+ //********** Operations *****************//
+
+
+ /**
+ * Returns a subset of all the messages stored in the queue. The messages
+ * are returned based on the given index numbers.
+ * @param fromIndex
+ * @param toIndex
+ * @return
+ * @throws IOException
+ * @throws JMException
+ */
+ @MBeanOperation(name="viewMessages",
+ description="shows messages in this queue with given indexes. eg. from index 1 - 100")
+ TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex,
+ @MBeanOperationParameter(name="to index", description="to index")int toIndex)
+ throws IOException, JMException;
+
+ /**
+ * Deletes the first message from top.
+ * @throws IOException
+ * @throws JMException
+ */
+ @MBeanOperation(name="deleteMessageFromTop",
+ description="Deletes the first message from top",
+ impact= MBeanOperationInfo.ACTION)
+ void deleteMessageFromTop() throws IOException, JMException;
+
+ /**
+ * Clears the queue by deleting all the undelivered messages from the queue.
+ * @throws IOException
+ * @throws JMException
+ */
+ @MBeanOperation(name="clearQueue",
+ description="Clears the queue by deleting all the undelivered messages from the queue",
+ impact= MBeanOperationInfo.ACTION)
+ void clearQueue() throws IOException, JMException;
+
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Signals that the removal of a message once its refcount reached
+ * zero failed.
+ */
+public class MessageCleanupException extends AMQException
+{
+ public MessageCleanupException(long messageId, AMQException e)
+ {
+ super("Failed to cleanup message with id " + messageId, e);
+ }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.protocol.AMQConstant;
+
+import java.util.List;
+
+/**
+ * Signals that no consumers exist for a message at a given point in time.
+ * Used if a message has immediate=true and there are no consumers registered
+ * with the queue.
+ */
+public class NoConsumersException extends RequiredDeliveryException
+{
+ public NoConsumersException(String queue,
+ BasicPublishBody publishBody,
+ ContentHeaderBody contentHeaderBody,
+ List<ContentBody> contentBodies)
+ {
+ super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
+ }
+
+ public NoConsumersException(BasicPublishBody publishBody,
+ ContentHeaderBody contentHeaderBody,
+ List<ContentBody> contentBodies)
+ {
+ super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+ }
+
+ public int getReplyCode()
+ {
+ return AMQConstant.NO_CONSUMERS.getCode();
+ }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+
+public interface QueueRegistry
+{
+ void registerQueue(AMQQueue queue) throws AMQException;
+
+ void unregisterQueue(String name) throws AMQException;
+
+ AMQQueue getQueue(String name);
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/Subscription.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/Subscription.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+public interface Subscription
+{
+ void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
+
+ boolean isSuspended();
+
+ void queueDeleted(AMQQueue queue);
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+
+/**
+ * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
+ * factory primarily assists testing although in future more sophisticated subscribers may need a different
+ * subscription implementation.
+ *
+ * @see org.apache.qpid.server.queue.AMQQueue
+ */
+public interface SubscriptionFactory
+{
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
+ throws AMQException;
+
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag)
+ throws AMQException;
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,190 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+/**
+ * Encapsulation of a supscription to a queue.
+ * <p/>
+ * Ties together the protocol session of a subscriber, the consumer tag that
+ * was given out by the broker and the channel id.
+ * <p/>
+ */
+public class SubscriptionImpl implements Subscription
+{
+ private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
+
+ public final AMQChannel channel;
+
+ public final AMQProtocolSession protocolSession;
+
+ public final String consumerTag;
+
+ private final Object sessionKey;
+
+ /**
+ * True if messages need to be acknowledged
+ */
+ private final boolean _acks;
+
+ public static class Factory implements SubscriptionFactory
+ {
+ public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
+ throws AMQException
+ {
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+ }
+
+ public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+ throws AMQException
+ {
+ return new SubscriptionImpl(channel, protocolSession, consumerTag);
+ }
+ }
+
+ public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+ String consumerTag, boolean acks)
+ throws AMQException
+ {
+ AMQChannel channel = protocolSession.getChannel(channelId);
+ if (channel == null)
+ {
+ throw new NullPointerException("channel not found in protocol session");
+ }
+
+ this.channel = channel;
+ this.protocolSession = protocolSession;
+ this.consumerTag = consumerTag;
+ sessionKey = protocolSession.getKey();
+ _acks = acks;
+ }
+
+ public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
+ String consumerTag)
+ throws AMQException
+ {
+ this(channel, protocolSession, consumerTag, false);
+ }
+
+ public boolean equals(Object o)
+ {
+ return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
+ }
+
+ /**
+ * Equality holds if the session matches and the channel and consumer tag are the same.
+ */
+ private boolean equals(SubscriptionImpl psc)
+ {
+ return sessionKey.equals(psc.sessionKey)
+ && psc.channel == channel
+ && psc.consumerTag.equals(consumerTag);
+ }
+
+ public int hashCode()
+ {
+ return sessionKey.hashCode();
+ }
+
+ public String toString()
+ {
+ return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
+ }
+
+ /**
+ * This method can be called by each of the publisher threads.
+ * As a result all changes to the channel object must be thread safe.
+ *
+ * @param msg
+ * @param queue
+ * @throws AMQException
+ */
+ public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ {
+ if (msg != null)
+ {
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
+
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
+ {
+ queue.dequeue(msg);
+ }
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
+// ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+// AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+ BasicDeliverBody deliver = createDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ CompositeAMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+ protocolSession.writeFrame(frame);
+ }
+ }
+ else
+ {
+ _logger.error("Attempt to send Null message", new NullPointerException());
+ }
+ }
+
+ public boolean isSuspended()
+ {
+ return channel.isSuspended();
+ }
+
+ /**
+ * Callback indicating that a queue has been deleted.
+ *
+ * @param queue
+ */
+ public void queueDeleted(AMQQueue queue)
+ {
+ channel.queueDeleted(queue);
+ }
+
+ private BasicDeliverBody createDeliverFrame(long deliveryTag, String routingKey, String exchange)
+ {
+ BasicDeliverBody basicDeliverBody = new BasicDeliverBody();
+ basicDeliverBody.setChannel(channel.getChannelId());
+ basicDeliverBody.setConsumerTag(consumerTag);
+ basicDeliverBody.setDeliveryTag(deliveryTag);
+ basicDeliverBody.setRedelivered(false);
+ basicDeliverBody.setExchange(exchange);
+ basicDeliverBody.setRoutingKey(routingKey);
+ return basicDeliverBody;
+ }
+
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+/**
+ * Abstraction of actor that will determine the subscriber to whom
+ * a message will be sent.
+ */
+public interface SubscriptionManager
+{
+ public boolean hasActiveSubscribers();
+ public Subscription nextSubscriber(AMQMessage msg);
+}