You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [7/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker/...

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Aug 14 20:40:49 2008
@@ -25,131 +25,55 @@
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.exchange.Exchange;
 
-import java.util.HashMap;
-import java.util.HashSet;
+
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A deliverable message.
  */
-public class AMQMessage
+public class AMQMessage implements Filterable<AMQException>
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
-    /** Used in clustering. @todo What for? */
-    private Set<Object> _tokens;
-
-    /** Only use in clustering. @todo What for? */
-    private AMQProtocolSession _publisher;
-
-    private final Long _messageId;
-
     private final AtomicInteger _referenceCount = new AtomicInteger(1);
 
-    private AMQMessageHandle _messageHandle;
+    private final AMQMessageHandle _messageHandle;
 
     /** Holds the transactional context in which this message is being processed. */
-    private TransactionalContext _txnContext;
+    private StoreContext _storeContext;
+
+    /** Flag to indicate that this message requires 'immediate' delivery. */
+
+    private static final byte IMMEDIATE = 0x01;
 
     /**
      * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
      * for messages published with the 'immediate' flag.
      */
-    private boolean _deliveredToConsumer;
 
-    /** Flag to indicate that this message requires 'immediate' delivery. */
-    private boolean _immediate;
+    private static final byte DELIVERED_TO_CONSUMER = 0x02;
 
-    private TransientMessageData _transientMessageData = new TransientMessageData();
+    private byte _flags = 0;
 
     private long _expiration;
 
+    private final long _size;
 
+    private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
+    private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
 
 
-    private Exchange _exchange;
-    private static final boolean SYNCED_CLOCKS =
-            ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false);
-
-    private static final long UNKNOWN_SIZE = Long.MIN_VALUE;
-
-    private long _size = UNKNOWN_SIZE;
-
-
-
-    public String debugIdentity()
-    {
-        return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
-    }
-
-    public void setExpiration()
-    {
-        long expiration =
-                ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
-        long timestamp =
-                ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
-
-        if (SYNCED_CLOCKS)
-        {
-            _expiration = expiration;
-        }
-        else
-        {
-            // Update TTL to be in broker time.
-            if (expiration != 0L)
-            {
-                if (timestamp != 0L)
-                {
-                    // todo perhaps use arrival time
-                    long diff = (System.currentTimeMillis() - timestamp);
-
-                    if ((diff > 1000L) || (diff < 1000L))
-                    {
-                        _expiration = expiration + diff;
-                    }
-                }
-            }
-        }
-
-    }
-
-    public boolean isReferenced()
-    {
-        return _referenceCount.get() > 0;
-    }
-
-    public void setExchange(final Exchange exchange)
-    {
-        _exchange = exchange;
-    }
-
-    public void route() throws AMQException
-    {
-        _exchange.route(this);
-    }
-
-    public void enqueue(final List<AMQQueue> queues)
-    {
-        _transientMessageData.setDestinationQueues(queues);
-    }
 
     /**
      * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
@@ -172,7 +96,7 @@
         {
             try
             {
-                return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
+                return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
             }
             catch (AMQException e)
             {
@@ -189,7 +113,7 @@
 
                 AMQBody cb =
                         getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
-                                                                                                         _messageId, ++_index));
+                                                                                                         ++_index));
 
                 return new AMQFrame(_channel, cb);
             }
@@ -212,9 +136,14 @@
         }
     }
 
+    public void clearStoreContext()
+    {
+        _storeContext = new StoreContext();
+    }
+
     public StoreContext getStoreContext()
     {
-        return _txnContext.getStoreContext();
+        return _storeContext;
     }
 
     private class BodyContentIterator implements Iterator<ContentChunk>
@@ -226,7 +155,7 @@
         {
             try
             {
-                return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
+                return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
             }
             catch (AMQException e)
             {
@@ -240,7 +169,7 @@
         {
             try
             {
-                return _messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index);
+                return _messageHandle.getContentChunk(getStoreContext(), ++_index);
             }
             catch (AMQException e)
             {
@@ -254,13 +183,7 @@
         }
     }
 
-    public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext)
-    {
-        _messageId = messageId;
-        _txnContext = txnContext;
-        _immediate = info.isImmediate();
-        _transientMessageData.setMessagePublishInfo(info);
-    }
+
 
     /**
      * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
@@ -276,141 +199,85 @@
     public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
             throws AMQException
     {
-        _messageId = messageId;
         _messageHandle = factory.createMessageHandle(messageId, store, true);
-        _txnContext = txnConext;
-        _transientMessageData = null;
-
+        _storeContext = txnConext.getStoreContext();
+        _size = _messageHandle.getBodySize(txnConext.getStoreContext());
     }
 
-    /**
-     * Used in testing only. This allows the passing of the content header immediately on construction.
-     *
-     * @param messageId
-     * @param info
-     * @param txnContext
-     * @param contentHeader
-     */
-    public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
-                      ContentHeaderBody contentHeader) throws AMQException
-    {
-        this(messageId, info, txnContext);
-        setContentHeaderBody(contentHeader);
-    }
-
-    /* *
-     * Used in testing only. This allows the passing of the content header and some body fragments on construction.
+        /**
+     * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
+     * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
+     * queues.
      *
-     * @param messageId
-     * @param info
-     * @param txnContext
-     * @param contentHeader
-     * @param destinationQueues
-     * @param contentBodies
+     * @param messageHandle
      *
      * @throws AMQException
-     */        /*
-    public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
-                      ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
-                      MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
+     */
+    public AMQMessage(
+                AMQMessageHandle messageHandle,
+                StoreContext storeConext,
+                MessagePublishInfo info)
+            throws AMQException
     {
-        this(messageId, info, txnContext, contentHeader);
-        _transientMessageData.setDestinationQueues(destinationQueues);
-        routingComplete(messageStore, storeContext, messageHandleFactory);
-        for (ContentChunk cb : contentBodies)
+        _messageHandle = messageHandle;
+        _storeContext = storeConext;
+
+        if(info.isImmediate())
         {
-            addContentBodyFrame(storeContext, cb);
+            _flags |= IMMEDIATE;
         }
+        _size = messageHandle.getBodySize(storeConext);
+
     }
-                 */
+
+
     protected AMQMessage(AMQMessage msg) throws AMQException
     {
-        _messageId = msg._messageId;
         _messageHandle = msg._messageHandle;
-        _txnContext = msg._txnContext;
-        _deliveredToConsumer = msg._deliveredToConsumer;
-        _transientMessageData = msg._transientMessageData;
-    }
+        _storeContext = msg._storeContext;
+        _flags = msg._flags;
+        _size = msg._size;
 
-    public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
-    {
-        return new BodyFrameIterator(protocolSession, channel);
     }
 
-    public Iterator<ContentChunk> getContentBodyIterator()
+
+    public String debugIdentity()
     {
-        return new BodyContentIterator();
+        return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
     }
 
