You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/12/19 00:00:43 UTC

svn commit: r605352 [2/2] - in /incubator/qpid/branches/M2.1/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/...

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Dec 18 15:00:40 2007
@@ -58,9 +58,9 @@
 
     private final Object _sessionKey;
 
-    private MessageQueue<AMQMessage> _messages;
+    private MessageQueue<QueueEntry> _messages;
 
-    private Queue<AMQMessage> _resendQueue;
+    private Queue<QueueEntry> _resendQueue;
 
     private final boolean _noLocal;
 
@@ -160,7 +160,7 @@
 
         if (filtersMessages())
         {
-            _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
+            _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
         }
         else
         {
@@ -226,7 +226,7 @@
      *
      * @throws AMQException
      */
-    public void send(AMQMessage msg, AMQQueue queue) throws AMQException
+    public void send(QueueEntry msg, AMQQueue queue) throws AMQException
     {
         if (msg != null)
         {
@@ -245,7 +245,7 @@
         }
     }
 
-    private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
+    private void sendToBrowser(QueueEntry msg, AMQQueue queue) throws AMQException
     {
         // We don't decrement the reference here as we don't want to consume the message
         // but we do want to send it to the client.
@@ -266,11 +266,11 @@
                 _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
             }
 
-            protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+            protocolSession.getProtocolOutputConverter().writeDeliver(msg.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
         }
     }
 
-    private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
+    private void sendToConsumer(StoreContext storeContext, QueueEntry entry, AMQQueue queue)
             throws AMQException
     {
         try
@@ -287,9 +287,9 @@
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+                    _logger.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
                 }
-                queue.dequeue(storeContext, msg);
+                queue.dequeue(storeContext, entry);
             }
 
             synchronized (channel)
@@ -298,19 +298,19 @@
 
                 if (_sendLock.get())
                 {
-                    _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+                    _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
                 }
 
                 if (_acks)
                 {
-                    channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+                    channel.addUnacknowledgedMessage(entry, deliveryTag, consumerTag);
                 }
 
-                protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+                protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
 
                 if (!_acks)
                 {
-                    msg.decrementReference(storeContext);
+                    entry.getMessage().decrementReference(storeContext);
                 }
             }
         }
@@ -320,7 +320,7 @@
             // using a try->finally would set it even if an error occured.
             // Is this what we want? 
 
-            msg.setDeliveredToConsumer();
+            entry.setDeliveredToConsumer();
         }
     }
 
@@ -355,19 +355,19 @@
         return _filters != null || _noLocal;
     }
 
-    public boolean hasInterest(AMQMessage msg)
+    public boolean hasInterest(QueueEntry entry)
     {
         //check that the message hasn't been rejected
-        if (msg.isRejectedBy(this))
+        if (entry.isRejectedBy(this))
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity());
+                _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
             }
 //            return false;
         }
 
-        final AMQProtocolSession publisher = msg.getPublisher();
+        final AMQProtocolSession publisher = entry.getMessage().getPublisher();
 
         //todo - client id should be recoreded and this test removed but handled below
         if (_noLocal && publisher != null)
@@ -418,9 +418,9 @@
 
         if (_logger.isTraceEnabled())
         {
-            _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity());
+            _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
         }
-        return checkFilters(msg);
+        return checkFilters(entry);
 
     }
 
@@ -431,7 +431,7 @@
         return id;
     }
 
-    private boolean checkFilters(AMQMessage msg)
+    private boolean checkFilters(QueueEntry msg)
     {
         if (_filters != null)
         {
@@ -439,7 +439,7 @@
 //            {
 //                _logger.trace("(" + debugIdentity() + ") has filters.");
 //            }
-            return _filters.allAllow(msg);
+            return _filters.allAllow(msg.getMessage());
         }
         else
         {
@@ -452,12 +452,12 @@
         }
     }
 
-    public Queue<AMQMessage> getPreDeliveryQueue()
+    public Queue<QueueEntry> getPreDeliveryQueue()
     {
         return _messages;
     }
 
