You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/10/27 17:27:30 UTC

svn commit: r468414 [6/7] - in /incubator/activemq/sandbox/qpid/src/main/java/org/apache: activemq/amqp/ activemq/amqp/broker/ activemq/amqp/command/ activemq/amqp/transport/ activemq/amqp/wireformat/ activemq/amqp/wireformat/v8_0/ activemq/qpid/broker...

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,378 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.AMQException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Combines the information that make up a deliverable message into a more manageable form.
+ */
+public class AMQMessage
+{
+    private final Set<Object> _tokens = new HashSet<Object>();
+
+    private AMQProtocolSession _publisher;
+
+    private final BasicPublishBody  _publishBody;
+
+    private ContentHeaderBody _contentHeaderBody;
+
+    private List<ContentBody> _contentBodies;
+
+    private boolean _redelivered;
+
+    private final long _messageId;
+
+    private final AtomicInteger _referenceCount = new AtomicInteger(1);
+
+    /**
+     * Keeps a track of how many bytes we have received in body frames
+     */
+    private long _bodyLengthReceived = 0;
+
+    /**
+     * The message store in which this message is contained.
+     */
+    private transient final MessageStore _store;
+
+    /**
+     * For non transactional publishes, a message can be stored as
+     * soon as it is complete. For transactional messages it doesnt
+     * need to be stored until the transaction is committed.
+     */
+    private boolean _storeWhenComplete;
+
+    /**
+     * TxnBuffer for transactionally published messages
+     */
+    private TxnBuffer _txnBuffer;
+
+    /**
+     * Flag to indicate whether message has been delivered to a
+     * consumer. Used in implementing return functionality for
+     * messages published with the 'immediate' flag.
+     */
+    private boolean _deliveredToConsumer;
+
+
+    public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
+    {
+        this(messageStore, publishBody, true);
+    }
+
+    public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete)
+    {
+        _messageId = messageStore.getNewMessageId();
+        _publishBody = publishBody;
+        _store = messageStore;
+        _contentBodies = new LinkedList<ContentBody>();
+        _storeWhenComplete = storeWhenComplete;
+    }
+
+    public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
+                      ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
+            throws AMQException
+    
+    {
+        _publishBody = publishBody;
+        _contentHeaderBody = contentHeaderBody;
+        _contentBodies = contentBodies;
+        _messageId = messageId;
+        _store = store;
+        storeMessage();
+    }
+
+    public AMQMessage(MessageStore store, BasicPublishBody publishBody,
+                      ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
+            throws AMQException
+    {
+        this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);        
+    }
+
+    protected AMQMessage(AMQMessage msg) throws AMQException
+    {
+        this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies);
+    }
+
+    public void storeMessage() throws AMQException
+    {
+        if (isPersistent())
+        {
+            _store.put(this);
+        }
+    }
+
+    public CompositeAMQDataBlock getDataBlock(BasicDeliverBody deliver, int channel)
+    {
+    	AMQDataBlock[] allFrames = new AMQDataBlock[2 + _contentBodies.size()];
+    	
+    	allFrames[0] = deliver;
+        allFrames[1] = _contentHeaderBody;
+        allFrames[1].channel = channel;
+        for (int i = 2; i < allFrames.length; i++)
+        {
+            allFrames[i] = _contentBodies.get(i - 2);
+            allFrames[i].setChannel(channel);
+        }
+        return new CompositeAMQDataBlock(allFrames);
+    }
+
+
+    public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
+    {
+    	AMQDataBlock[] allFrames = new AMQDataBlock[2 + _contentBodies.size()];
+        
+        BasicDeliverBody basicDeliverBody = new BasicDeliverBody();
+        basicDeliverBody.setChannel(channel);
+        basicDeliverBody.setConsumerTag(consumerTag);
+        basicDeliverBody.setDeliveryTag(deliveryTag);
+        basicDeliverBody.setRedelivered(_redelivered);
+        basicDeliverBody.setExchange(getExchangeName());
+        basicDeliverBody.setRoutingKey(getRoutingKey());
+        allFrames[0] = basicDeliverBody;
+        
+        _contentHeaderBody.setChannel(channel);
+        allFrames[1] = _contentHeaderBody;
+        
+        for (int i = 2; i < allFrames.length; i++)
+        {
+            allFrames[i] = _contentBodies.get(i - 2);
+            allFrames[i].setChannel(channel);
+        }
+        return new CompositeAMQDataBlock(allFrames);
+    }
+
+    public List<AMQDataBlock> getPayload()
+    {
+        List<AMQDataBlock> payload = new ArrayList<AMQDataBlock>(2 + _contentBodies.size());
+        payload.add(_publishBody);
+        payload.add(_contentHeaderBody);
+        payload.addAll(_contentBodies);
+        return payload;
+    }
+
+    public BasicPublishBody getPublishBody()
+    {
+        return _publishBody;
+    }
+
+    public ContentHeaderBody getContentHeaderBody()
+    {
+        return _contentHeaderBody;
+    }
+
+    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
+    {
+        _contentHeaderBody = contentHeaderBody;
+        if (_storeWhenComplete && isAllContentReceived())
+        {
+            storeMessage();
+        }
+    }
+
+    public List<ContentBody> getContentBodies()
+    {
+        return _contentBodies;
+    }
+
+    public void setContentBodies(List<ContentBody> contentBodies)
+    {
+        _contentBodies = contentBodies;
+    }
+
+    public void addContentBodyFrame(ContentBody contentBody) throws AMQException
+    {
+        _contentBodies.add(contentBody);
+        _bodyLengthReceived += contentBody.getSize();
+        if (_storeWhenComplete && isAllContentReceived())
+        {
+            storeMessage();
+        }
+    }
+
+    public boolean isAllContentReceived()
+    {
+        return _bodyLengthReceived == _contentHeaderBody.bodySize;
+    }
+
+    public boolean isRedelivered()
+    {
+        return _redelivered;
+    }
+
+    String getExchangeName()
+    {
+        return _publishBody.exchange;
+    }
+
+    String getRoutingKey()
+    {
+        return _publishBody.routingKey;
+    }
+
+    boolean isImmediate()
+    {
+        return _publishBody.immediate;
+    }
+
+    NoConsumersException getNoConsumersException(String queue)
+    {
+        return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies);
+    }
+
+    void setRedelivered(boolean redelivered)
+    {
+        _redelivered = redelivered;
+    }
+
+    public long getMessageId()
+    {
+        return _messageId;
+    }
+
+    /**
+     * Threadsafe. Increment the reference count on the message.
+     */
+    public void incrementReference()
+    {
+        _referenceCount.incrementAndGet();
+    }
+
+    /**
+     * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+     * message store.
+     */
+    public void decrementReference() throws MessageCleanupException
+    {
+        // note that the operation of decrementing the reference count and then removing the message does not
+        // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+        // the message has been passed to all queues. i.e. we are
+        // not relying on the all the increments having taken place before the delivery manager decrements.
+        if (_referenceCount.decrementAndGet() == 0)
+        {
+            try
+            {
+                _store.removeMessage(_messageId);
+            }
+            catch(AMQException e)
+            {
+                //to maintain consistency, we revert the count
+                incrementReference();
+                throw new MessageCleanupException(_messageId, e);
+            }
+        }
+    }
+
+    public void setPublisher(AMQProtocolSession publisher)
+    {
+        _publisher = publisher;
+    }
+
+    public AMQProtocolSession getPublisher()
+    {
+        return _publisher;
+    }
+
+    public boolean checkToken(Object token)
+    {
+        if(_tokens.contains(token))
+        {
+            return true;
+        }
+        else
+        {
+            _tokens.add(token);
+            return false;
+        }
+    }
+
+    public void enqueue(AMQQueue queue) throws AMQException
+    {
+        //if the message is not persistent or the queue is not durable
+        //we will not need to recover the association and so do not
+        //need to record it
+        if(isPersistent() && queue.isDurable())
+        {
+            _store.enqueueMessage(queue.getName(), _messageId);
+        }
+    }
+
+    public void dequeue(AMQQueue queue) throws AMQException
+    {
+        //only record associations where both queue and message will survive
+        //a restart, so only need to remove association if this is the case
+        if(isPersistent() && queue.isDurable())
+        {
+            _store.dequeueMessage(queue.getName(), _messageId);
+        }
+    }
+
+    public boolean isPersistent() throws AMQException
+    {
+        if(_contentHeaderBody == null)
+        {
+            throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
+        }
+
+        //todo remove literal values to a constant file such as AMQConstants in common
+        return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
+                &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+    }
+
+    public void setTxnBuffer(TxnBuffer buffer)
+    {
+        _txnBuffer = buffer;
+    }
+
+    public TxnBuffer getTxnBuffer()
+    {
+        return _txnBuffer;
+    }
+
+    /**
+     * Called to enforce the 'immediate' flag. 
+     * @throws NoConsumersException if the message is marked for
+     * immediate delivery but has not been marked as delivered to a
+     * consumer
+     */
+    public void checkDeliveredToConsumer() throws NoConsumersException{
+        if(isImmediate() && !_deliveredToConsumer)
+        {
+            throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
+        }
+    }
+
+    /**
+     * Called when this message is delivered to a consumer. (used to
+     * implement the 'immediate' flag functionality).
+     */
+    public void setDeliveredToConsumer(){
+        _deliveredToConsumer = true;
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,827 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.txn.TxnOp;
+
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotCompliantMBeanException;
+import javax.management.Notification;
+import javax.management.monitor.MonitorNotification;
+import javax.management.openmbean.*;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
+ * that. It is described fully in RFC 006.
+ */
+public class AMQQueue implements Managable
+{
+    private static final Logger _logger = Logger.getLogger(AMQQueue.class);
+
+    private final String _name;
+
+    /**
+     * null means shared
+     */
+    private final String _owner;
+
+    private final boolean _durable;
+
+    /**
+     * If true, this queue is deleted when the last subscriber is removed
+     */
+    private final boolean _autoDelete;
+
+    /**
+     * Holds subscribers to the queue.
+     */
+    private final SubscriptionSet _subscribers;
+
+    private final SubscriptionFactory _subscriptionFactory;
+
+    /**
+     * Manages message delivery.
+     */
+    private final DeliveryManager _deliveryMgr;
+
+    /**
+     * The queue registry with which this queue is registered.
+     */
+    private final QueueRegistry _queueRegistry;
+
+    /**
+     * Used to track bindings to exchanges so that on deletion they can easily
+     * be cancelled.
+     */
+    private final ExchangeBindings _bindings = new ExchangeBindings(this);
+
+    /**
+     * Executor on which asynchronous delivery will be carriedout where required
+     */
+    private final Executor _asyncDelivery;
+
+    private final AMQQueueMBean _managedObject;
+
+    /**
+     * max allowed size of a single message(in KBytes).
+     */
+    private long _maxAllowedMessageSize = 10000;    // 10 MB
+
+    /**
+     * max allowed number of messages on a queue.
+     */
+    private Integer _maxAllowedMessageCount = 10000;
+
+    /**
+     * max allowed size in  KBytes for all the messages combined together in a queue.
+     */
+    private long _queueDepth = 10000000;  //   10 GB
+
+    /**
+     * total messages received by the queue since startup.
+     */
+    private long _totalMessagesReceived = 0;
+
+    /**
+     * MBean class for AMQQueue. It implements all the management features exposed
+     * for an AMQQueue.
+     */
+    @MBeanDescription("Management Interface for AMQQueue")
+    private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+    {
+        private String _queueName = null;
+        //private MBeanInfo _mbeanInfo;
+
+        // AMQ message attribute names exposed.
+        private String[] _msgAttributeNames = {"MessageId",
+                                               "Redelivered",
+                                               "Content's size",
+                                               "Contents"};
+        // AMQ Message attribute descriptions.
+        private String[] _msgAttributeDescriptions = {"Message Id",
+                                                      "Redelivered",
+                                                      "Message content's size in bytes",
+                                                      "Message content bodies"};
+        // AMQ message attribute types.
+        private OpenType[] _msgAttributeTypes = new OpenType[4];
+        // Messages will be indexed according to the messageId.
+        private String[] _msgAttributeIndex = {"MessageId"};
+        // Composite type for representing AMQ Message data.
+        private CompositeType _messageDataType = null;
+        // Datatype for representing AMQ messages list.
+        private TabularType _messagelistDataType = null;
+
+        private String[] _contentNames = {"SerialNumber", "ContentBody"};
+        private String[] _contentDesc = {"Serial Number", "Content Body"};
+        private String[] _contentIndex = {"SerialNumber"};
+        private OpenType[] _contentType = new OpenType[2];
+        private CompositeType _contentBodyType = null;
+        private TabularType _contentBodyListType = null;
+
+        @MBeanConstructor("Creates an MBean exposing an AMQQueue.")
+        public AMQQueueMBean() throws NotCompliantMBeanException
+        {
+            super(ManagedQueue.class, ManagedQueue.TYPE);
+            init();
+        }
+
+        private void init()
+        {
+            _queueName = jmxEncode(new StringBuffer(_name), 0).toString();
+            try
+            {
+                _contentType[0] = SimpleType.INTEGER;
+                _contentType[1] = new ArrayType(1, SimpleType.BYTE);
+                _contentBodyType = new CompositeType("Content",
+                                                     "Content",
+                                                     _contentNames,
+                                                     _contentDesc,
+                                                     _contentType);
+                _contentBodyListType = new TabularType("MessageContents",
+                                                       "Message Contents",
+                                                       _contentBodyType,
+                                                       _contentIndex);
+
+                _msgAttributeTypes[0] = SimpleType.LONG;
+                _msgAttributeTypes[1] = SimpleType.BOOLEAN;
+                _msgAttributeTypes[2] = SimpleType.LONG;
+                _msgAttributeTypes[3] = _contentBodyListType;
+
+                _messageDataType = new CompositeType("Message",
+                                                     "AMQ Message",
+                                                     _msgAttributeNames,
+                                                     _msgAttributeDescriptions,
+                                                     _msgAttributeTypes);
+                _messagelistDataType = new TabularType("Messages",
+                                                       "List of messages",
+                                                       _messageDataType,
+                                                       _msgAttributeIndex);
+            }
+            catch (OpenDataException ex)
+            {
+                _logger.error("OpenDataTypes could not be created.", ex);
+                throw new RuntimeException(ex);
+            }
+        }
+
+        public String getObjectInstanceName()
+        {
+            return _queueName;
+        }
+
+        public String getName()
+        {
+            return _name;
+        }
+
+        public boolean isDurable()
+        {
+            return _durable;
+        }
+
+        public String getOwner()
+        {
+            return _owner;
+        }
+
+        public boolean isAutoDelete()
+        {
+            return _autoDelete;
+        }
+
+        public Integer getMessageCount()
+        {
+            return _deliveryMgr.getQueueMessageCount();
+        }
+
+        public Long getMaximumMessageSize()
+        {
+            return _maxAllowedMessageSize;
+        }
+
+        public void setMaximumMessageSize(Long value)
+        {
+            _maxAllowedMessageSize = value;
+        }
+
+        public Integer getConsumerCount()
+        {
+            return _subscribers.size();
+        }
+
+        public Integer getActiveConsumerCount()
+        {
+            return _subscribers.getWeight();
+        }
+
+        public Long getReceivedMessageCount()
+        {
+            return _totalMessagesReceived;
+        }
+
+        public Integer getMaximumMessageCount()
+        {
+            return _maxAllowedMessageCount;
+        }
+
+        public void setMaximumMessageCount(Integer value)
+        {
+            _maxAllowedMessageCount = value;
+        }
+
+        public Long getQueueDepth()
+        {
+            return _queueDepth;
+        }
+
+        // Sets the queue depth, the max queue size
+        public void setQueueDepth(Long value)
+        {
+            _queueDepth = value;
+        }
+
+        // Returns the size of messages in the queue
+        public Long getQueueSize()
+        {
+            List<AMQMessage> list = _deliveryMgr.getMessages();
+            if (list.size() == 0)
+            {
+                return 0l;
+            }
+
+            long queueSize = 0;
+            for (AMQMessage message : list)
+            {
+                queueSize = queueSize + getMessageSize(message);
+            }
+            return new Long(Math.round(queueSize / 100));
+        }
+        // Operations
+
+        // calculates the size of an AMQMessage
+
+        private long getMessageSize(AMQMessage msg)
+        {
+            if (msg == null)
+            {
+                return 0l;
+            }
+
+            List<ContentBody> cBodies = msg.getContentBodies();
+            long messageSize = 0;
+            for (ContentBody body : cBodies)
+            {
+                if (body != null)
+                {
+                    messageSize = messageSize + body.getSize();
+                }
+            }
+            return messageSize;
+        }
+
+        // Checks if there is any notification to be send to the listeners
+        private void checkForNotification(AMQMessage msg)
+        {
+            // Check for message count
+            Integer msgCount = getMessageCount();
+            if (msgCount >= getMaximumMessageCount())
+            {
+                notifyClients("MessageCount = " + msgCount + ", Queue has reached its size limit and is now full.");
+            }
+
+            // Check for received message size
+            long messageSize = getMessageSize(msg);
+            if (messageSize >= getMaximumMessageSize())
+            {
+                notifyClients("MessageSize = " + messageSize + ", Message size (MessageID=" + msg.getMessageId() +
+                              ")is higher than the threshold value");
+            }
+
+            // Check for queue size in bytes
+            long queueSize = getQueueSize();
+            if (queueSize >= getQueueDepth())
+            {
+                notifyClients("QueueSize = " + queueSize + ", Queue size has reached the threshold value");
+            }
+        }
+
+        // Send the notification to the listeners
+        private void notifyClients(String notificationMsg)
+        {
+            Notification n = new Notification(
+                    MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
+                    this,
+                    ++_notificationSequenceNumber,
+                    System.currentTimeMillis(),
+                    notificationMsg);
+
+            _broadcaster.sendNotification(n);
+        }
+
+        public void deleteMessageFromTop() throws JMException
+        {
+            try
+            {
+                _deliveryMgr.removeAMessageFromTop();
+            }
+            catch (AMQException ex)
+            {
+                throw new MBeanException(ex, ex.toString());
+            }
+        }
+
+        public void clearQueue() throws JMException
+        {
+            try
+            {
+                _deliveryMgr.clearAllMessages();
+            }
+            catch (AMQException ex)
+            {
+                throw new MBeanException(ex, ex.toString());
+            }
+        }
+
+        /**
+         * Returns the messages stored in this queue in tabular form.
+         *
+         * @param beginIndex
+         * @param endIndex
+         * @return AMQ messages in tabular form.
+         * @throws JMException
+         */
+        public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
+        {
+            if ((beginIndex > endIndex) || (beginIndex < 1))
+            {
+                throw new JMException("FromIndex = " + beginIndex + ", ToIndex = " + endIndex +
+                                      "\nFromIndex should be greater than 0 and less than ToIndex");
+            }
+
+            List<AMQMessage> list = _deliveryMgr.getMessages();
+
+            if (beginIndex > list.size())
+            {
+                throw new JMException("FromIndex = " + beginIndex + ". There are only " + list.size() + " messages in the queue");
+            }
+
+            endIndex = endIndex < list.size() ? endIndex : list.size();
+            TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
+
+            for (int i = beginIndex; i <= endIndex; i++)
+            {
+                AMQMessage msg = list.get(i - 1);
+                long msgId = msg.getMessageId();
+
+                List<ContentBody> cBodies = msg.getContentBodies();
+
+                TabularDataSupport _contentList = new TabularDataSupport(_contentBodyListType);
+                int contentSerialNo = 1;
+                long size = 0;
+
+                for (ContentBody body : cBodies)
+                {
+                    if (body.getSize() != 0)
+                    {
+                        Byte[] byteArray = getByteArray(body.payload);
+                        size = size + byteArray.length;
+
+                        Object[] contentValues = {contentSerialNo, byteArray};
+                        CompositeData contentData = new CompositeDataSupport(_contentBodyType,
+                                                                             _contentNames,
+                                                                             contentValues);
+
+                        _contentList.put(contentData);
+                    }
+                }
+
+                Object[] itemValues = {msgId, true, size, _contentList};
+                CompositeData messageData = new CompositeDataSupport(_messageDataType,
+                                                                     _msgAttributeNames,
+                                                                     itemValues);
+                _messageList.put(messageData);
+            }
+
+            return _messageList;
+        }
+
+        /**
+         * A utility to convert byte[] to Byte[]. Required to create composite
+         * type for message contents.
+         *
+         * @param sequence message content as byte[]
+         * @return Byte[]
+         */
+        private Byte[] getByteArray(ByteSequence sequence)
+        {
+            List<Byte> list = new ArrayList<Byte>();
+
+            for (int i = 0; i < sequence.length; i++)
+            {
+                list.add(sequence.data[i+sequence.offset]);
+            }
+
+            return list.toArray(new Byte[0]);
+        }
+
+        /**
+         * Creates all the notifications this MBean can send.
+         *
+         * @return Notifications broadcasted by this MBean.
+         */
+        @Override
+        public MBeanNotificationInfo[] getNotificationInfo()
+        {
+            String[] notificationTypes = new String[]
+                    {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+            String name = MonitorNotification.class.getName();
+            String description = "An attribute of this MBean has reached threshold value";
+            MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes,
+                                                                    name,
+                                                                    description);
+
+            return new MBeanNotificationInfo[]{info1};
+        }
+
+    } // End of AMQMBean class
+
+    public AMQQueue(String name, boolean durable, String owner,
+                    boolean autoDelete, QueueRegistry queueRegistry)
+            throws AMQException
+    {
+        this(name, durable, owner, autoDelete, queueRegistry,
+        		Executors.newCachedThreadPool(), new SubscriptionImpl.Factory());
+    }
+
+    public AMQQueue(String name, boolean durable, String owner,
+                    boolean autoDelete, QueueRegistry queueRegistry, SubscriptionFactory subscriptionFactory)
+            throws AMQException
+    {
+        this(name, durable, owner, autoDelete, queueRegistry,
+        		Executors.newCachedThreadPool(), subscriptionFactory);
+    }
+
+    public AMQQueue(String name, boolean durable, String owner,
+                    boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery,
+                    SubscriptionFactory subscriptionFactory)
+            throws AMQException
+    {
+
+        this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), subscriptionFactory);
+    }
+
+    public AMQQueue(String name, boolean durable, String owner,
+                    boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
+            throws AMQException
+    {
+
+        this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(),
+             new SubscriptionImpl.Factory());
+    }
+
+    protected AMQQueue(String name, boolean durable, String owner,
+                       boolean autoDelete, QueueRegistry queueRegistry,
+                       SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
+            throws AMQException
+    {
+        this(name, durable, owner, autoDelete, queueRegistry,
+        		Executors.newCachedThreadPool(), subscribers, subscriptionFactory);
+    }
+
+    protected AMQQueue(String name, boolean durable, String owner,
+                       boolean autoDelete, QueueRegistry queueRegistry,
+                       SubscriptionSet subscribers)
+            throws AMQException
+    {
+        this(name, durable, owner, autoDelete, queueRegistry,
+        		Executors.newCachedThreadPool(), subscribers, new SubscriptionImpl.Factory());
+    }
+
+    protected AMQQueue(String name, boolean durable, String owner,
+                       boolean autoDelete, QueueRegistry queueRegistry,
+                       Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
+            throws AMQException
+    {
+        if (name == null)
+        {
+            throw new IllegalArgumentException("Queue name must not be null");
+        }
+        if (queueRegistry == null)
+        {
+            throw new IllegalArgumentException("Queue registry must not be null");
+        }
+        _name = name;
+        _durable = durable;
+        _owner = owner;
+        _autoDelete = autoDelete;
+        _queueRegistry = queueRegistry;
+        _asyncDelivery = asyncDelivery;
+        _managedObject = createMBean();
+        _managedObject.register();
+        _subscribers = subscribers;
+        _subscriptionFactory = subscriptionFactory;
+        _deliveryMgr = new DeliveryManager(_subscribers, this);
+    }
+
+    private AMQQueueMBean createMBean() throws AMQException
+    {
+        try
+        {
+            return new AMQQueueMBean();
+        }
+        catch (NotCompliantMBeanException ex)
+        {
+            throw new AMQException("AMQQueue MBean creation has failed.", ex);
+        }
+    }
+
+    public String getName()
+    {
+        return _name;
+    }
+
+    public boolean isShared()
+    {
+        return _owner == null;
+    }
+
+    public boolean isDurable()
+    {
+        return _durable;
+    }
+
+    public String getOwner()
+    {
+        return _owner;
+    }
+
+    public boolean isAutoDelete()
+    {
+        return _autoDelete;
+    }
+
+    public int getMessageCount()
+    {
+        return _deliveryMgr.getQueueMessageCount();
+    }
+
+    public ManagedObject getManagedObject()
+    {
+        return _managedObject;
+    }
+
+    public void bind(String routingKey, Exchange exchange)
+    {
+        _bindings.addBinding(routingKey, exchange);
+    }
+
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+            throws AMQException
+    {
+        debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
+
+        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+        _subscribers.addSubscriber(subscription);
+    }
+
+    public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException
+    {
+        debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
+              this);
+
+        Subscription removedSubscription;
+        if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
+                                                                                                         ps,
+                                                                                                         consumerTag)))
+            == null)
+        {
+            throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag +
+                                   " and protocol session key " + ps.getKey() + " not registered with queue " + this);
+        }
+
+        // if we are eligible for auto deletion, unregister from the queue registry
+        if (_autoDelete && _subscribers.isEmpty())
+        {
+            autodelete();
+            // we need to manually fire the event to the removed subscription (which was the last one left for this
+            // queue. This is because the delete method uses the subscription set which has just been cleared
+            removedSubscription.queueDeleted(this);
+        }
+    }
+
+    public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
+    {
+        if (checkUnused && !_subscribers.isEmpty())
+        {
+            _logger.info("Will not delete " + this + " as it is in use.");
+            return 0;
+        }
+        else if (checkEmpty && _deliveryMgr.getQueueMessageCount() > 0)
+        {
+            _logger.info("Will not delete " + this + " as it is not empty.");
+            return 0;
+        }
+        else
+        {
+            delete();
+            return _deliveryMgr.getQueueMessageCount();
+        }
+    }
+
+    public void delete() throws AMQException
+    {
+        _subscribers.queueDeleted(this);
+        _bindings.deregister();
+        _queueRegistry.unregisterQueue(_name);
+        _managedObject.unregister();
+    }
+
+    protected void autodelete() throws AMQException
+    {
+        debug("autodeleting {0}", this);
+        delete();
+    }
+
+    public void deliver(AMQMessage msg) throws AMQException
+    {
+        TxnBuffer buffer = msg.getTxnBuffer();
+        if (buffer == null)
+        {
+            //non-transactional
+            record(msg);
+            process(msg);
+        }
+        else
+        {
+            buffer.enlist(new Deliver(msg));
+        }
+    }
+
+    private void record(AMQMessage msg) throws AMQException
+    {
+        msg.enqueue(this);
+        msg.incrementReference();
+    }
+
+    private void process(AMQMessage msg) throws FailedDequeueException
+    {
+        _deliveryMgr.deliver(getName(), msg);
+        try
+        {
+            msg.checkDeliveredToConsumer();
+            updateReceivedMessageCount(msg);
+        }
+        catch (NoConsumersException e)
+        {
+            // as this message will be returned, it should be removed
+            // from the queue:
+            dequeue(msg);
+        }
+    }
+
+    void dequeue(AMQMessage msg) throws FailedDequeueException
+    {
+        try
+        {
+            msg.dequeue(this);
+            msg.decrementReference();
+        }
+        catch (MessageCleanupException e)
+        {
+            //Message was dequeued, but could notthen be deleted
+            //though it is no longer referenced. This should be very
+            //rare and can be detected and cleaned up on recovery or
+            //done through some form of manual intervention.
+            _logger.error(e, e);
+        }
+        catch (AMQException e)
+        {
+            throw new FailedDequeueException(_name, e);
+        }
+    }
+
+    public void deliverAsync()
+    {
+        _deliveryMgr.processAsync(_asyncDelivery);
+    }
+
+    protected SubscriptionManager getSubscribers()
+    {
+        return _subscribers;
+    }
+
+    protected void updateReceivedMessageCount(AMQMessage msg)
+    {
+        _totalMessagesReceived++;
+        _managedObject.checkForNotification(msg);
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        final AMQQueue amqQueue = (AMQQueue) o;
+
+        return (_name.equals(amqQueue._name));
+    }
+
+    public int hashCode()
+    {
+        return _name.hashCode();
+    }
+
+    public String toString()
+    {
+        return "Queue(" + _name + ")@" + System.identityHashCode(this);
+    }
+
+    private void debug(String msg, Object... args)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(MessageFormat.format(msg, args));
+        }
+    }
+
+    private class Deliver implements TxnOp
+    {
+        private final AMQMessage _msg;
+
+        Deliver(AMQMessage msg)
+        {
+            _msg = msg;
+        }
+
+        public void prepare() throws AMQException
+        {
+            //do the persistent part of the record()
+            _msg.enqueue(AMQQueue.this);
+        }
+
+        public void undoPrepare()
+        {
+        }
+
+        public void commit()
+        {
+            //do the memeory part of the record()
+            _msg.incrementReference();
+            //then process the message
+            try
+            {
+                process(_msg);
+            }
+            catch (FailedDequeueException e)
+            {
+                //TODO: is there anything else we can do here? I think not...
+                _logger.error("Error during commit of a queue delivery: " + e, e);
+            }
+        }
+
+        public void rollback()
+        {
+        }
+    }
+
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DefaultQueueRegistry implements QueueRegistry
+{
+    private ConcurrentMap<String, AMQQueue> _queueMap = new ConcurrentHashMap<String, AMQQueue>();
+
+    public DefaultQueueRegistry()
+    {
+    }
+
+    public void registerQueue(AMQQueue queue) throws AMQException
+    {
+        _queueMap.put(queue.getName(), queue);
+    }
+
+    public void unregisterQueue(String name) throws AMQException
+    {
+        _queueMap.remove(name);
+    }
+
+    public AMQQueue getQueue(String name)
+    {
+        return _queueMap.get(name);
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,328 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.activemq.amqp.command.ContentBody;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.util.ConcurrentLinkedQueueNoSize;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Manages delivery of messages on behalf of a queue
+ */
+public class DeliveryManager
+{
+    private static final Logger _log = Logger.getLogger(DeliveryManager.class);
+
+    public boolean compressBufferOnQueue;
+    /**
+     * Holds any queued messages
+     */
+    private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueNoSize<AMQMessage>();
+    //private int _messageCount;
+    /**
+     * Ensures that only one asynchronous task is running for this manager at
+     * any time.
+     */
+    private final AtomicBoolean _processing = new AtomicBoolean();
+    /**
+     * The subscriptions on the queue to whom messages are delivered
+     */
+    private final SubscriptionManager _subscriptions;
+
+    /**
+     * A reference to the queue we are delivering messages for. We need this to be able
+     * to pass the code that handles acknowledgements a handle on the queue.
+     */
+    private final AMQQueue _queue;
+
+
+    private volatile int _queueSize = 0;
+
+    DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
+    {
+
+        if (compressBufferOnQueue)
+        {
+            _log.info("Compressing Buffers on queue.");
+        }
+
+        _subscriptions = subscriptions;
+        _queue = queue;
+    }
+
+    /**
+     * @return boolean if we are queueing
+     */
+    private boolean queueing()
+    {
+        return getMessageCount() != 0;
+    }
+
+
+    /**
+     * @param msg to enqueue
+     * @return true if we are queue this message
+     */
+    private boolean enqueue(AMQMessage msg)
+    {
+        if (msg.isImmediate())
+        {
+            return false;
+        }
+        else
+        {
+            if (queueing())
+            {
+                return addMessageToQueue(msg);
+            }
+            else
+            {
+                return false;
+            }
+        }
+    }
+
+    private void startQueueing(AMQMessage msg)
+    {
+        if (!msg.isImmediate())
+        {
+            addMessageToQueue(msg);
+        }
+    }
+
+    private boolean addMessageToQueue(AMQMessage msg)
+    {
+        // Shrink the ContentBodies to their actual size to save memory.
+        // synchronize to ensure this msg is the next one to get added.
+        if (compressBufferOnQueue)
+        {
+            synchronized(_messages)
+            {
+                Iterator it = msg.getContentBodies().iterator();
+                while (it.hasNext())
+                {
+                    ContentBody cb = (ContentBody) it.next();
+                    cb.reduceBufferToFit();
+                }
+
+                _messages.offer(msg);
+                _queueSize++;
+            }
+        }
+        else
+        {
+            _messages.offer(msg);
+            _queueSize++;
+        }
+        return true;
+    }
+
+
+    /**
+     * Determines whether there are queued messages. Sets _queueing to false if
+     * there are no queued messages. This needs to be atomic.
+     *
+     * @return true if there are queued messages
+     */
+    private boolean hasQueuedMessages()
+    {
+        return getMessageCount() != 0;
+    }
+
+    public int getQueueMessageCount()
+    {
+        return getMessageCount();
+    }
+
+    /**
+     * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
+     *
+     * @return int the number of messages in the delivery queue.
+     */
+
+    private int getMessageCount()
+    {
+        return _messages.size();
+    }
+
+
+    protected synchronized List<AMQMessage> getMessages()
+    {
+        return new ArrayList<AMQMessage>(_messages);
+    }
+
+    protected synchronized void removeAMessageFromTop() throws AMQException
+    {
+        AMQMessage msg = poll();
+        if (msg != null)
+        {
+            msg.dequeue(_queue);
+        }
+    }
+
+    protected synchronized void clearAllMessages() throws AMQException
+    {
+        AMQMessage msg = poll();
+        while (msg != null)
+        {
+            msg.dequeue(_queue);
+            msg = poll();
+        }
+    }
+
+    /**
+     * Only one thread should ever execute this method concurrently, but
+     * it can do so while other threads invoke deliver().
+     */
+    private void processQueue()
+    {
+        try
+        {
+            boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+            while (hasQueuedMessages() && hasSubscribers)
+            {
+                // _log.debug("Have messages(" + _messages.size() + ") and subscribers");
+                Subscription next = _subscriptions.nextSubscriber(peek());
+
+                //We don't synchronize access to subscribers so need to re-check
+                if (next != null)
+                {
+                    next.send(poll(), _queue);
+                }
+                else
+                {
+                    hasSubscribers = false;
+                }
+            }
+        }
+        catch (FailedDequeueException e)
+        {
+            _log.error("Unable to deliver message as dequeue failed: " + e, e);
+        }
+        finally
+        {
+            _log.debug("End of processQueue: (" + _queueSize + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers());
+            _processing.set(false);
+        }
+    }
+
+    private AMQMessage peek()
+    {
+        return _messages.peek();
+    }
+
+    private AMQMessage poll()
+    {
+        _queueSize--;
+        return _messages.poll();
+    }
+
+    Runner asyncDelivery = new Runner();
+
+    /**
+     * Requests that the delivery manager start processing the queue asynchronously
+     * if there is work that can be done (i.e. there are messages queued up and
+     * subscribers that can receive them.
+     * <p/>
+     * This should be called when subscribers are added, but only after the consume-ok
+     * message has been returned as message delivery may start immediately. It should also
+     * be called after unsuspending a client.
+     * <p/>
+     *
+     * @param executor the executor on which the delivery should take place
+     */
+    void processAsync(Executor executor)
+    {
+        _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + _queueSize + ")" +
+                   " Active:" + _subscriptions.hasActiveSubscribers() +
+                   " Processing:" + _processing.get());
+
+        if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+        {
+            //are we already running? if so, don't re-run
+            if (_processing.compareAndSet(false, true))
+            {
+                executor.execute(asyncDelivery);
+            }
+        }
+    }
+
+    /**
+     * Handles message delivery. The delivery manager is always in one of two modes;
+     * it is either queueing messages for asynchronous delivery or delivering
+     * directly.
+     *
+     * @param name the name of the entity on whose behalf we are delivering the message
+     * @param msg  the message to deliver
+     * @throws FailedDequeueException if the message could not be dequeued
+     */
+    void deliver(String name, AMQMessage msg) throws FailedDequeueException
+    {
+        // first check whether we are queueing, and enqueue if we are
+        if (!enqueue(msg))
+        {
+            // not queueing so deliver message to 'next' subscriber
+            Subscription s = _subscriptions.nextSubscriber(msg);
+            if (s == null)
+            {
+                if (!msg.isImmediate())
+                {
+                    if (_subscriptions instanceof SubscriptionSet)
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug("Start Queueing messages Active Subs:" + _subscriptions.hasActiveSubscribers()
+                                       + " Size :" + ((SubscriptionSet) _subscriptions).size()
+                                       + " Empty :" + ((SubscriptionSet) _subscriptions).isEmpty());
+                        }
+                    }
+                    else
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug("Start Queueing messages Active Subs:" + _subscriptions.hasActiveSubscribers());
+                        }
+                    }
+                    // no subscribers yet so enter 'queueing' mode and queue this message
+                    startQueueing(msg);
+                }
+            }
+            else
+            {
+                s.send(msg, _queue);
+                msg.setDeliveredToConsumer();
+            }
+        }
+    }
+
+    private class Runner implements Runnable
+    {
+        public void run()
+        {
+            processQueue();
+        }
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.HashSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * When a queue is deleted, it should be deregistered from any
+ * exchange it has been bound to. This class assists in this task,
+ * by keeping track of all bindings for a given queue.
+ */
+class ExchangeBindings
+{
+    static class ExchangeBinding
+    {
+        private final Exchange exchange;
+        private final String routingKey;
+
+        ExchangeBinding(String routingKey, Exchange exchange)
+        {
+            this.routingKey = routingKey;
+            this.exchange = exchange;
+        }
+
+        void unbind(AMQQueue queue) throws AMQException
+        {
+            exchange.deregisterQueue(routingKey, queue);
+        }
+
+        public Exchange getExchange()
+        {
+            return exchange;
+        }
+
+        public String getRoutingKey()
+        {
+            return routingKey;
+        }
+
+        public int hashCode()
+        {
+            return exchange.hashCode() + routingKey.hashCode();
+        }
+
+        public boolean equals(Object o)
+        {
+            if (!(o instanceof ExchangeBinding)) return false;
+            ExchangeBinding eb = (ExchangeBinding) o;
+            return exchange.equals(eb.exchange) && routingKey.equals(eb.routingKey);
+        }
+    }
+
+    private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>();
+    private final AMQQueue _queue;
+
+    ExchangeBindings(AMQQueue queue)
+    {
+        _queue = queue;
+    }
+
+    /**
+     * Adds the specified binding to those being tracked.
+     * @param routingKey the routing key with which the queue whose bindings
+     * are being tracked by the instance has been bound to the exchange
+     * @param exchange the exchange bound to
+     */
+    void addBinding(String routingKey, Exchange exchange)
+    {
+        _bindings.add(new ExchangeBinding(routingKey, exchange));
+    }
+
+    /**
+     * Deregisters this queue from any exchange it has been bound to
+     */
+    void deregister() throws AMQException
+    {
+        //remove duplicates at this point
+        HashSet<ExchangeBinding> copy = new HashSet<ExchangeBinding>(_bindings);
+        for (ExchangeBinding b : copy)
+        {
+            b.unbind(_queue);
+        }
+    }
+
+    List<ExchangeBinding> getExchangeBindings()
+    {
+        return _bindings;
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Signals that the dequeue of a message from a queue failed
+ */
+public class FailedDequeueException extends AMQException
+{
+    public FailedDequeueException(String queue)
+    {
+        super("Failed to dequeue message from " + queue);
+    }
+
+    public FailedDequeueException(String queue, AMQException e)
+    {
+        super("Failed to dequeue message from " + queue, e);
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,212 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.management.MBeanAttribute;
+import org.apache.qpid.server.management.MBeanOperation;
+import org.apache.qpid.server.management.MBeanOperationParameter;
+
+import javax.management.JMException;
+import javax.management.MBeanOperationInfo;
+import javax.management.openmbean.TabularData;
+import java.io.IOException;
+
+/**
+ * The management interface exposed to allow management of a queue.
+ * @author  Robert J. Greig
+ * @author  Bhupendra Bhardwaj
+ * @version 0.1
+ */
+public interface ManagedQueue
+{
+    static final String TYPE = "Queue";
+
+    /**
+     * Returns the Name of the ManagedQueue.
+     * @return the name of the managedQueue.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="Name", description = "Name of the " + TYPE)
+    String getName() throws IOException;
+
+    /**
+     * Tells whether this ManagedQueue is durable or not.
+     * @return true if this ManagedQueue is a durable queue.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable")
+    boolean isDurable() throws IOException;
+
+    /**
+     * Tells the Owner of the ManagedQueue.
+     * @return the owner's name.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="Owner", description = "Owner")
+    String getOwner() throws IOException;
+
+    /**
+     * Tells if the ManagedQueue is set to AutoDelete.
+     * @return  true if the ManagedQueue is set to AutoDelete.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete")
+    boolean isAutoDelete() throws IOException;
+
+    /**
+     * Total number of messages on the queue, which are yet to be delivered to the consumer(s).
+     * @return number of undelivered message in the Queue.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="MessageCount",
+                         description = "Total number of undelivered messages on the queue")
+    Integer getMessageCount() throws IOException;
+
+    /**
+     * Returns the maximum size of a message (in kbytes) allowed to be accepted by the
+     * ManagedQueue. This is useful in setting notifications or taking
+     * appropriate action, if the size of the message received is more than
+     * the allowed size.
+     * @return the maximum size of a message allowed to be aceepted by the
+     *         ManagedQueue.
+     * @throws IOException
+     */
+    Long getMaximumMessageSize() throws IOException;
+
+    /**
+     * Sets the maximum size of the message (in kbytes) that is allowed to be
+     * accepted by the Queue.
+     * @param size  maximum size of message.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="MaximumMessageSize",
+                         description="Maximum size(KB) of a message allowed for this Queue")
+    void setMaximumMessageSize(Long size) throws IOException;
+
+    /**
+     * Returns the total number of subscribers to the queue.
+     * @return the number of subscribers.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue")
+    Integer getConsumerCount() throws IOException;
+
+    /**
+     *  Returns the total number of active subscribers to the queue.
+     * @return the number of active subscribers
+     * @throws IOException
+     */
+    @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue")
+    Integer getActiveConsumerCount() throws IOException;
+
+    /**
+     * Tells the total number of messages receieved by the queue since startup.
+     * @return total number of messages received.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="ReceivedMessageCount",
+                         description="The total number of messages receieved by the queue since startup")
+    Long getReceivedMessageCount() throws IOException;
+
+    /**
+     * Tells the maximum number of messages that can be stored in the queue.
+     * This is useful in setting the notifications or taking required
+     * action is the number of message increase this limit.
+     * @return maximum muber of message allowed to be stored in the queue.
+     * @throws IOException
+     */
+    Integer getMaximumMessageCount() throws IOException;
+
+    /**
+     * Sets the maximum number of messages allowed to be stored in the queue.
+     * @param value  the maximum number of messages allowed to be stored in the queue.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="MaximumMessageCount",
+                         description="The maximum number of messages allowed to be stored in the queue")
+    void setMaximumMessageCount(Integer value) throws IOException;
+
+    /**
+     * Size of messages in the queue
+     * @return
+     * @throws IOException
+     */
+    @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue")
+    Long getQueueSize() throws IOException;
+
+    /**
+     * Tells the maximum size of all the messages combined together,
+     * that can be stored in the queue. This is useful for setting notifications
+     * or taking required action if the size of messages stored in the queue
+     * increases over this limit.
+     * @return maximum size of the all the messages allowed for the queue.
+     * @throws IOException
+     */
+    Long getQueueDepth() throws IOException;
+
+    /**
+     * Sets the maximum size of all the messages together, that can be stored
+     * in the queue.
+     * @param value
+     * @throws IOException
+     */
+    @MBeanAttribute(name="QueueDepth",
+                         description="The size(KB) of all the messages together, that can be stored in the queue")
+    void setQueueDepth(Long value) throws IOException;
+
+
+
+    //********** Operations *****************//
+
+
+    /**
+     * Returns a subset of all the messages stored in the queue. The messages
+     * are returned based on the given index numbers.
+     * @param fromIndex
+     * @param toIndex
+     * @return
+     * @throws IOException
+     * @throws JMException
+     */
+    @MBeanOperation(name="viewMessages",
+                         description="shows messages in this queue with given indexes. eg. from index 1 - 100")
+    TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex,
+                             @MBeanOperationParameter(name="to index", description="to index")int toIndex)
+        throws IOException, JMException;
+
+    /**
+     * Deletes the first message from top.
+     * @throws IOException
+     * @throws JMException
+     */
+    @MBeanOperation(name="deleteMessageFromTop",
+                         description="Deletes the first message from top",
+                         impact= MBeanOperationInfo.ACTION)
+    void deleteMessageFromTop() throws IOException, JMException;
+
+    /**
+     * Clears the queue by deleting all the undelivered messages from the queue.
+     * @throws IOException
+     * @throws JMException
+     */
+    @MBeanOperation(name="clearQueue",
+                         description="Clears the queue by deleting all the undelivered messages from the queue",
+                         impact= MBeanOperationInfo.ACTION)
+    void clearQueue() throws IOException, JMException;
+
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Signals that the removal of a message once its refcount reached
+ * zero failed.
+ */
+public class MessageCleanupException extends AMQException
+{
+    public MessageCleanupException(long messageId, AMQException e)
+    {
+        super("Failed to cleanup message with id " + messageId, e);
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.protocol.AMQConstant;
+
+import java.util.List;
+
+/**
+ * Signals that no consumers exist for a message at a given point in time.
+ * Used if a message has immediate=true and there are no consumers registered
+ * with the queue.
+ */
+public class NoConsumersException extends RequiredDeliveryException
+{
+    public NoConsumersException(String queue,
+                                BasicPublishBody publishBody,
+                                ContentHeaderBody contentHeaderBody,
+                                List<ContentBody> contentBodies)
+    {
+        super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
+    }
+
+    public NoConsumersException(BasicPublishBody publishBody,
+                                ContentHeaderBody contentHeaderBody,
+                                List<ContentBody> contentBodies)
+    {
+        super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+    }
+
+    public int getReplyCode()
+    {
+        return AMQConstant.NO_CONSUMERS.getCode();
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+
+public interface QueueRegistry
+{
+    void registerQueue(AMQQueue queue) throws AMQException;
+
+    void unregisterQueue(String name) throws AMQException;
+
+    AMQQueue getQueue(String name);
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/Subscription.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/Subscription.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+public interface Subscription
+{
+    void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
+
+    boolean isSuspended();
+
+    void queueDeleted(AMQQueue queue);
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+
+/**
+ * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
+ * factory primarily assists testing although in future more sophisticated subscribers may need a different
+ * subscription implementation.
+ *
+ * @see org.apache.qpid.server.queue.AMQQueue
+ */
+public interface SubscriptionFactory
+{
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
+        throws AMQException;
+
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag)
+        throws AMQException;
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,190 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+/**
+ * Encapsulation of a supscription to a queue.
+ * <p/>
+ * Ties together the protocol session of a subscriber, the consumer tag that
+ * was given out by the broker and the channel id.
+ * <p/>
+ */
+public class SubscriptionImpl implements Subscription
+{
+    private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
+
+    public final AMQChannel channel;
+
+    public final AMQProtocolSession protocolSession;
+
+    public final String consumerTag;
+
+    private final Object sessionKey;
+
+    /**
+     * True if messages need to be acknowledged
+     */
+    private final boolean _acks;
+
+    public static class Factory implements SubscriptionFactory
+    {
+        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
+                throws AMQException
+        {
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+        }
+
+        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+                throws AMQException
+        {
+            return new SubscriptionImpl(channel, protocolSession, consumerTag);
+        }
+    }
+
+    public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+                            String consumerTag, boolean acks)
+            throws AMQException
+    {
+        AMQChannel channel = protocolSession.getChannel(channelId);
+        if (channel == null)
+        {
+            throw new NullPointerException("channel not found in protocol session");
+        }
+
+        this.channel = channel;
+        this.protocolSession = protocolSession;
+        this.consumerTag = consumerTag;
+        sessionKey = protocolSession.getKey();
+        _acks = acks;
+    }
+
+    public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
+                            String consumerTag)
+            throws AMQException
+    {
+        this(channel, protocolSession, consumerTag, false);
+    }
+
+    public boolean equals(Object o)
+    {
+        return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
+    }
+
+    /**
+     * Equality holds if the session matches and the channel and consumer tag are the same.
+     */
+    private boolean equals(SubscriptionImpl psc)
+    {
+        return sessionKey.equals(psc.sessionKey)
+               && psc.channel == channel
+               && psc.consumerTag.equals(consumerTag);
+    }
+
+    public int hashCode()
+    {
+        return sessionKey.hashCode();
+    }
+
+    public String toString()
+    {
+        return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
+    }
+
+    /**
+     * This method can be called by each of the publisher threads.
+     * As a result all changes to the channel object must be thread safe.
+     *
+     * @param msg
+     * @param queue
+     * @throws AMQException
+     */
+    public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+    {
+        if (msg != null)
+        {
+            // if we do not need to wait for client acknowledgements
+            // we can decrement the reference count immediately. 
+            
+            // By doing this _before_ the send we ensure that it
+            // doesn't get sent if it can't be dequeued, preventing
+            // duplicate delivery on recovery.
+
+            // The send may of course still fail, in which case, as
+            // the message is unacked, it will be lost.
+            if (!_acks)
+            {
+                queue.dequeue(msg);
+            }
+            synchronized(channel)
+            {
+                long deliveryTag = channel.getNextDeliveryTag();
+
+                if (_acks)
+                {
+                    channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+                }
+
+//                ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+//                AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+                BasicDeliverBody deliver = createDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+                CompositeAMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+                
+                protocolSession.writeFrame(frame);
+            }
+        }
+        else
+        {
+            _logger.error("Attempt to send Null message", new NullPointerException());
+        }
+    }
+
+    public boolean isSuspended()
+    {
+        return channel.isSuspended();
+    }
+
+    /**
+     * Callback indicating that a queue has been deleted.
+     *
+     * @param queue
+     */
+    public void queueDeleted(AMQQueue queue)
+    {
+        channel.queueDeleted(queue);
+    }
+    
+    private BasicDeliverBody createDeliverFrame(long deliveryTag, String routingKey, String exchange)
+    {
+    	BasicDeliverBody basicDeliverBody = new BasicDeliverBody();
+    	basicDeliverBody.setChannel(channel.getChannelId());
+    	basicDeliverBody.setConsumerTag(consumerTag);
+    	basicDeliverBody.setDeliveryTag(deliveryTag);
+    	basicDeliverBody.setRedelivered(false);
+    	basicDeliverBody.setExchange(exchange);
+    	basicDeliverBody.setRoutingKey(routingKey);
+    	return basicDeliverBody;
+    }
+
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+/**
+ * Abstraction of actor that will determine the subscriber to whom
+ * a message will be sent.
+ */
+public interface SubscriptionManager
+{
+    public boolean hasActiveSubscribers();
+    public Subscription nextSubscriber(AMQMessage msg);
+}