-    public ContentHeaderBody getContentHeaderBody() throws AMQException
+    public void setExpiration(final long expiration)
     {
-        if (_transientMessageData != null)
-        {
-            return _transientMessageData.getContentHeaderBody();
-        }
-        else
-        {
-            return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
-        }
+
+        _expiration = expiration;
+
     }
 
-    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
+    public boolean isReferenced()
     {
-        _transientMessageData.setContentHeaderBody(contentHeaderBody);
-        _size = _transientMessageData.getContentHeaderBody().bodySize;
+        return _referenceCount.get() > 0;
     }
 
-    public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
-            throws AMQException
+    public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
     {
-        final boolean persistent = isPersistent();
-        _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
-        if (persistent)
-        {
-            _txnContext.beginTranIfNecessary();
-        }
-
-        // enqueuing the messages ensure that if required the destinations are recorded to a
-        // persistent store
-
-        for (AMQQueue q : _transientMessageData.getDestinationQueues())
-        {
-            _messageHandle.enqueue(storeContext, _messageId, q);
-        }
-
-        if (_transientMessageData.getContentHeaderBody().bodySize == 0)
-        {
-            deliver(storeContext);
-        }
-
-
+        return new BodyFrameIterator(protocolSession, channel);
     }
 
-    public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException
+    public Iterator<ContentChunk> getContentBodyIterator()
     {
-        _transientMessageData.addBodyLength(contentChunk.getSize());
-        final boolean allContentReceived = isAllContentReceived();
-        _messageHandle.addContentBodyFrame(storeContext, _messageId, contentChunk, allContentReceived);
-        if (allContentReceived)
-        {
-            deliver(storeContext);
-
-            return true;
-        }
-        else
-        {
-            return false;
-        }
+        return new BodyContentIterator();
     }
 
-    public boolean isAllContentReceived() throws AMQException
+    public ContentHeaderBody getContentHeaderBody() throws AMQException
     {
-        return _transientMessageData.isAllContentReceived();
+        return _messageHandle.getContentHeaderBody(getStoreContext());
     }
 
+
+
     public Long getMessageId()
     {
-        return _messageId;
+        return _messageHandle.getMessageId();
     }
 
     /**
@@ -424,14 +291,24 @@
         return this;
     }
 
-    /** Threadsafe. Increment the reference count on the message. */
-    public void incrementReference()
+    public boolean incrementReference()
+    {
+        return incrementReference(1);
+    }
+
+    /* Threadsafe. Increment the reference count on the message. */
+    public boolean incrementReference(int count)
     {
-        _referenceCount.incrementAndGet();
-        // if (_log.isDebugEnabled())
-        // {
-        // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-        // }
+        if(_referenceCount.addAndGet(count) <= 1)
+        {
+            _referenceCount.addAndGet(-count);
+            return false;
+        }
+        else
+        {
+            return true;
+        }
+
     }
 
     /**
@@ -445,6 +322,7 @@
      */
     public void decrementReference(StoreContext storeContext) throws MessageCleanupException
     {
+
         int count = _referenceCount.decrementAndGet();
 
         // note that the operation of decrementing the reference count and then removing the message does not
@@ -453,25 +331,25 @@
         // not relying on the all the increments having taken place before the delivery manager decrements.
         if (count == 0)
         {
+            // set the reference count way below 0 so that we can detect that the message has been deleted
+            // this is to guard against the message being spontaneously recreated (from the mgmt console)
+            // by copying from other queues at the same time as it is being removed.
+            _referenceCount.set(Integer.MIN_VALUE/2);
+
             try
             {
-                // if (_log.isDebugEnabled())
-                // {
-                // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-                // }
-
                 // must check if the handle is null since there may be cases where we decide to throw away a message
                 // and the handle has not yet been constructed
                 if (_messageHandle != null)
                 {
-                    _messageHandle.removeMessage(storeContext, _messageId);
+                    _messageHandle.removeMessage(storeContext);
                 }
             }
             catch (AMQException e)
             {
                 // to maintain consistency, we revert the count
                 incrementReference();
-                throw new MessageCleanupException(_messageId, e);
+                throw new MessageCleanupException(getMessageId(), e);
             }
         }
         else
@@ -484,15 +362,6 @@
         }
     }
 
-    public void setPublisher(AMQProtocolSession publisher)
-    {
-        _publisher = publisher;
-    }
-
-    public AMQProtocolSession getPublisher()
-    {
-        return _publisher;
-    }
 
     /**
      * Called selectors to determin if the message has already been sent
@@ -501,101 +370,30 @@
      */
     public boolean getDeliveredToConsumer()
     {
-        return _deliveredToConsumer;
-    }
-
-
-    public boolean checkToken(Object token)
-    {
-
-        if (_tokens == null)
-        {
-            _tokens = new HashSet<Object>();
-        }
-
-        if (_tokens.contains(token))
-        {
-            return true;
-        }
-        else
-        {
-            _tokens.add(token);
-
-            return false;
-        }
-    }
-
-    /**
-     * Registers a queue to which this message is to be delivered. This is called from the exchange when it is routing
-     * the message. This will be called before any content bodies have been received so that the choice of
-     * AMQMessageHandle implementation can be picked based on various criteria.
-     *
-     * @param queue the queue
-     *
-     * @throws org.apache.qpid.AMQException if there is an error enqueuing the message
-     */
-    public void enqueue(AMQQueue queue) throws AMQException
-    {
-        _transientMessageData.addDestinationQueue(queue);
-    }
-
-    /**
-     * NOTE: Think about why you are using this method. Normal usages would want to do
-     * AMQQueue.dequeue(StoreContext, AMQMessage)
-     * This will keep the queue statistics up-to-date.
-     * Currently this method is only called _correctly_ from AMQQueue dequeue.
-     * Ideally we would have a better way for the queue to dequeue the message.
-     * Especially since enqueue isn't the recipriocal of this method.
-     * @deprecated
-     * @param storeContext
-     * @param queue
-     * @throws AMQException
-     */
-    void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
-    {
-        _messageHandle.dequeue(storeContext, _messageId, queue);
+        return (_flags & DELIVERED_TO_CONSUMER) != 0;
     }
 
     public boolean isPersistent() throws AMQException
     {
-        if (_transientMessageData != null)
-        {
-            return _transientMessageData.isPersistent();
-        }
-        else
-        {
-            return _messageHandle.isPersistent(getStoreContext(), _messageId);
-        }
+        return _messageHandle.isPersistent();
     }
 
     /**
      * Called to enforce the 'immediate' flag.
      *
-     * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
+     * @returns  true if the message is marked for immediate delivery but has not been marked as delivered
      *                              to a consumer
      */
-    public void checkDeliveredToConsumer() throws NoConsumersException
+    public boolean immediateAndNotDelivered() 
     {
 
-        if (_immediate && !_deliveredToConsumer)
-        {
-            throw new NoConsumersException(this);
-        }
+        return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
+
     }
 
     public MessagePublishInfo getMessagePublishInfo() throws AMQException
     {
-        MessagePublishInfo pb;
-        if (_transientMessageData != null)
-        {
-            pb = _transientMessageData.getMessagePublishInfo();
-        }
-        else
-        {
-            pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
-        }
-
-        return pb;
+        return _messageHandle.getMessagePublishInfo(getStoreContext());
     }
 
     public boolean isRedelivered()