-    public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
+    public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst)
     {
         if (_messages != null)
         {
@@ -561,19 +561,19 @@
 
             while (!_resendQueue.isEmpty())
             {
-                AMQMessage resent = _resendQueue.poll();
+                QueueEntry resent = _resendQueue.poll();
 
                 if (_logger.isTraceEnabled())
                 {
                     _logger.trace("Removed for resending:" + resent.debugIdentity());
                 }
 
-                resent.release(_queue);
+                resent.release();
                 _queue.subscriberHasPendingResend(false, this, resent);
 
                 try
                 {
-                    channel.getTransactionalContext().deliver(resent, _queue, true);
+                    channel.getTransactionalContext().deliver(resent, true);
                 }
                 catch (AMQException e)
                 {
@@ -611,22 +611,22 @@
         return _isBrowser;
     }
 
-    public boolean wouldSuspend(AMQMessage msg)
+    public boolean wouldSuspend(QueueEntry msg)
     {
-        return channel.wouldSuspend(msg);
+        return channel.wouldSuspend(msg.getMessage());
     }
 
-    public Queue<AMQMessage> getResendQueue()
+    public Queue<QueueEntry> getResendQueue()
     {
         if (_resendQueue == null)
         {
-            _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+            _resendQueue = new ConcurrentLinkedQueueAtomicSize<QueueEntry>();
         }
         return _resendQueue;
     }
 
 
-    public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+    public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages)
     {
         if (_resendQueue != null && !_resendQueue.isEmpty())
         {
@@ -651,7 +651,7 @@
         }
     }
 
-    public void addToResendQueue(AMQMessage msg)
+    public void addToResendQueue(QueueEntry msg)
     {
         // add to our resend queue
         getResendQueue().add(msg);

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Tue Dec 18 15:00:40 2007
@@ -30,5 +30,5 @@
 {
     public List<Subscription> getSubscriptions();
     public boolean hasActiveSubscribers();
-    public Subscription nextSubscriber(AMQMessage msg);
+    public Subscription nextSubscriber(QueueEntry entry);
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Tue Dec 18 15:00:40 2007
@@ -113,7 +113,7 @@
      * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to
      * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning.
      */
-    public Subscription nextSubscriber(AMQMessage msg)
+    public Subscription nextSubscriber(QueueEntry msg)
     {
         if (_subscriptions.isEmpty())
         {
@@ -140,7 +140,7 @@
         }
     }
 
-    private Subscription nextSubscriberImpl(AMQMessage msg)
+    private Subscription nextSubscriberImpl(QueueEntry msg)
     {
         final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
         while (iterator.hasNext())

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Tue Dec 18 15:00:40 2007
@@ -30,6 +30,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
@@ -64,14 +65,13 @@
 
     private static class DeliveryDetails
     {
-        public AMQMessage message;
-        public AMQQueue queue;
+        public QueueEntry entry;
+
         private boolean deliverFirst;
 
-        public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst)
+        public DeliveryDetails(QueueEntry entry, boolean deliverFirst)
         {
-            this.message = message;
-            this.queue = queue;
+            this.entry = entry;
             this.deliverFirst = deliverFirst;
         }
     }
@@ -103,7 +103,7 @@
         _postCommitDeliveryList.clear();
     }
 
-    public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
+    public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
     {
         // A publication will result in the enlisting of several
         // TxnOps. The first is an op that will store the message.
@@ -112,9 +112,9 @@
         // enqueued. Finally a cleanup op will be added to decrement
         // the reference associated with the routing.
         // message.incrementReference();
-        _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
+        _postCommitDeliveryList.add(new DeliveryDetails(entry, deliverFirst));
         _messageDelivered = true;
-        _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+        _txnBuffer.enlist(new CleanupMessageOperation(entry.getMessage(), _returnMessages));
         /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
         if (_log.isDebugEnabled())
         {
@@ -242,11 +242,11 @@
         {
             for (DeliveryDetails dd : _postCommitDeliveryList)
             {
-                dd.queue.process(_storeContext, dd.message, dd.deliverFirst);
+                dd.entry.process(_storeContext, dd.deliverFirst);
 
                 try
                 {
-                    dd.message.checkDeliveredToConsumer();
+                    dd.entry.checkDeliveredToConsumer();
                 }
                 catch (NoConsumersException nce)
                 {

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue Dec 18 15:00:40 2007
@@ -34,6 +34,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
@@ -92,14 +93,14 @@
         // Does not apply to this context
     }
 
-    public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
+    public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
     {
         try
         {
-            queue.process(_storeContext, message, deliverFirst);
+            entry.process(_storeContext, deliverFirst);
             //following check implements the functionality
             //required by the 'immediate' flag:
-            message.checkDeliveredToConsumer();
+            entry.checkDeliveredToConsumer();
         }
         catch (NoConsumersException e)
         {
@@ -128,7 +129,7 @@
                         {
                             if (_log.isDebugEnabled())
                             {
-                                _log.debug("Discarding message: " + message.message.getMessageId());
+                                _log.debug("Discarding message: " + message.getMessage().getMessageId());
                             }
 
                             //Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -162,7 +163,7 @@
                     {
                         if (_log.isDebugEnabled())
                         {
-                            _log.debug("Discarding message: " + msg.message.getMessageId());
+                            _log.debug("Discarding message: " + msg.getMessage().getMessageId());
                         }
 
                         //Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -192,7 +193,7 @@
             {
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug("Discarding message: " + msg.message.getMessageId());
+                    _log.debug("Discarding message: " + msg.getMessage().getMessageId());
                 }
 
                 //Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -206,7 +207,7 @@
             if (_log.isDebugEnabled())
             {
                 _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
-                           msg.message.getMessageId());
+                           msg.getMessage().getMessageId());
             }
         }
     }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Tue Dec 18 15:00:40 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoreContext;
 
 /**
@@ -111,14 +112,13 @@
      *
      * <p/>This is an 'enqueue' operation.
      *
-     * @param message      The message to deliver.
-     * @param queue        The queue to deliver the message to.
+     * @param entry        The message to deliver, and the queue to deliver to.
      * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
      *                     for normal FIFO message ordering.
      *
      * @throws AMQException If the message cannot be delivered for any reason.
      */
-    void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
+    void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException;
 
     /**
      * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Tue Dec 18 15:00:40 2007
@@ -24,6 +24,7 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 import org.apache.qpid.tools.utils.Console;
 
@@ -85,7 +86,7 @@
     }
 
 
-    protected List<List> createMessageData(java.util.List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+    protected List<List> createMessageData(java.util.List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
                                            boolean showMessageHeaders)
     {
 
@@ -96,8 +97,9 @@
         display.add(hex);
         display.add(ascii);
 
-        for (AMQMessage msg : messages)
+        for (QueueEntry entry : messages)
         {
+            AMQMessage msg = entry.getMessage();
             if (!includeMsg(msg, msgids))
             {
                 continue;
@@ -252,8 +254,8 @@
     private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
                                     String title, boolean routing, boolean headers, boolean messageHeaders)
     {
-        List<AMQMessage> single = new LinkedList<AMQMessage>();
-        single.add(msg);
+        List<QueueEntry> single = new LinkedList<QueueEntry>();
+        single.add(new QueueEntry(null,msg));
 
         List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Tue Dec 18 15:00:40 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 
@@ -166,12 +167,12 @@
 
         if (fromQueue != null)
         {
-            List<AMQMessage> messages = fromQueue.getMessagesOnTheQueue();
+            List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue();
             if (messages != null)
             {
-                for (AMQMessage msg : messages)
+                for (QueueEntry msg : messages)
                 {
-                    ids.add(msg.getMessageId());
+                    ids.add(msg.getMessage().getMessageId());
                 }
             }
         }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Tue Dec 18 15:00:40 2007
@@ -27,6 +27,7 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 import org.apache.qpid.tools.utils.Console;
 
@@ -114,7 +115,7 @@
 
         if (_queue != null)
         {
-            List<AMQMessage> messages = _queue.getMessagesOnTheQueue();
+            List<QueueEntry> messages = _queue.getMessagesOnTheQueue();
             if (messages == null || messages.size() == 0)
             {
                 _console.println("No messages on queue");
@@ -153,7 +154,7 @@
      * @param showMessageHeaders show the msg headers be shown
      * @return the formated data lists for printing
      */
-    protected List<List> createMessageData(List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+    protected List<List> createMessageData(List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
                                            boolean showMessageHeaders)
     {
 
@@ -334,8 +335,9 @@
         }
 
         //Add create the table of data
-        for (AMQMessage msg : messages)
+        for (QueueEntry entry : messages)
         {
+            AMQMessage msg = entry.getMessage();
             if (!includeMsg(msg, msgids))
             {
                 continue;

Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Tue Dec 18 15:00:40 2007
@@ -104,7 +104,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -146,7 +146,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -166,7 +166,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -207,7 +207,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -227,7 +227,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -247,7 +247,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -266,7 +266,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -308,7 +308,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -327,7 +327,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -369,7 +369,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -403,7 +403,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -446,7 +446,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -487,7 +487,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());

Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Tue Dec 18 15:00:40 2007
@@ -286,7 +286,7 @@
 
         for (int i = 0; i < messageCount; i++)
         {
-            _queue.process(_storeContext, messages[i], false);
+            _queue.process(_storeContext, new QueueEntry(_queue,messages[i]), false);
         }
     }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Dec 18 15:00:40 2007
@@ -211,7 +211,7 @@
 
         msg.enqueue(_queue);
         msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
-        _queue.process(_storeContext, msg, false);
+        _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
         _queueMBean.viewMessageContent(id);
         try
         {

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Tue Dec 18 15:00:40 2007
@@ -29,6 +29,7 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -133,7 +134,7 @@
                 };
 
                 TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
-                _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
+                _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag));
             }
             _acked = acked;
             _unacked = unacked;
@@ -150,7 +151,7 @@
             {
                 UnacknowledgedMessage u = _map.get(tag);
                 assertTrue("Message not found for tag " + tag, u != null);
-                ((TestMessage) u.message).assertCountEquals(expected);
+                ((TestMessage) u.getMessage()).assertCountEquals(expected);
             }
         }
 

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Dec 18 15:00:40 2007
@@ -27,6 +27,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
@@ -250,9 +251,9 @@
          * @param deliverFirst
          * @throws AMQException
          */
-        public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException
+        public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException
         {
-            messages.add(new HeadersExchangeTest.Message(msg));
+            messages.add(new HeadersExchangeTest.Message(msg.getMessage()));
         }
     }
 

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Tue Dec 18 15:00:40 2007
@@ -142,7 +142,7 @@
             msg.incrementReference();
             msg.routingComplete(_messageStore, _storeContext, factory);
             // we manually send the message to the subscription
-            _subscription.send(msg, _queue);
+            _subscription.send(new QueueEntry(_queue,msg), _queue);
         }
     }
 
@@ -167,7 +167,7 @@
             assertTrue(deliveryTag == i);
             i++;
             UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.queue == _queue);