@@ -641,46 +439,9 @@
      */
     public void setDeliveredToConsumer()
     {
-        _deliveredToConsumer = true;
+        _flags |= DELIVERED_TO_CONSUMER;
     }
 
-    private void deliver(StoreContext storeContext) throws AMQException
-    {
-        // we get a reference to the destination queues now so that we can clear the
-        // transient message data as quickly as possible
-        List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
-        if (_log.isDebugEnabled())
-        {
-            _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
-        }
-
-        try
-        {
-            // first we allow the handle to know that the message has been fully received. This is useful if it is
-            // maintaining any calculated values based on content chunks
-            _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
-                                                          _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
-
-            // we then allow the transactional context to do something with the message content
-            // now that it has all been received, before we attempt delivery
-            _txnContext.messageFullyReceived(isPersistent());
-
-            for (AMQQueue q : destinationQueues)
-            {
-                // Increment the references to this message for each queue delivery.
-                incrementReference();
-                // normal deliver so add this message at the end.
-                _txnContext.deliver(q.createEntry(this), false);
-            }
-        }
-        finally
-        {
-
-            // Remove refence for routing process . Reference count should now == delivered queue count
-            decrementReference(storeContext);
-            _transientMessageData = null;
-        }
-    }
 
 
     public AMQMessageHandle getMessageHandle()
@@ -690,28 +451,23 @@
 
     public long getSize()
     {
-        if(_size == UNKNOWN_SIZE)
-        {
-            try
-            {
-                _size = getContentHeaderBody().bodySize;
-            }
-            catch (AMQException e)
-            {
-                _log.warn("Unable to retrieve message meta data for message:" + this, e);
-                return 0;
-            }
-        }
         return _size;
+
+    }
+
+    public Object getPublisherClientInstance()
+    {
+        return _sessionIdentifier.getSessionInstance();
+    }
+                                                                                          
+    public Object getPublisherIdentifier()
+    {
+        return _sessionIdentifier.getSessionIdentifier();
     }
 
-    public void restoreTransientMessageData() throws AMQException
+    public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
     {
-        TransientMessageData transientMessageData = new TransientMessageData();
-        transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
-        transientMessageData.setContentHeaderBody(getContentHeaderBody());
-        transientMessageData.addBodyLength(getContentHeaderBody().getSize());
-        _transientMessageData = transientMessageData;
+        _sessionIdentifier = sessionIdentifier;
     }
 
 
@@ -720,7 +476,7 @@
         // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
         // _taken + " by :" + _takenBySubcription;
 
-        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount;
+        return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
     }
 
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Thu Aug 14 20:40:49 2008
@@ -29,23 +29,27 @@
 /**
  * A pluggable way of getting message data. Implementations can provide intelligent caching for example or
  * even no caching at all to minimise the broker memory footprint.
- *
- * The method all take a messageId to avoid having to store it in the instance - the AMQMessage container
- * must already keen the messageId so it is pointless storing it twice.
  */
 public interface AMQMessageHandle
 {
-    ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException;
+    ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException;
+
+    /**
+     *
+     * @return the messageId for the message associated with this handle
+     */
+    Long getMessageId();
+
 
     /**
      * @return the number of body frames associated with this message
      */
-    int getBodyCount(StoreContext context, Long messageId) throws AMQException;
+    int getBodyCount(StoreContext context) throws AMQException;
 
     /**
      * @return the size of the body
      */
-    long getBodySize(StoreContext context, Long messageId) throws AMQException;
+    long getBodySize(StoreContext context) throws AMQException;
 
     /**
      * Get a particular content body
@@ -53,27 +57,23 @@
      * @return a content body
      * @throws IllegalArgumentException if the index is invalid
      */
-    ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
+    ContentChunk getContentChunk(StoreContext context, int index) throws IllegalArgumentException, AMQException;
 
-    void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody) throws AMQException;
+    void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody) throws AMQException;
 
-    MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException;
+    MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException;
 
     boolean isRedelivered();
 
     void setRedelivered(boolean redelivered);
 
-    boolean isPersistent(StoreContext context, Long messageId) throws AMQException;
+    boolean isPersistent();
 
-    void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
+    void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
                                         ContentHeaderBody contentHeaderBody)
             throws AMQException;
 
-    void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
-
-    void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
-
-    void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
+    void removeMessage(StoreContext storeContext) throws AMQException;    
 
     long getArrivalTime();
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Aug 14 20:40:49 2008
@@ -20,1005 +20,191 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
 
-import javax.management.JMException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
- * fully in RFC 006.
- */
-public class AMQQueue implements Managable, Comparable
-{
-
-    /**
-     * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
-     * already exists.
-     *
-     * <p/><table id="crc"><caption>CRC Card</caption>
-     * <tr><th> Responsibilities <th> Collaborations
-     * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
-     * </table>
-     *
-     * @todo Not an AMQP exception as no status code.
-     *
-     * @todo Move to top level, used outside this class.
-     */
-    public static final class ExistingExclusiveSubscription extends AMQException
-    {
-
-        public ExistingExclusiveSubscription()
-        {
-            super("");
-        }
-    }
-
-    /**
-     * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription
-     * already exists.
-     *
-     * <p/><table id="crc"><caption>CRC Card</caption>
-     * <tr><th> Responsibilities <th> Collaborations
-     * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
-     * </table>
-     *
-     * @todo Not an AMQP exception as no status code.
-     *
-     * @todo Move to top level, used outside this class.
-     */
-    public static final class ExistingSubscriptionPreventsExclusive extends AMQException
-    {
-        public ExistingSubscriptionPreventsExclusive()
-        {
-            super("");
-        }
-    }
-
-    private static final Logger _logger = Logger.getLogger(AMQQueue.class);
-
-    private final AMQShortString _name;
-
-    /** null means shared */
-    private final AMQShortString _owner;
-
-    private final boolean _durable;
-
-    /** If true, this queue is deleted when the last subscriber is removed */
-    private final boolean _autoDelete;
-
-    /** Holds subscribers to the queue. */
-    private final SubscriptionSet _subscribers;
-
-    private final SubscriptionFactory _subscriptionFactory;
-
-    private final AtomicInteger _subscriberCount = new AtomicInteger();
-
-    private final AtomicBoolean _isExclusive = new AtomicBoolean();
-
-    private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
-    private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
-
-    /** Manages message delivery. */
-    private final DeliveryManager _deliveryMgr;
-
-    /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
-    private final ExchangeBindings _bindings = new ExchangeBindings(this);
-
-    /** Executor on which asynchronous delivery will be carriedout where required */
-    private final Executor _asyncDelivery;
-
-    private final AMQQueueMBean _managedObject;
-
-    private final VirtualHost _virtualHost;
-
-    /** max allowed size(KB) of a single message */
-    @Configured(path = "maximumMessageSize", defaultValue = "0")
-    public long _maximumMessageSize;
-
-    /** max allowed number of messages on a queue. */
-    @Configured(path = "maximumMessageCount", defaultValue = "0")
-    public long _maximumMessageCount;
-
-    /** max queue depth for the queue */
-    @Configured(path = "maximumQueueDepth", defaultValue = "0")
-    public long _maximumQueueDepth;
-
-    /** maximum message age before alerts occur */
-    @Configured(path = "maximumMessageAge", defaultValue = "0")
-    public long _maximumMessageAge;
-
-    /** the minimum interval between sending out consequetive alerts of the same type */
-    @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
-    public long _minimumAlertRepeatGap;
-
-    /** total messages received by the queue since startup. */
-    public AtomicLong _totalMessagesReceived = new AtomicLong();
-
-
-    private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
-
-
-    public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
-            throws AMQException
-    {
-        this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
-             new SubscriptionSet(), new SubscriptionImpl.Factory());
-    }
-
-    protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
-                       VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
-    {
-        this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
-             new SubscriptionImpl.Factory());
-    }
-
-    protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
-                       VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
-                       SubscriptionFactory subscriptionFactory) throws AMQException
-    {
-        if (name == null)
-        {
-            throw new IllegalArgumentException("Queue name must not be null");
-        }
-
-        if (virtualHost == null)
-        {
-            throw new IllegalArgumentException("Virtual Host must not be null");
-        }
-
-        _name = name;
-        _durable = durable;
-        _owner = owner;
-        _autoDelete = autoDelete;
-        _virtualHost = virtualHost;
-        _asyncDelivery = asyncDelivery;
-
-        _managedObject = createMBean();
-        _managedObject.register();
-
-        _subscribers = subscribers;
-        _subscriptionFactory = subscriptionFactory;
-        _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
-
-        // This ensure that the notification checks for the configured alerts are created.
-        setMaximumMessageAge(_maximumMessageAge);
-        setMaximumMessageCount(_maximumMessageCount);
-        setMaximumMessageSize(_maximumMessageSize);
-        setMaximumQueueDepth(_maximumQueueDepth);
-
-    }
-
-    private AMQQueueMBean createMBean() throws AMQException
-    {
-        try
-        {
-            return new AMQQueueMBean(this);
-        }
-        catch (JMException ex)
-        {
-            throw new AMQException("AMQQueue MBean creation has failed ", ex);
-        }
-    }
-
-    public final AMQShortString getName()
-    {
-        return _name;
-    }
+import java.util.List;
+import java.util.Set;
 
-    public boolean isShared()
-    {
-        return _owner == null;
-    }
+public interface AMQQueue extends Managable, Comparable<AMQQueue>
+{
 
-    public boolean isDurable()
-    {
-        return _durable;
-    }
+    AMQShortString getName();
 
-    public AMQShortString getOwner()
-    {
-        return _owner;
-    }
+    boolean isDurable();
 
-    public boolean isAutoDelete()
-    {
-        return _autoDelete;
-    }
+    boolean isAutoDelete();
 
-    public boolean isDeleted()
-    {
-        return _deleted.get();
-    }    
+    AMQShortString getOwner();
 
-    /** @return no of messages(undelivered) on the queue. */
-    public int getMessageCount()
-    {
-        return _deliveryMgr.getQueueMessageCount();
-    }
+    VirtualHost getVirtualHost();
 
-    /** @return List of messages(undelivered) on the queue. */
-    public List<QueueEntry> getMessagesOnTheQueue()
-    {
-        return _deliveryMgr.getMessages();
-    }
 
-    /**
-     * Returns messages within the given range of message Ids.
-     *
-     * @param fromMessageId
-     * @param toMessageId
-     *
-     * @return List of messages
-     */
-    public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
-    {
-        return _deliveryMgr.getMessages(fromMessageId, toMessageId);
-    }
+    void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
 
-    public long getQueueDepth()
-    {
-        return _deliveryMgr.getTotalMessageSize();
-    }
+    void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
 
-    /**
-     * @param messageId
-     *
-     * @return QueueEntry with give id if exists. null if QueueEntry with given id doesn't exist.
-     */
-    public QueueEntry getMessageOnTheQueue(long messageId)
-    {
-        List<QueueEntry> list = getMessagesOnTheQueue(messageId, messageId);
-        if ((list == null) || (list.size() == 0))
-        {
-            return null;
-        }
+    List<ExchangeBinding> getExchangeBindings();
 
-        return list.get(0);
-    }
 
-    /**
-     * Moves messages from this queue to another queue, and also commits the move on the message store. Delivery activity
-     * on the queues being moved between is suspended during the move.
-     *
-     * @param fromMessageId The first message id to move.
-     * @param toMessageId   The last message id to move.
-     * @param queueName     The queue to move the messages to.
-     * @param storeContext  The context of the message store under which to perform the move. This is associated with
-     *                      the stores transactional context.
-     */
-    public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
-                                                        StoreContext storeContext)
-    {
-        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+    void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
 
-        MessageStore fromStore = getVirtualHost().getMessageStore();
-        MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+    void unregisterSubscription(final Subscription subscription) throws AMQException;
 
-        if (toStore != fromStore)
-        {
-            throw new RuntimeException("Can only move messages between queues on the same message store.");
-        }
 
-        try
-        {
-            // Obtain locks to prevent activity on the queues being moved between.
-            startMovingMessages();
-            toQueue.startMovingMessages();
-
-            // Get the list of messages to move.
-            List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
-
-            try
-            {
-                fromStore.beginTran(storeContext);
-
-                // Move the messages in on the message store.
-                for (QueueEntry entry : foundMessagesList)
-                {
-                    AMQMessage message = entry.getMessage();
-                    fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
-                    toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
-                }
-
-                // Commit and flush the move transcations.
-                try
-                {
-                    fromStore.commitTran(storeContext);
-                }
-                catch (AMQException e)
-                {
-                    throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
-                }
-
-                // Move the messages on the in-memory queues.
-                toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
-                _deliveryMgr.removeMovedMessages(foundMessagesList);
-            }
-            // Abort the move transactions on move failures.
-            catch (AMQException e)
-            {
-                try
-                {
-                    fromStore.abortTran(storeContext);
-                }
-                catch (AMQException ae)
-                {
-                    throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
-                }
-            }
-        }
-        // Release locks to allow activity on the queues being moved between to continue.
-        finally
-        {
-            toQueue.stopMovingMessages();
-            stopMovingMessages();
-        }
-    }
+    int getConsumerCount();
 