+            assertTrue(unackedMsg.getQueue() == _queue);
         }
 
         assertTrue(map.size() == msgCount);
@@ -228,7 +228,7 @@
         {
             assertTrue(deliveryTag == i);
             UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.queue == _queue);
+            assertTrue(unackedMsg.getQueue() == _queue);
             // 5 is the delivery tag of the message that *should* be removed
             if (++i == 5)
             {
@@ -257,7 +257,7 @@
         {
             assertTrue(deliveryTag == i + 5);
             UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.queue == _queue);
+            assertTrue(unackedMsg.getQueue() == _queue);
             ++i;
         }
     }
@@ -281,7 +281,7 @@
         {
             assertTrue(deliveryTag == i + 5);
             UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.queue == _queue);
+            assertTrue(unackedMsg.getQueue() == _queue);
             ++i;
         }
     }

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java Tue Dec 18 15:00:40 2007
@@ -42,9 +42,9 @@
 
     private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
     private final Set<Subscription> _active = new HashSet<Subscription>();
-    private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
+    private final List<QueueEntry> _messages = new ArrayList<QueueEntry>();
     private int next = 0;//index to next message to send
-    private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>());
+    private final List<QueueEntry> _received = Collections.synchronizedList(new ArrayList<QueueEntry>());
     private final Executor _executor = new OnCurrentThreadExecutor();
     private final List<Thread> _threads = new ArrayList<Thread>();
 