-    /**
-     * Copies messages on this queue to another queue, and also commits the move on the message store. Delivery activity
-     * on the queues being moved between is suspended during the move.
-     *
-     * @param fromMessageId The first message id to move.
-     * @param toMessageId   The last message id to move.
-     * @param queueName     The queue to move the messages to.
-     * @param storeContext  The context of the message store under which to perform the move. This is associated with
-     *                      the stores transactional context.
-     */
-    public synchronized void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
-                                                        StoreContext storeContext)
-    {
-        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-
-        MessageStore fromStore = getVirtualHost().getMessageStore();
-        MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
-
-        if (toStore != fromStore)
-        {
-            throw new RuntimeException("Can only move messages between queues on the same message store.");
-        }
-
-        try
-        {
-            // Obtain locks to prevent activity on the queues being moved between.
-            startMovingMessages();
-            toQueue.startMovingMessages();
-
-            // Get the list of messages to move.
-            List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
-
-            try
-            {
-                fromStore.beginTran(storeContext);
-
-                // Move the messages in on the message store.
-                for (QueueEntry entry : foundMessagesList)
-                {
-                    AMQMessage message = entry.getMessage();
-                    toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
-                    message.takeReference();
-                }
-
-                // Commit and flush the move transcations.
-                try
-                {
-                    fromStore.commitTran(storeContext);
-                }
-                catch (AMQException e)
-                {
-                    throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
-                }
-
-                // Move the messages on the in-memory queues.
-                toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
-            }
-            // Abort the move transactions on move failures.
-            catch (AMQException e)
-            {
-                try
-                {
-                    fromStore.abortTran(storeContext);
-                }
-                catch (AMQException ae)
-                {
-                    throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
-                }
-            }
-        }
-        // Release locks to allow activity on the queues being moved between to continue.
-        finally
-        {
-            toQueue.stopMovingMessages();
-            stopMovingMessages();
-        }
-    }
-
-    /**
-     * Removes messages from this queue, and also commits the remove on the message store. Delivery activity
-     * on the queues being moved between is suspended during the remove.
-     *
-     * @param fromMessageId The first message id to move.
-     * @param toMessageId   The last message id to move.
-     * @param storeContext  The context of the message store under which to perform the move. This is associated with
-     *                      the stores transactional context.
-     */
-    public synchronized void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
-    {
-        MessageStore fromStore = getVirtualHost().getMessageStore();
-
-        try
-        {
-            // Obtain locks to prevent activity on the queues being moved between.
-            startMovingMessages();
-
-            // Get the list of messages to move.
-            List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
-
-            try
-            {
-                fromStore.beginTran(storeContext);
-
-                // remove the messages in on the message store.
-                for (QueueEntry entry : foundMessagesList)
-                {
-                    AMQMessage message = entry.getMessage();
-                    fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
-                }
-
-                // Commit and flush the move transcations.
-                try
-                {
-                    fromStore.commitTran(storeContext);
-                }
-                catch (AMQException e)
-                {
-                    throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
-                }
-
-                // remove the messages on the in-memory queues.
-                _deliveryMgr.removeMovedMessages(foundMessagesList);
-            }
-            // Abort the move transactions on move failures.
-            catch (AMQException e)
-            {
-                try
-                {
-                    fromStore.abortTran(storeContext);
-                }
-                catch (AMQException ae)
-                {
-                    throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
-                }
-            }
-        }
-        // Release locks to allow activity on the queues being moved between to continue.
-        finally
-        {
-            stopMovingMessages();
-        }
-    }
+    int getActiveConsumerCount();
 
-    public void startMovingMessages()
-    {
-        _deliveryMgr.startMovingMessages();
-    }
+    boolean isUnused();
 
-    private void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> messageList)
-    {
-        _deliveryMgr.enqueueMovedMessages(storeContext, messageList);
-        _totalMessagesReceived.addAndGet(messageList.size());
-    }
+    boolean isEmpty();
 
-    public void stopMovingMessages()
-    {
-        _deliveryMgr.stopMovingMessages();
-        _deliveryMgr.processAsync(_asyncDelivery);
-    }
+    int getMessageCount();
 
-    /** @return MBean object associated with this Queue */
-    public ManagedObject getManagedObject()
-    {
-        return _managedObject;
-    }
+    int getUndeliveredMessageCount();
 
-    public long getMaximumMessageSize()
-    {
-        return _maximumMessageSize;
-    }
 
-    public void setMaximumMessageSize(final long maximumMessageSize)
-    {
-        _maximumMessageSize = maximumMessageSize;
-        if(maximumMessageSize == 0L)
-        {
-            _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
-        }
-        else
-        {
-            _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
-        }
-    }
+    long getQueueDepth();
 
-    public int getConsumerCount()
-    {
-        return _subscribers.size();
-    }
+    long getReceivedMessageCount();
 
-    public int getActiveConsumerCount()
-    {
-        return _subscribers.getWeight();
-    }
+    long getOldestMessageArrivalTime();
 
-    public long getReceivedMessageCount()
-    {
-        return _totalMessagesReceived.get();
-    }
+    boolean isDeleted();
 
-    public long getMaximumMessageCount()
-    {
-        return _maximumMessageCount;
-    }
 
-    public void setMaximumMessageCount(final long maximumMessageCount)
-    {
-        _maximumMessageCount = maximumMessageCount;
-        if(maximumMessageCount == 0L)
-        {
-            _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
-        }
-        else
-        {
-            _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
-        }
+    int delete() throws AMQException;
 
 
+    QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
 
-    }
+    void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
 
-    public long getMaximumQueueDepth()
-    {
-        return _maximumQueueDepth;
-    }
+    void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
 
-    // Sets the queue depth, the max queue size
-    public void setMaximumQueueDepth(final long maximumQueueDepth)
-    {
-        _maximumQueueDepth = maximumQueueDepth;
-        if(maximumQueueDepth == 0L)
-        {
-            _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
-        }
-        else
-        {
-            _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
-        }
 
-    }
 
-    public long getOldestMessageArrivalTime()
-    {
-        return _deliveryMgr.getOldestMessageArrival();
+    boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
 
-    }
+    
 
-    /** Removes the QueueEntry from the top of the queue. */
-    public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
-    {
-        _deliveryMgr.removeAMessageFromTop(storeContext, this);
-    }
+    void addQueueDeleteTask(final Task task);
 
-    /** removes all the messages from the queue. */
-    public synchronized long clearQueue(StoreContext storeContext) throws AMQException
-    {
-        return _deliveryMgr.clearAllMessages(storeContext);
-    }
 
-    public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
-    {
-        exchange.registerQueue(routingKey, this, arguments);
-        if (isDurable() && exchange.isDurable())
-        {
-            _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
-        }
+    List<QueueEntry> getMessagesOnTheQueue();
 
-        _bindings.addBinding(routingKey, arguments, exchange);
-    }
+    List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
 
-    public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
-    {
-        exchange.deregisterQueue(routingKey, this, arguments);
-        if (isDurable() && exchange.isDurable())
-        {
-            _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
-        }
+    List<Long> getMessagesOnTheQueue(int num);
 
-        _bindings.remove(routingKey, arguments, exchange);
-    }
+    List<Long> getMessagesOnTheQueue(int num, int offest);
 
-    public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
-                                        FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
-    {
-        if (incrementSubscriberCount() > 1)
-        {
-            if (isExclusive())
-            {
-                decrementSubscriberCount();
-                throw new ExistingExclusiveSubscription();
-            }
-            else if (exclusive)
-            {
-                decrementSubscriberCount();
-                throw new ExistingSubscriptionPreventsExclusive();
-            }
+    QueueEntry getMessageOnTheQueue(long messageId);
 
-        }
-        else if (exclusive)
-        {
-            setExclusive(true);
-        }
 
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and "
-                                               + "consumer tag {2} with {3}", ps, channel, consumerTag, this));
-        }
+    void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
+                                                        StoreContext storeContext);
 
-        Subscription subscription =
-                _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
+    void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext);
 
-        if (subscription.filtersMessages())
-        {
-            if (_deliveryMgr.hasQueuedMessages())
-            {
-                _deliveryMgr.populatePreDeliveryQueue(subscription);
-            }
-        }
+    void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext);
 
-        _subscribers.addSubscriber(subscription);
-        if(exclusive)
-        {
-            _subscribers.setExclusive(true);
-        }
 
-        subscription.start();
-    }
 
-    private boolean isExclusive()
-    {
-        return _isExclusive.get();
-    }
+    long getMaximumMessageSize();
 
-    private void setExclusive(boolean exclusive)
-    {
-        _isExclusive.set(exclusive);
-    }
+    void setMaximumMessageSize(long value);
 
-    private int incrementSubscriberCount()
-    {
-        return _subscriberCount.incrementAndGet();
-    }
 
-    private int decrementSubscriberCount()
-    {
-        return _subscriberCount.decrementAndGet();
-    }
+    long getMaximumMessageCount();
 
-    public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
-    {
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug(MessageFormat.format(
-                    "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}",
-                    ps, channel, consumerTag, this));
-        }
+    void setMaximumMessageCount(long value);
 
-        _subscribers.setExclusive(false);
-        Subscription removedSubscription;
-        if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps,
-                                                                                                         consumerTag)))
-            == null)
-        {
-            throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag
-                                   + " and protocol session key " + ps.getKey() + " not registered with queue " + this);
-        }
 
-        removedSubscription.close();
-        setExclusive(false);
-        decrementSubscriberCount();
+    long getMaximumQueueDepth();
 
-        // if we are eligible for auto deletion, unregister from the queue registry
-        if (_autoDelete && _subscribers.isEmpty())
-        {
-            if (_logger.isInfoEnabled())
-            {
-                _logger.info("Auto-deleteing queue:" + this);
-            }
-
-            autodelete();
-            // we need to manually fire the event to the removed subscription (which was the last one left for this
-            // queue. This is because the delete method uses the subscription set which has just been cleared
-            removedSubscription.queueDeleted(this);
-        }
-    }
+    void setMaximumQueueDepth(long value);
 
-    public boolean isUnused()
-    {
-        return _subscribers.isEmpty();
-    }
 
-    public boolean isEmpty()
-    {
-        return !_deliveryMgr.hasQueuedMessages();
-    }
+    long getMaximumMessageAge();
 
-    public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
-    {
-        if (checkUnused && !_subscribers.isEmpty())
-        {
-            _logger.info("Will not delete " + this + " as it is in use.");
+    void setMaximumMessageAge(final long maximumMessageAge);
 
-            return 0;
-        }
-        else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
-        {
-            _logger.info("Will not delete " + this + " as it is not empty.");
 
-            return 0;
-        }
-        else
-        {
-            delete();
+    long getMinimumAlertRepeatGap();
 
-            return _deliveryMgr.getQueueMessageCount();
-        }
-    }
 
-    public void delete() throws AMQException
-    {
-        if (!_deleted.getAndSet(true))
-        {
-            _subscribers.queueDeleted(this);
-            _bindings.deregister();
-            _virtualHost.getQueueRegistry().unregisterQueue(_name);
-            _managedObject.unregister();
-            for (Task task : _deleteTaskList)
-            {
-                task.doTask(this);
-            }
+    void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
 
-            _deleteTaskList.clear();
-        }
-    }
+    long clearQueue(StoreContext storeContext) throws AMQException;
 
-    protected void autodelete() throws AMQException
-    {
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug(MessageFormat.format("autodeleting {0}", this));
-        }
 
-        delete();
-    }
 
-    /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
-    {
-        // fixme not sure what this is doing. should we be passing deliverFirst through here?
-        // This code is not used so when it is perhaps it should
-        _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
-        try
-        {
-            msg.checkDeliveredToConsumer();
-            updateReceivedMessageCount(msg);
-        }
-        catch (NoConsumersException e)
-        {
-            // as this message will be returned, it should be removed
-            // from the queue:
-            dequeue(storeContext, msg);
-        }
-    }*/
+    void removeExpiredIfNoSubscribers() throws AMQException;
 
-    // public DeliveryManager getDeliveryManager()
-    // {
-    // return _deliveryMgr;
-    // }
+    Set<NotificationCheck> getNotificationChecks();
 
-    public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
-    {
-        AMQMessage msg = entry.getMessage();
-        _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst);
-        try
-        {
-            msg.checkDeliveredToConsumer();
-            updateReceivedMessageCount(entry);
-        }
-        catch (NoConsumersException e)
-        {
-            // as this message will be returned, it should be removed
-            // from the queue:
-            dequeue(storeContext, entry);
-        }
-    }
+    void flushSubscription(final Subscription sub) throws AMQException;
 
-    public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
-    {
-        try
-        {
-            entry.getMessage().dequeue(storeContext, this);
-        }
-        catch (MessageCleanupException e)
-        {
-            // Message was dequeued, but could not then be deleted
-            // though it is no longer referenced. This should be very
-            // rare and can be detected and cleaned up on recovery or
-            // done through some form of manual intervention.
-            _logger.error(e, e);
-        }
-        catch (AMQException e)
-        {
-            throw new FailedDequeueException(_name.toString(), e);
-        }
-    }
+    void deliverAsync(final Subscription sub);
 
-    public void deliverAsync()
-    {
-        _deliveryMgr.processAsync(_asyncDelivery);
-    }
+    void deliverAsync();
 
-    protected SubscriptionManager getSubscribers()
-    {
-        return _subscribers;
-    }
 
-    protected void updateReceivedMessageCount(QueueEntry entry) throws AMQException
+    /**
+     * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
+     * already exists.
+     *
+     * <p/><table id="crc"><caption>CRC Card</caption>
+     * <tr><th> Responsibilities <th> Collaborations
+     * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
+     * </table>
+     *
+     * @todo Not an AMQP exception as no status code.
+     *
+     * @todo Move to top level, used outside this class.
+     */
+    static final class ExistingExclusiveSubscription extends AMQException
     {
-        AMQMessage msg = entry.getMessage();
-
-        if (!msg.isRedelivered())
-        {
-            _totalMessagesReceived.incrementAndGet();
-        }
 
-        try
-        {
-            _managedObject.checkForNotification(msg);
-        }
-        catch (JMException e)
+        public ExistingExclusiveSubscription()
         {
-            throw new AMQException("Unable to get notification from manage queue: " + e, e);
+            super("");
         }
     }
 