@@ -159,7 +159,7 @@
         }
     }
 
-    private AMQMessage nextMessage()
+    private QueueEntry nextMessage()
     {
         synchronized (_messages)
         {
@@ -191,7 +191,7 @@
     {
         void doRun() throws Throwable
         {
-            AMQMessage msg = nextMessage();
+            QueueEntry msg = nextMessage();
             if (msg != null)
             {
                 _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false);

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Tue Dec 18 15:00:40 2007
@@ -40,7 +40,7 @@
 
     public void testStartInQueueingMode() throws AMQException
     {
-        AMQMessage[] messages = new AMQMessage[10];
+        QueueEntry[] messages = new QueueEntry[10];
         for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message();
@@ -85,7 +85,7 @@
 
     public void testStartInDirectMode() throws AMQException
     {
-        AMQMessage[] messages = new AMQMessage[10];
+        QueueEntry[] messages = new QueueEntry[10];
         for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message();
@@ -132,7 +132,7 @@
     {
         try
         {
-            AMQMessage msg = message(true);
+            QueueEntry msg = message(true);
             _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
             msg.checkDeliveredToConsumer();
             fail("expected exception did not occur");
@@ -154,7 +154,7 @@
             SubscriptionTestHelper s = new SubscriptionTestHelper("A");
             _subscriptions.addSubscriber(s);
             s.setSuspended(true);
-            AMQMessage msg = message(true);
+            QueueEntry msg = message(true);
             _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
             msg.checkDeliveredToConsumer();
             fail("expected exception did not occur");

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Tue Dec 18 15:00:40 2007
@@ -55,12 +55,12 @@
         ApplicationRegistry.initialise(new NullApplicationRegistry());
     }
 
-    AMQMessage message() throws AMQException
+    QueueEntry message() throws AMQException
     {
         return message(false);
     }
 
-    AMQMessage message(final boolean immediate) throws AMQException
+    QueueEntry message(final boolean immediate) throws AMQException
     {
         MessagePublishInfo publish = new MessagePublishInfo()
         {
@@ -86,8 +86,8 @@
             }
         };
                               
-        return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
-                              new ContentHeaderBody());
+        return new QueueEntry(null,new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
+                              new ContentHeaderBody()));
     }
 
 }

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Tue Dec 18 15:00:40 2007
@@ -28,13 +28,13 @@
 
 public class SubscriptionTestHelper implements Subscription
 {
-    private final List<AMQMessage> messages;
+    private final List<QueueEntry> messages;
     private final Object key;
     private boolean isSuspended;
 
     public SubscriptionTestHelper(Object key)
     {
-        this(key, new ArrayList<AMQMessage>());
+        this(key, new ArrayList<QueueEntry>());
     }
 
     public SubscriptionTestHelper(final Object key, final boolean isSuspended)
@@ -43,18 +43,18 @@
         setSuspended(isSuspended);
     }
 
-    SubscriptionTestHelper(Object key, List<AMQMessage> messages)
+    SubscriptionTestHelper(Object key, List<QueueEntry> messages)
     {
         this.key = key;
         this.messages = messages;
     }
 
-    List<AMQMessage> getMessages()
+    List<QueueEntry> getMessages()
     {
         return messages;
     }
 
-    public void send(AMQMessage msg, AMQQueue queue)
+    public void send(QueueEntry msg, AMQQueue queue)
     {
         messages.add(msg);
     }
@@ -69,12 +69,12 @@
         return isSuspended;
     }
 
-    public boolean wouldSuspend(AMQMessage msg)
+    public boolean wouldSuspend(QueueEntry msg)
     {
         return isSuspended;
     }
 
-    public void addToResendQueue(AMQMessage msg)
+    public void addToResendQueue(QueueEntry msg)
     {
         //no-op
     }
@@ -98,27 +98,27 @@
         return false;
     }
 
-    public boolean hasInterest(AMQMessage msg)
+    public boolean hasInterest(QueueEntry msg)
     {
         return true;
     }
 
-    public Queue<AMQMessage> getPreDeliveryQueue()
+    public Queue<QueueEntry> getPreDeliveryQueue()
     {
         return null;
     }
 
-    public Queue<AMQMessage> getResendQueue()
+    public Queue<QueueEntry> getResendQueue()
     {
         return null;
     }
 
-    public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+    public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages)
     {
         return messages;
     }
 
-    public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
+    public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst)
     {
         //no-op
     }