-    public boolean equals(Object o)
+    /**
+     * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription
+     * already exists.
+     *
+     * <p/><table id="crc"><caption>CRC Card</caption>
+     * <tr><th> Responsibilities <th> Collaborations
+     * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
+     * </table>
+     *
+     * @todo Not an AMQP exception as no status code.
+     *
+     * @todo Move to top level, used outside this class.
+     */
+    static final class ExistingSubscriptionPreventsExclusive extends AMQException
     {
-        if (this == o)
-        {
-            return true;
-        }
-
-        if ((o == null) || (getClass() != o.getClass()))
+        public ExistingSubscriptionPreventsExclusive()
         {
-            return false;
+            super("");
         }
-
-        final AMQQueue amqQueue = (AMQQueue) o;
-
-        return (_name.equals(amqQueue._name));
-    }
-
-    public int hashCode()
-    {
-        return _name.hashCode();
     }
 
-    public String toString()
-    {
-        return "Queue(" + _name + ")@" + System.identityHashCode(this);
-    }
-
-    public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
-    {
-        return _deliveryMgr.performGet(session, channel, acks);
-    }
-
-    public QueueRegistry getQueueRegistry()
-    {
-        return _virtualHost.getQueueRegistry();
-    }
-
-    public VirtualHost getVirtualHost()
-    {
-        return _virtualHost;
-    }
-
-    public static interface Task
+    static interface Task
     {
         public void doTask(AMQQueue queue) throws AMQException;
     }
-
-    public void addQueueDeleteTask(Task task)
-    {
-        _deleteTaskList.add(task);
-    }
-
-    public long getMinimumAlertRepeatGap()
-    {
-        return _minimumAlertRepeatGap;
-    }
-
-    public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
-    {
-        _minimumAlertRepeatGap = minimumAlertRepeatGap;
-    }
-
-    public long getMaximumMessageAge()
-    {
-        return _maximumMessageAge;
-    }
-
-    public void setMaximumMessageAge(long maximumMessageAge)
-    {
-        _maximumMessageAge = maximumMessageAge;
-        if(maximumMessageAge == 0L)
-        {
-            _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
-        }
-        else
-        {
-            _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
-        }
-    }
-
-    public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
-    {
-        _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, entry);
-    }
-
-    public QueueEntry createEntry(AMQMessage amqMessage)
-    {
-        return new QueueEntry(this, amqMessage);
-    }
-
-    public int compareTo(Object o)
-    {
-        return _name.compareTo(((AMQQueue) o).getName());
-    }
-
-
-    public void removeExpiredIfNoSubscribers() throws AMQException
-    {
-        synchronized(_subscribers.getChangeLock())
-        {
-            if(_subscribers.isEmpty())
-            {
-                _deliveryMgr.removeExpired();
-            }
-        }
-    }
-
-    public final Set<NotificationCheck> getNotificationChecks()
-    {
-        return _notificationChecks;
-    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Aug 14 20:40:49 2008
@@ -292,7 +292,7 @@
     }
 
     /**
-     * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop
+     * @see AMQQueue#deleteMessageFromTop
      */
     public void deleteMessageFromTop() throws JMException
     {
@@ -307,7 +307,7 @@
     }
 
     /**
-     * @see org.apache.qpid.server.queue.AMQQueue#clearQueue
+     * @see AMQQueue#clearQueue
      */
     public void clearQueue() throws JMException
     {

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Thu Aug 14 20:40:49 2008
@@ -36,59 +36,6 @@
  */
 class ExchangeBindings
 {
-    private static final FieldTable EMPTY_ARGUMENTS = new FieldTable();
-
-    static class ExchangeBinding
-    {
-        private final Exchange _exchange;
-        private final AMQShortString _routingKey;
-        private final FieldTable _arguments;
-
-        ExchangeBinding(AMQShortString routingKey, Exchange exchange)
-        {
-            this(routingKey, exchange, EMPTY_ARGUMENTS);
-        }
-
-        ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
-        {
-            _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
-            _exchange = exchange;
-            _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
-        }
-
-        void unbind(AMQQueue queue) throws AMQException
-        {
-            _exchange.deregisterQueue(_routingKey, queue, _arguments);
-        }
-
-        public Exchange getExchange()
-        {
-            return _exchange;
-        }
-
-        public AMQShortString getRoutingKey()
-        {
-            return _routingKey;
-        }
-
-        public int hashCode()
-        {
-            return (_exchange == null ? 0 : _exchange.hashCode())
-                   + (_routingKey == null ? 0 : _routingKey.hashCode());
-        }
-
-        public boolean equals(Object o)
-        {
-            if (!(o instanceof ExchangeBinding))
-            {
-                return false;
-            }
-            ExchangeBinding eb = (ExchangeBinding) o;
-            return _exchange.equals(eb._exchange)
-                   && _routingKey.equals(eb._routingKey);
-        }
-    }
-
     private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>();
     private final AMQQueue _queue;
 
@@ -109,9 +56,9 @@
     }
 
 
-    public void remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
+    public boolean remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
     {
-        _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments));
+        return _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments));
     }
 
 

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Thu Aug 14 20:40:49 2008
@@ -22,11 +22,10 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Collections;
 import java.util.ArrayList;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -41,32 +40,40 @@
 
     private MessagePublishInfo _messagePublishInfo;
 
-    private List<ContentChunk> _contentBodies = new ArrayList<ContentChunk>();
+    private List<ContentChunk> _contentBodies;
 
     private boolean _redelivered;
 
     private long _arrivalTime;
 
-    public InMemoryMessageHandle()
+    private final Long _messageId;
+
+    public InMemoryMessageHandle(final Long messageId)
     {
+        _messageId = messageId;
     }
 
-    public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
+    public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException
     {
         return _contentHeaderBody;
     }
 
-    public int getBodyCount(StoreContext context, Long messageId)
+    public Long getMessageId()
+    {
+        return _messageId;
+    }
+
+    public int getBodyCount(StoreContext context)
     {
         return _contentBodies.size();
     }
 
-    public long getBodySize(StoreContext context, Long messageId) throws AMQException
+    public long getBodySize(StoreContext context) throws AMQException
     {
-        return getContentHeaderBody(context, messageId).bodySize;
+        return getContentHeaderBody(context).bodySize;
     }
 
-    public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+    public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException
     {
         if (index > _contentBodies.size() - 1)
         {
@@ -76,13 +83,28 @@
         return _contentBodies.get(index);
     }
 
-    public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
+    public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody)
             throws AMQException
     {
-        _contentBodies.add(contentBody);
+        if(_contentBodies == null)
+        {
+            if(isLastContentBody)
+            {
+                _contentBodies = Collections.singletonList(contentBody);
+            }
+            else
+            {
+                _contentBodies = new ArrayList<ContentChunk>();
+                _contentBodies.add(contentBody);
+            }
+        }
+        else
+        {
+            _contentBodies.add(contentBody);
+        }
     }
 
-    public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
+    public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
     {
         return _messagePublishInfo;
     }
@@ -98,12 +120,9 @@
         _redelivered = redelivered;
     }
 
-    public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
+    public boolean isPersistent()
     {
-        //todo remove literal values to a constant file such as AMQConstants in common
-        ContentHeaderBody chb = getContentHeaderBody(context, messageId);
-        return chb.properties instanceof BasicContentHeaderProperties &&
-               ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+        return false;
     }
 
     /**
@@ -112,26 +131,20 @@
      * @param contentHeaderBody
      * @throws AMQException
      */
-    public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
+    public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
                                                ContentHeaderBody contentHeaderBody)
             throws AMQException
     {
         _messagePublishInfo = messagePublishInfo;
         _contentHeaderBody = contentHeaderBody;
+        if(contentHeaderBody.bodySize == 0)
+        {
+            _contentBodies = Collections.EMPTY_LIST;
+        }
         _arrivalTime = System.currentTimeMillis();
     }
 
-    public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
-    {
-        // NO OP
-    }
-
-    public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
-    {
-        // NO OP
-    }
-
-    public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
+    public void removeMessage(StoreContext storeContext) throws AMQException
     {
         // NO OP
     }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java Thu Aug 14 20:40:49 2008
@@ -126,7 +126,7 @@
      * @param age  maximum age of message.
      * @throws IOException
      */
-    @MBeanAttribute(name="MaximumMessageAge", description="Threshold high value for message age on thr broker")
+    @MBeanAttribute(name="MaximumMessageAge", description="Threshold high value for message age on the broker")
     void setMaximumMessageAge(Long age) throws IOException;
 
     /**

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java Thu Aug 14 20:40:49 2008
@@ -36,11 +36,11 @@
         // just hardcoded for now
         if (persistent)
         {
-            return new WeakReferenceMessageHandle(store);
+            return new WeakReferenceMessageHandle(messageId, store);
         }
         else
         {
-            return new InMemoryMessageHandle();
+            return new InMemoryMessageHandle(messageId);
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Thu Aug 14 20:40:49 2008
@@ -1,129 +1,138 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-
-public enum NotificationCheck
-{
-
-    MESSAGE_COUNT_ALERT
-    {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
-        {
-            int msgCount;
-            final long maximumMessageCount = queue.getMaximumMessageCount();
-            if (maximumMessageCount!= 0 && (msgCount =  queue.getMessageCount()) >= maximumMessageCount)
-            {
-                listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
-                return true;
-            }
-            return false;
-        }
-    },
-    MESSAGE_SIZE_ALERT(true)
-    {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
-        {
-            final long maximumMessageSize = queue.getMaximumMessageSize();
-            if(maximumMessageSize != 0)
-            {
-                // Check for threshold message size
-                long messageSize = (msg == null) ? 0 : msg.getSize();
-
-                if (messageSize >= maximumMessageSize)
-                {
-                    listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
-                    return true;
-                }
-            }
-            return false;
-        }
-
-    },
-    QUEUE_DEPTH_ALERT
-    {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
-        {
-            // Check for threshold queue depth in bytes
-            final long maximumQueueDepth = queue.getMaximumQueueDepth();
-
-            if(maximumQueueDepth != 0)
-            {
-                final long queueDepth = queue.getQueueDepth();
-
-                if (queueDepth >= maximumQueueDepth)
-                {
-                    listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
-                    return true;
-                }
-            }
-            return false;
-        }
-
-    },
-    MESSAGE_AGE_ALERT
-    {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
-        {
-
-            final long maxMessageAge = queue.getMaximumMessageAge();
-            if(maxMessageAge != 0)
-            {
-                final long currentTime = System.currentTimeMillis();
-                final long thresholdTime = currentTime - maxMessageAge;
-                final long firstArrivalTime = queue.getOldestMessageArrivalTime();
-
-                if(firstArrivalTime < thresholdTime)
-                {
-                    long oldestAge = currentTime - firstArrivalTime;
-                    listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
-
-                    return true;
-                }
-            }
-            return false;
-                    
-        }
-
-    }
-    ;
-
-    private final boolean _messageSpecific;
-
-    NotificationCheck()
-    {
-        this(false);
-    }
-
-    NotificationCheck(boolean messageSpecific)
-    {
-        _messageSpecific = messageSpecific;
-    }
-
-    public boolean isMessageSpecific()
-    {
-        return _messageSpecific;
-    }
-
-    abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+public enum NotificationCheck
+{
+
+    MESSAGE_COUNT_ALERT
+    {
+        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        {
+            int msgCount;
+            final long maximumMessageCount = queue.getMaximumMessageCount();
+            if (maximumMessageCount!= 0 && (msgCount =  queue.getMessageCount()) >= maximumMessageCount)
+            {
+                listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
+                return true;
+            }
+            return false;
+        }
+    },
+    MESSAGE_SIZE_ALERT(true)
+    {
+        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        {
+            final long maximumMessageSize = queue.getMaximumMessageSize();
+            if(maximumMessageSize != 0)
+            {
+                // Check for threshold message size
+                long messageSize;
+                try
+                {
+                    messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+                }
+                catch (AMQException e)
+                {
+                    messageSize = 0;
+                }
+
+
+                if (messageSize >= maximumMessageSize)
+                {
+                    listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+                    return true;
+                }
+            }
+            return false;
+        }
+
+    },
+    QUEUE_DEPTH_ALERT
+    {
+        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        {
+            // Check for threshold queue depth in bytes
+            final long maximumQueueDepth = queue.getMaximumQueueDepth();
+
+            if(maximumQueueDepth != 0)
+            {
+                final long queueDepth = queue.getQueueDepth();
+
+                if (queueDepth >= maximumQueueDepth)
+                {
+                    listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
+                    return true;
+                }
+            }
+            return false;
+        }
+
+    },
+    MESSAGE_AGE_ALERT
+    {
+        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        {
+
+            final long maxMessageAge = queue.getMaximumMessageAge();
+            if(maxMessageAge != 0)
+            {
+                final long currentTime = System.currentTimeMillis();
+                final long thresholdTime = currentTime - maxMessageAge;
+                final long firstArrivalTime = queue.getOldestMessageArrivalTime();
+
+                if(firstArrivalTime < thresholdTime)
+                {
+                    long oldestAge = currentTime - firstArrivalTime;
+                    listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
+
+                    return true;
+                }
+            }
+            return false;
+                    
+        }
+
+    }
+    ;
+
+    private final boolean _messageSpecific;
+
+    NotificationCheck()
+    {
+        this(false);
+    }
+
+    NotificationCheck(boolean messageSpecific)
+    {
+        _messageSpecific = messageSpecific;
+    }
+
+    public boolean isMessageSpecific()
+    {
+        return _messageSpecific;
+    }
+
+    abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
            ('svn:eol-style' removed)