You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/12/19 00:00:43 UTC

svn commit: r605352 [1/2] - in /incubator/qpid/branches/M2.1/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/...

Author: rgodfrey
Date: Tue Dec 18 15:00:40 2007
New Revision: 605352

URL: http://svn.apache.org/viewvc?rev=605352&view=rev
Log:
QPID-711 : create a QueueEntry class and move message-on-queue functions (such as taken()) to this class

Added:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Modified:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
    incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
    incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Dec 18 15:00:40 2007
@@ -36,10 +36,7 @@
 import org.apache.qpid.server.exchange.MessageRouter;
 import org.apache.qpid.server.exchange.NoRouteException;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.Subscription;
+import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -421,33 +418,32 @@
     /**
      * Add a message to the channel-based list of unacknowledged messages
      *
-     * @param message     the message that was delivered
+     * @param entry       the record of the message on the queue that was delivered
      * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
      *                    delivery tag)
      * @param consumerTag The tag for the consumer that is to acknowledge this message.
-     * @param queue       the queue from which the message was delivered
      */
-    public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
+    public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, AMQShortString consumerTag)
     {
         if (_log.isDebugEnabled())
         {
-            if (queue == null)
+            if (entry.getQueue() == null)
             {
-                _log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
+                _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity());
             }
             else
             {
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag
-                               + ") with a queue(" + queue + ") for " + consumerTag);
+                    _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
+                               + ") with a queue(" + entry.getQueue() + ") for " + consumerTag);
                 }
             }
         }
 
         synchronized (_unacknowledgedMessageMap.getLock())
         {
-            _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+            _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag));
             checkSuspension();
         }
     }
@@ -499,16 +495,16 @@
 
         for (UnacknowledgedMessage unacked : messagesToBeDelivered)
         {
-            if (unacked.queue != null)
+            if (!unacked.isQueueDeleted())
             {
                 // Ensure message is released for redelivery
-                unacked.message.release(unacked.queue);
+                unacked.entry.release();
 
                 // Mark message redelivered
-                unacked.message.setRedelivered(true);
+                unacked.getMessage().setRedelivered(true);
 
                 // Deliver Message
-                deliveryContext.deliver(unacked.message, unacked.queue, false);
+                deliveryContext.deliver(unacked.entry, false);
 
                 // Should we allow access To the DM to directy deliver the message?
                 // As we don't need to check for Consumers or worry about incrementing the message count?
@@ -533,13 +529,13 @@
         {
 
             // Ensure message is released for redelivery
-            if (unacked.queue != null)
+            if (!unacked.isQueueDeleted())
             {
-                unacked.message.release(unacked.queue);
+                unacked.entry.release();
             }
 
             // Mark message redelivered
-            unacked.message.setRedelivered(true);
+            unacked.getMessage().setRedelivered(true);
 
             // Deliver these messages out of the transaction as their delivery was never
             // part of the transaction only the receive.
@@ -559,16 +555,16 @@
                 deliveryContext = _txnContext;
             }
 
-            if (unacked.queue != null)
+            if (!unacked.isQueueDeleted())
             {
                 // Redeliver the messages to the front of the queue
-                deliveryContext.deliver(unacked.message, unacked.queue, true);
+                deliveryContext.deliver(unacked.entry, true);
                 // Deliver increments the message count but we have already deliverted this once so don't increment it again
                 // this was because deliver did an increment changed this.
             }
             else
             {
-                _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity()
+                _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
                           + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
                 // _log.error("Requested requeue of message:" + deliveryTag +
                 // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
@@ -591,7 +587,7 @@
                     public boolean callback(UnacknowledgedMessage message) throws AMQException
                     {
                         _log.debug(
-                                (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]");
+                                (count++) + ": (" + message.getMessage().debugIdentity() + ")" + "[" + message.deliveryTag + "]");
 
                         return false; // Continue
                     }
@@ -630,7 +626,7 @@
             public boolean callback(UnacknowledgedMessage message) throws AMQException
             {
                 AMQShortString consumerTag = message.consumerTag;
-                AMQMessage msg = message.message;
+                AMQMessage msg = message.getMessage();
                 msg.setRedelivered(true);
                 if (consumerTag != null)
                 {
@@ -649,7 +645,7 @@
                     // Message has no consumer tag, so was "delivered" to a GET
                     // or consumer no longer registered
                     // cannot resend, so re-queue.
-                    if (message.queue != null)
+                    if (!message.isQueueDeleted())
                     {
                         if (requeue)
                         {
@@ -690,7 +686,7 @@
 
         for (UnacknowledgedMessage message : msgToResend)
         {
-            AMQMessage msg = message.message;
+            AMQMessage msg = message.getMessage();
 
             // Our Java Client will always suspend the channel when resending!
             // If the client has requested the messages be resent then it is
@@ -705,13 +701,13 @@
             // else
             // {
             // release to allow it to be delivered
-            msg.release(message.queue);
+            message.entry.release();
 
             // Without any details from the client about what has been processed we have to mark
             // all messages in the unacked map as redelivered.
             msg.setRedelivered(true);
 
-            Subscription sub = msg.getDeliveredSubscription(message.queue);
+            Subscription sub = message.entry.getDeliveredSubscription();
 
             if (sub != null)
             {
@@ -741,7 +737,7 @@
                                        + System.identityHashCode(sub));
                         }
 
-                        sub.addToResendQueue(msg);
+                        sub.addToResendQueue(message.entry);
                         _unacknowledgedMessageMap.remove(message.deliveryTag);
                     }
                 } // sync(sub.getSendLock)
@@ -789,10 +785,10 @@
         // Process Messages to Requeue at the front of the queue
         for (UnacknowledgedMessage message : msgToRequeue)
         {
-            message.message.release(message.queue);
-            message.message.setRedelivered(true);
+            message.entry.release();
+            message.entry.setRedelivered(true);
 
-            deliveryContext.deliver(message.message, message.queue, true);
+            deliveryContext.deliver(message.entry, true);
 
             _unacknowledgedMessageMap.remove(message.deliveryTag);
         }
@@ -813,17 +809,18 @@
         {
             public boolean callback(UnacknowledgedMessage message) throws AMQException
             {
-                if (message.queue == queue)
+                if (message.getQueue() == queue)
                 {
                     try
                     {
                         message.discard(_storeContext);
-                        message.queue = null;
+                        message.setQueueDeleted(true);
+                        
                     }
                     catch (AMQException e)
                     {
                         _log.error(
-                                "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e);
+                                "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
                     }
                 }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Tue Dec 18 15:00:40 2007
@@ -87,7 +87,7 @@
         //buffer must be marked as persistent:
         for (UnacknowledgedMessage msg : _unacked)
         {
-            if (msg.message.isPersistent())
+            if (msg.getMessage().isPersistent())
             {
                 return true;
             }
@@ -100,7 +100,7 @@
         //make persistent changes, i.e. dequeue and decrementReference
         for (UnacknowledgedMessage msg : _unacked)
         {
-            msg.restoreTransientMessageData();
+            //msg.restoreTransientMessageData();
 
             //Message has been ack so discard it. This will dequeue and decrement the reference.
             msg.discard(storeContext);
@@ -116,7 +116,7 @@
         for (UnacknowledgedMessage msg : _unacked)
         {
             msg.clearTransientMessageData();
-            msg.message.takeReference();
+            msg.getMessage().takeReference();
         }
     }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java Tue Dec 18 15:00:40 2007
@@ -24,19 +24,21 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoreContext;
 
 public class UnacknowledgedMessage
 {
-    public final AMQMessage message;
+    public final QueueEntry entry;
     public final AMQShortString consumerTag;
     public final long deliveryTag;
-    public AMQQueue queue;
 
-    public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, AMQShortString consumerTag, long deliveryTag)
+    private boolean _queueDeleted;
+
+
+    public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, long deliveryTag)
     {
-        this.queue = queue;
-        this.message = message;
+        this.entry = entry;
         this.consumerTag = consumerTag;
         this.deliveryTag = deliveryTag;
     }
@@ -45,9 +47,9 @@
     {
         StringBuilder sb = new StringBuilder();
         sb.append("Q:");
-        sb.append(queue);
+        sb.append(entry.getQueue());
         sb.append(" M:");
-        sb.append(message);
+        sb.append(entry.getMessage());
         sb.append(" CT:");
         sb.append(consumerTag);
         sb.append(" DT:");
@@ -58,22 +60,42 @@
 
     public void discard(StoreContext storeContext) throws AMQException
     {
-        if (queue != null)
+        if (entry.getQueue() != null)
         {
-            queue.dequeue(storeContext, message);
+            entry.getQueue().dequeue(storeContext, entry);
         }
         //if the queue is null then the message is waiting to be acked, but has been removed.
-        message.decrementReference(storeContext);
+        entry.getMessage().decrementReference(storeContext);
     }
 
     public void restoreTransientMessageData() throws AMQException
     {
-        message.restoreTransientMessageData();
+        entry.getMessage().restoreTransientMessageData();
     }
 
     public void clearTransientMessageData()
     {
-        message.clearTransientMessageData();
+        entry.getMessage().clearTransientMessageData();
+    }
+
+    public AMQMessage getMessage()
+    {
+        return entry.getMessage();
+    }
+
+    public AMQQueue getQueue()
+    {
+        return entry.getQueue();
+    }
+
+    public void setQueueDeleted(boolean queueDeleted)
+    {
+        _queueDeleted = queueDeleted;
+    }
+
+    public boolean isQueueDeleted()
+    {
+        return _queueDeleted;
     }
 }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Tue Dec 18 15:00:40 2007
@@ -97,7 +97,7 @@
             UnacknowledgedMessage message = _map.remove(deliveryTag);
             if(message != null)
             {
-                _unackedSize -= message.message.getSize();
+                _unackedSize -= message.getMessage().getSize();
             }
 
             return message;
@@ -127,7 +127,7 @@
         synchronized (_lock)
         {
             _map.put(deliveryTag, message);
-            _unackedSize += message.message.getSize();            
+            _unackedSize += message.getMessage().getSize();
             _lastDeliveryTag = deliveryTag;
         }
     }
@@ -186,7 +186,7 @@
                 }
 
                 it.remove();
-                _unackedSize -= unacked.getValue().message.getSize();
+                _unackedSize -= unacked.getValue().getMessage().getSize();
 
                 destination.add(unacked.getValue());
                 if (unacked.getKey() == deliveryTag)

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Tue Dec 18 15:00:40 2007
@@ -143,7 +143,7 @@
 
     	public AMQShortString getDefaultExchangeName()
     	{
-    		return ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+    		return ExchangeDefaults.FANOUT_EXCHANGE_NAME;
     	}
     };
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Tue Dec 18 15:00:40 2007
@@ -85,7 +85,7 @@
         }
         else
         {
-            if (message.queue == null || message.queue.isDeleted())
+            if (message.isQueueDeleted() || message.getQueue().isDeleted())
             {
                 _logger.warn("Message's Queue as already been purged, unable to Reject. " +
                              "Dropping message should use Dead Letter Queue");
@@ -93,7 +93,7 @@
                 return;
             }
 
-            if (!message.message.isReferenced())
+            if (!message.getMessage().isReferenced())
             {
                 _logger.warn("Message as already been purged, unable to Reject.");
                 return;
@@ -102,7 +102,7 @@
 
             if (_logger.isTraceEnabled())
             {
-                _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.message.debugIdentity() +
+                _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
                               ": Requeue:" + body.getRequeue() +
                               //": Resend:" + evt.getMethod().resend +
                               " on channel:" + channel.debugIdentity());
@@ -111,7 +111,7 @@
             // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
             //if (!evt.getMethod().resend)
             {
-                message.message.reject(message.message.getDeliveredSubscription(message.queue));
+                message.entry.reject();
             }
 
             if (body.getRequeue())

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Dec 18 15:00:40 2007
@@ -77,22 +77,13 @@
     /** Flag to indicate that this message requires 'immediate' delivery. */
     private boolean _immediate;
 
-    // private Subscription _takenBySubcription;
-    // private AtomicBoolean _taken = new AtomicBoolean(false);
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
-    //todo: this should be part of a messageOnQueue object
-    private Set<Subscription> _rejectedBy = null;
+    private long _expiration;
 
-    //todo: this should be part of a messageOnQueue object
-    private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
-    //todo: this should be part of a messageOnQueue object
-    private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
 
     private final int hashcode = System.identityHashCode(this);
 
-    //todo: this should be part of a messageOnQueue object
-    private long _expiration;
 
     public String debugIdentity()
     {
@@ -282,7 +273,7 @@
         setContentHeaderBody(contentHeader);
     }
 
-    /**
+    /* *
      * Used in testing only. This allows the passing of the content header and some body fragments on construction.
      *
      * @param messageId
@@ -293,7 +284,7 @@
      * @param contentBodies
      *
      * @throws AMQException
-     */
+     */        /*
     public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
                       ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
                       MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
@@ -306,7 +297,7 @@
             addContentBodyFrame(storeContext, cb);
         }
     }
-
+                 */
     protected AMQMessage(AMQMessage msg) throws AMQException
     {
         _messageId = msg._messageId;
@@ -485,84 +476,6 @@
         return _deliveredToConsumer;
     }
 
-    public boolean isTaken(AMQQueue queue)
-    {
-        // return _taken.get();
-
-        synchronized (this)
-        {
-            AtomicBoolean taken = _takenMap.get(queue);
-            if (taken == null)
-            {
-                taken = new AtomicBoolean(false);
-                _takenMap.put(queue, taken);
-            }
-
-            return taken.get();
-        }
-    }
-
-    public boolean taken(AMQQueue queue, Subscription sub)
-    {
-        // if (_taken.getAndSet(true))
-        // {
-        // return true;
-        // }
-        // else
-        // {
-        // _takenBySubcription = sub;
-        // return false;
-        // }
-
-        synchronized (this)
-        {
-            AtomicBoolean taken = _takenMap.get(queue);
-            if (taken == null)
-            {
-                taken = new AtomicBoolean(false);
-            }
-
-            if (taken.getAndSet(true))
-            {
-                return true;
-            }
-            else
-            {
-                _takenMap.put(queue, taken);
-                _takenBySubcriptionMap.put(queue, sub);
-
-                return false;
-            }
-        }
-    }
-
-    public void release(AMQQueue queue)
-    {
-        if (_log.isTraceEnabled())
-        {
-            _log.trace("Releasing Message:" + debugIdentity());
-        }
-
-        // _taken.set(false);
-        // _takenBySubcription = null;
-
-        synchronized (this)
-        {
-            AtomicBoolean taken = _takenMap.get(queue);
-            if (taken == null)
-            {
-                taken = new AtomicBoolean(false);
-            }
-            else
-            {
-                taken.set(false);
-            }
-
-            _deliveredToConsumer = false;
-            _takenMap.put(queue, taken);
-            _takenBySubcriptionMap.put(queue, null);
-        }
-    }
 
     public boolean checkToken(Object token)
     {
@@ -683,7 +596,6 @@
      */
     public boolean expired(AMQQueue queue) throws AMQException
     {
-        // note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
 
         if (_expiration != 0L)
         {
@@ -732,7 +644,7 @@
                 // Increment the references to this message for each queue delivery.
                 incrementReference();
                 // normal deliver so add this message at the end.
-                _txnContext.deliver(this, q, false);
+                _txnContext.deliver(q.createEntry(this), false);
             }
         }
         finally
@@ -743,175 +655,6 @@
         }
     }
 
-    /*
-        public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
-                throws AMQException
-        {
-            ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
-            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                          getContentHeaderBody());
-
-            final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
-            if (bodyCount == 0)
-            {
-                SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                                           contentHeader);
-
-                protocolSession.writeFrame(compositeBlock);
-            }
-            else
-            {
-
-                //
-                // Optimise the case where we have a single content body. In that case we create a composite block
-                // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-                //
-                ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
-                AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
-                protocolSession.writeFrame(compositeBlock);
-
-                //
-                // Now start writing out the other content bodies
-                //
-                for (int i = 1; i < bodyCount; i++)
-                {
-                    cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
-                    protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
-                }
-
-
-            }
-
-
-        }
-
-        public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
-        {
-            ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
-            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                          getContentHeaderBody());
-
-            final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
-            if (bodyCount == 0)
-            {
-                SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                                           contentHeader);
-                protocolSession.writeFrame(compositeBlock);
-            }
-            else
-            {
-
-                //
-                // Optimise the case where we have a single content body. In that case we create a composite block
-                // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-                //
-                ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
-                AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
-                protocolSession.writeFrame(compositeBlock);
-
-                //
-                // Now start writing out the other content bodies
-                //
-                for (int i = 1; i < bodyCount; i++)
-                {
-                    cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
-                    protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
-                }
-
-
-            }
-
-
-        }
-
-
-        private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
-                throws AMQException
-        {
-            MessagePublishInfo pb = getMessagePublishInfo();
-            AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
-                                                                    deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
-                                                                    pb.getRoutingKey());
-            ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
-            deliverFrame.writePayload(buf);
-            buf.flip();
-            return buf;
-        }
-
-        private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
-                throws AMQException
-        {
-            MessagePublishInfo pb = getMessagePublishInfo();
-            AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
-                                                                protocolSession.getProtocolMajorVersion(),
-                                                                protocolSession.getProtocolMinorVersion(),
-                                                                deliveryTag, pb.getExchange(),
-                                                                queueSize,
-                                                                _messageHandle.isRedelivered(),
-                                                                pb.getRoutingKey());
-            ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
-            getOkFrame.writePayload(buf);
-            buf.flip();
-            return buf;
-        }
-
-        private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
-        {
-            AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
-                                                                  protocolSession.getProtocolMajorVersion(),
-                                                                  protocolSession.getProtocolMinorVersion(),
-                                                                  getMessagePublishInfo().getExchange(),
-                                                                  replyCode, replyText,
-                                                                  getMessagePublishInfo().getRoutingKey());
-            ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
-            returnFrame.writePayload(buf);
-            buf.flip();
-            return buf;
-        }
-
-        public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
-                throws AMQException
-        {
-            ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
-
-            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                          getContentHeaderBody());
-
-            Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            if (bodyFrameIterator.hasNext())
-            {
-                AMQDataBlock firstContentBody = bodyFrameIterator.next();
-                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
-                protocolSession.writeFrame(compositeBlock);
-            }
-            else
-            {
-                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
-                                                                                 new AMQDataBlock[]{contentHeader});
-                protocolSession.writeFrame(compositeBlock);
-            }
-
-            //
-            // Now start writing out the other content bodies
-            // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
-            //
-            while (bodyFrameIterator.hasNext())
-            {
-                protocolSession.writeFrame(bodyFrameIterator.next());
-            }
-        }
-     */
 
     public AMQMessageHandle getMessageHandle()
     {
@@ -954,47 +697,7 @@
         // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
         // _taken + " by :" + _takenBySubcription;
 
-        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
-               + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
-    }
-
-    public Subscription getDeliveredSubscription(AMQQueue queue)
-    {
-        // return _takenBySubcription;
-        synchronized (this)
-        {
-            return _takenBySubcriptionMap.get(queue);
-        }
+        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount;
     }
 
-    public void reject(Subscription subscription)
-    {
-        if (subscription != null)
-        {
-            if (_rejectedBy == null)
-            {
-                _rejectedBy = new HashSet<Subscription>();
-            }
-
-            _rejectedBy.add(subscription);
-        }
-        else
-        {
-            _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
-        }
-    }
-
-    public boolean isRejectedBy(Subscription subscription)
-    {
-        boolean rejected = _rejectedBy != null;
-
-        if (rejected) // We have subscriptions that rejected this message
-        {
-            return _rejectedBy.contains(subscription);
-        }
-        else // This messasge hasn't been rejected yet.
-        {
-            return rejected;
-        }
-    }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Dec 18 15:00:40 2007
@@ -49,6 +49,7 @@
  */
 public class AMQQueue implements Managable, Comparable
 {
+
     /**
      * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
      * already exists.
@@ -250,7 +251,7 @@
     }
 
     /** @return List of messages(undelivered) on the queue. */
-    public List<AMQMessage> getMessagesOnTheQueue()
+    public List<QueueEntry> getMessagesOnTheQueue()
     {
         return _deliveryMgr.getMessages();
     }
@@ -263,7 +264,7 @@
      *
      * @return List of messages
      */
-    public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
+    public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
     {
         return _deliveryMgr.getMessages(fromMessageId, toMessageId);
     }
@@ -276,11 +277,11 @@
     /**
      * @param messageId
      *
-     * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
+     * @return QueueEntry with give id if exists. null if QueueEntry with given id doesn't exist.
      */
-    public AMQMessage getMessageOnTheQueue(long messageId)
+    public QueueEntry getMessageOnTheQueue(long messageId)
     {
-        List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId);
+        List<QueueEntry> list = getMessagesOnTheQueue(messageId, messageId);
         if ((list == null) || (list.size() == 0))
         {
             return null;
@@ -319,15 +320,16 @@
             toQueue.startMovingMessages();
 
             // Get the list of messages to move.
-            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+            List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
 
             try
             {
                 fromStore.beginTran(storeContext);
 
                 // Move the messages in on the message store.
-                for (AMQMessage message : foundMessagesList)
+                for (QueueEntry entry : foundMessagesList)
                 {
+                    AMQMessage message = entry.getMessage();
                     fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
                     toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
                 }
@@ -397,15 +399,16 @@
             toQueue.startMovingMessages();
 
             // Get the list of messages to move.
-            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+            List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
 
             try
             {
                 fromStore.beginTran(storeContext);
 
                 // Move the messages in on the message store.
-                for (AMQMessage message : foundMessagesList)
+                for (QueueEntry entry : foundMessagesList)
                 {
+                    AMQMessage message = entry.getMessage();
                     toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
                     message.takeReference();
                 }
@@ -463,15 +466,16 @@
             startMovingMessages();
 
             // Get the list of messages to move.
-            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+            List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
 
             try
             {
                 fromStore.beginTran(storeContext);
 
                 // remove the messages in on the message store.
-                for (AMQMessage message : foundMessagesList)
+                for (QueueEntry entry : foundMessagesList)
                 {
+                    AMQMessage message = entry.getMessage();
                     fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
                 }
 
@@ -513,7 +517,7 @@
         _deliveryMgr.startMovingMessages();
     }
 
-    private void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> messageList)
+    private void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> messageList)
     {
         _deliveryMgr.enqueueMovedMessages(storeContext, messageList);
         _totalMessagesReceived.addAndGet(messageList.size());
@@ -583,7 +587,7 @@
 
     }
 
-    /** Removes the AMQMessage from the top of the queue. */
+    /** Removes the QueueEntry from the top of the queue. */
     public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
     {
         _deliveryMgr.removeAMessageFromTop(storeContext, this);
@@ -798,27 +802,28 @@
     // return _deliveryMgr;
     // }
 
-    public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+    public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
     {
-        _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
+        AMQMessage msg = entry.getMessage();
+        _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
         try
         {
             msg.checkDeliveredToConsumer();
-            updateReceivedMessageCount(msg);
+            updateReceivedMessageCount(entry);
         }
         catch (NoConsumersException e)
         {
             // as this message will be returned, it should be removed
             // from the queue:
-            dequeue(storeContext, msg);
+            dequeue(storeContext, entry);
         }
     }
 
-    public void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
+    public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
     {
         try
         {
-            msg.dequeue(storeContext, this);
+            entry.getMessage().dequeue(storeContext, this);
         }
         catch (MessageCleanupException e)
         {
@@ -844,8 +849,10 @@
         return _subscribers;
     }
 
-    protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
+    protected void updateReceivedMessageCount(QueueEntry entry) throws AMQException
     {
+        AMQMessage msg = entry.getMessage();
+
         if (!msg.isRedelivered())
         {
             _totalMessagesReceived.incrementAndGet();
@@ -933,8 +940,14 @@
         _maximumMessageAge = maximumMessageAge;
     }
 
-    public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, AMQMessage msg)
+    public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
     {
-        _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
+        _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, entry);
     }
+
+    public QueueEntry createEntry(AMQMessage amqMessage)
+    {
+        return new QueueEntry(this, amqMessage);
+    }
+
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Tue Dec 18 15:00:40 2007
@@ -321,11 +321,14 @@
      */
     public CompositeData viewMessageContent(long msgId) throws JMException
     {
-        AMQMessage msg = _queue.getMessageOnTheQueue(msgId);
-        if (msg == null)
+        QueueEntry entry = _queue.getMessageOnTheQueue(msgId);
+
+        if (entry == null)
         {
             throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
         }
+
+        AMQMessage msg = entry.getMessage();
         // get message content
         Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
         List<Byte> msgContent = new ArrayList<Byte>();
@@ -381,7 +384,7 @@
                 + "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
         }
 
-        List<AMQMessage> list = _queue.getMessagesOnTheQueue();
+        List<QueueEntry> list = _queue.getMessagesOnTheQueue();
         TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
 
         try
@@ -389,7 +392,7 @@
             // Create the tabular list of message header contents
             for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
             {
-                AMQMessage msg = list.get(i - 1);
+                AMQMessage msg = list.get(i - 1).getMessage();
                 ContentHeaderBody headerBody = msg.getContentHeaderBody();
                 // Create header attributes list
                 String[] headerAttributes = getMessageHeaderProperties(headerBody);

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Dec 18 15:00:40 2007
@@ -55,7 +55,7 @@
                 defaultValue = "false")
     public boolean compressBufferOnQueue;
     /** Holds any queued messages */
-    private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
+    private final MessageQueue<QueueEntry> _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
 
     /** Ensures that only one asynchronous task is running for this manager at any time. */
     private final AtomicBoolean _processing = new AtomicBoolean();
@@ -107,8 +107,9 @@
     }
 
 
-    private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst)
+    private boolean addMessageToQueue(QueueEntry entry, boolean deliverFirst)
     {
+        AMQMessage msg = entry.getMessage();
         // Shrink the ContentBodies to their actual size to save memory.
         if (compressBufferOnQueue)
         {
@@ -124,12 +125,12 @@
         {
             synchronized (_queueHeadLock)
             {
-                _messages.pushHead(msg);
+                _messages.pushHead(entry);
             }
         }
         else
         {
-            _messages.offer(msg);
+            _messages.offer(entry);
         }
 
         _totalMessageSize.addAndGet(msg.getSize());
@@ -175,11 +176,11 @@
 
     public long getOldestMessageArrival()
     {
-        AMQMessage msg = _messages.peek();
-        return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+        QueueEntry entry = _messages.peek();
+        return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
     }
 
-    public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg)
+    public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry entry)
     {
         _lock.lock();
         try
@@ -188,19 +189,19 @@
             {
                 _log.debug("Queue has adding subscriber content");
                 _hasContent.add(subscription);
-                _totalMessageSize.addAndGet(msg.getSize());
+                _totalMessageSize.addAndGet(entry.getSize());
                 _extraMessages.addAndGet(1);
             }
             else
             {
                 _log.debug("Queue has removing subscriber content");
-                if (msg == null)
+                if (entry == null)
                 {
                     _hasContent.remove(subscription);
                 }
                 else
                 {
-                    _totalMessageSize.addAndGet(-msg.getSize());
+                    _totalMessageSize.addAndGet(-entry.getSize());
                     _extraMessages.addAndGet(-1);
                 }
             }
@@ -222,14 +223,14 @@
      *
      * @return List of messages
      */
-    public List<AMQMessage> getMessages()
+    public List<QueueEntry> getMessages()
     {
         _lock.lock();
-        List<AMQMessage> list = new ArrayList<AMQMessage>();
+        List<QueueEntry> list = new ArrayList<QueueEntry>();
 
-        for (AMQMessage message : _messages)
+        for (QueueEntry entry : _messages)
         {
-            list.add(message);
+            list.add(entry);
         }
         _lock.unlock();
 
@@ -244,7 +245,7 @@
      *
      * @return
      */
-    public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
+    public List<QueueEntry> getMessages(long fromMessageId, long toMessageId)
     {
         if (fromMessageId <= 0 || toMessageId <= 0)
         {
@@ -255,14 +256,14 @@
 
         _lock.lock();
 
-        List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+        List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
 
-        for (AMQMessage message : _messages)
+        for (QueueEntry entry : _messages)
         {
-            long msgId = message.getMessageId();
+            long msgId = entry.getMessage().getMessageId();
             if (msgId >= fromMessageId && msgId <= toMessageId)
             {
-                foundMessagesList.add(message);
+                foundMessagesList.add(entry);
             }
             // break if the no of messages are found
             if (foundMessagesList.size() == maxMessageCount)
@@ -282,16 +283,17 @@
             _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
         }
 
-        Iterator<AMQMessage> currentQueue = _messages.iterator();
+        Iterator<QueueEntry> currentQueue = _messages.iterator();
 
         while (currentQueue.hasNext())
         {
-            AMQMessage message = currentQueue.next();
-            if (!message.getDeliveredToConsumer())
+            QueueEntry entry = currentQueue.next();
+
+            if (!entry.getDeliveredToConsumer())
             {
-                if (subscription.hasInterest(message))
+                if (subscription.hasInterest(entry))
                 {
-                    subscription.enqueueForPreDelivery(message, false);
+                    subscription.enqueueForPreDelivery(entry, false);
                 }
             }
         }
@@ -299,8 +301,8 @@
 
     public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
     {
-        AMQMessage msg = getNextMessage();
-        if (msg == null)
+        QueueEntry entry = getNextMessage();
+        if (entry == null)
         {
             return false;
         }
@@ -322,9 +324,9 @@
                 {
                     if (_log.isDebugEnabled())
                     {
-                        _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+                        _log.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
                     }
-                    _queue.dequeue(channel.getStoreContext(), msg);
+                    _queue.dequeue(channel.getStoreContext(), entry);
                 }
                 synchronized (channel)
                 {
@@ -332,22 +334,22 @@
 
                     if (acks)
                     {
-                        channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
+                        channel.addUnacknowledgedMessage(entry, deliveryTag, null);
                     }
 
-                    protocolSession.getProtocolOutputConverter().writeGetOk(msg, channel.getChannelId(),
+                    protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
                                                                             deliveryTag, _queue.getMessageCount());
-                    _totalMessageSize.addAndGet(-msg.getSize());
+                    _totalMessageSize.addAndGet(-entry.getSize());
                 }
 
                 if (!acks)
                 {
-                    msg.decrementReference(channel.getStoreContext());
+                    entry.getMessage().decrementReference(channel.getStoreContext());
                 }
             }
             finally
             {
-                msg.setDeliveredToConsumer();
+                entry.setDeliveredToConsumer();
             }
             return true;
 
@@ -381,7 +383,7 @@
      *
      * @param messageList
      */
-    public void removeMovedMessages(List<AMQMessage> messageList)
+    public void removeMovedMessages(List<QueueEntry> messageList)
     {
         // Remove from the
         boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -391,20 +393,20 @@
             {
                 if (!sub.isSuspended() && sub.filtersMessages())
                 {
-                    Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
-                    for (AMQMessage msg : messageList)
+                    Queue<QueueEntry> preDeliveryQueue = sub.getPreDeliveryQueue();
+                    for (QueueEntry entry : messageList)
                     {
-                        preDeliveryQueue.remove(msg);
+                        preDeliveryQueue.remove(entry);
                     }
                 }
             }
         }
 
-        for (AMQMessage msg : messageList)
+        for (QueueEntry entry : messageList)
         {
-            if (_messages.remove(msg))
+            if (_messages.remove(entry))
             {
-                _totalMessageSize.getAndAdd(-msg.getSize());
+                _totalMessageSize.getAndAdd(-entry.getSize());
             }
         }
     }
@@ -420,16 +422,16 @@
     {
         _lock.lock();
 
-        AMQMessage message = _messages.poll();
+        QueueEntry entry = _messages.poll();
 
-        if (message != null)
+        if (entry != null)
         {
-            queue.dequeue(storeContext, message);
+            queue.dequeue(storeContext, entry);
 
-            _totalMessageSize.addAndGet(-message.getSize());
+            _totalMessageSize.addAndGet(-entry.getSize());
 
             //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
-            message.decrementReference(storeContext);
+            entry.getMessage().decrementReference(storeContext);
 
         }
 
@@ -443,17 +445,17 @@
 
         synchronized (_queueHeadLock)
         {
-            AMQMessage msg = getNextMessage();
-            while (msg != null)
+            QueueEntry entry = getNextMessage();
+            while (entry != null)
             {
                 //and remove it
                 _messages.poll();
 
-                _queue.dequeue(storeContext, msg);
+                _queue.dequeue(storeContext, entry);
 
-                msg.decrementReference(_reapingStoreContext);
+                entry.getMessage().decrementReference(_reapingStoreContext);
 
-                msg = getNextMessage();
+                entry = getNextMessage();
                 count++;
             }
             _totalMessageSize.set(0L);
@@ -469,34 +471,35 @@
      *
      * @throws org.apache.qpid.AMQException
      */
-    private AMQMessage getNextMessage() throws AMQException
+    private QueueEntry getNextMessage() throws AMQException
     {
         return getNextMessage(_messages, null, false);
     }
 
-    private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub, boolean purgeOnly) throws AMQException
+    private QueueEntry getNextMessage(Queue<QueueEntry> messages, Subscription sub, boolean purgeOnly) throws AMQException
     {
-        AMQMessage message = messages.peek();
+        QueueEntry entry = messages.peek();
 
         //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
-        while (purgeMessage(message, sub, purgeOnly))
+        while (purgeMessage(entry, sub, purgeOnly))
         {
+            AMQMessage message = entry.getMessage();
             // if we are purging then ensure we mark this message taken for the current subscriber
             // the current subscriber may be null in the case of a get or a purge but this is ok.
 //            boolean alreadyTaken = message.taken(_queue, sub);
 
             //remove the already taken message or expired
-            AMQMessage removed = messages.poll();
+            QueueEntry removed = messages.poll();
 
-            assert removed == message;
+            assert removed == entry;
 
             // if the message expired then the _totalMessageSize needs adjusting
-            if (message.expired(_queue) && !message.getDeliveredToConsumer())
+            if (message.expired(_queue) && !entry.getDeliveredToConsumer())
             {
-                _totalMessageSize.addAndGet(-message.getSize());
+                _totalMessageSize.addAndGet(-entry.getSize());
 
                 // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
-                _queue.dequeue(_reapingStoreContext, message);
+                _queue.dequeue(_reapingStoreContext, entry);
 
                 message.decrementReference(_reapingStoreContext);
 
@@ -515,10 +518,10 @@
             }
 
             // try the next message
-            message = messages.peek();
+            entry = messages.peek();
         }
 
-        return message;
+        return entry;
     }
 
     /**
@@ -534,7 +537,7 @@
      *
      * @throws AMQException
      */
-    private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+    private boolean purgeMessage(QueueEntry message, Subscription sub) throws AMQException
     {
         return purgeMessage(message, sub, false);
     }
@@ -552,7 +555,7 @@
      *
      * @throws AMQException
      */
-    private boolean purgeMessage(AMQMessage message, Subscription sub, boolean purgeOnly) throws AMQException
+    private boolean purgeMessage(QueueEntry message, Subscription sub, boolean purgeOnly) throws AMQException
     {
         //Original.. complicated while loop control
 //                (message != null
@@ -567,7 +570,7 @@
         if (message != null)
         {
             // Check that the message hasn't expired.
-            if (message.expired(_queue))
+            if (message.expired())
             {
                 return true;
             }
@@ -576,13 +579,13 @@
             if (sub != null)
             {
                 // if we have a queue browser(we don't purge) so check mark the message as taken
-                purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+                purge = ((!sub.isBrowser() || message.isTaken()));
             }
             else
             {
                 // if there is no subscription we are doing
                 // a get or purging so mark message as taken.
-                message.isTaken(_queue);
+                message.isTaken();
                 // and then ensure that it gets purged
                 purge = true;
             }
@@ -592,20 +595,20 @@
         {
             // If we are simply purging the queue don't take the message
             // just purge up to the next non-taken msg.
-            return purge && message.isTaken(_queue);
+            return purge && message.isTaken();
         }
         else
         {
             // if we are purging then ensure we mark this message taken for the current subscriber
             // the current subscriber may be null in the case of a get or a purge but this is ok.
-            return purge && message.taken(_queue, sub);
+            return purge && message.taken(sub);
         }
     }
 
-    public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
+    public void sendNextMessage(Subscription sub, AMQQueue queue)
     {
 
-        Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+        Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
 
         if (_log.isTraceEnabled())
         {
@@ -624,16 +627,16 @@
             return;
         }
 
-        AMQMessage message = null;
-        AMQMessage removed = null;
+        QueueEntry entry = null;
+        QueueEntry removed = null;
         try
         {
             synchronized (_queueHeadLock)
             {
-                message = getNextMessage(messageQueue, sub, false);
+                entry = getNextMessage(messageQueue, sub, false);
 
                 // message will be null if we have no messages in the messageQueue.
-                if (message == null)
+                if (entry == null)
                 {
                     if (_log.isTraceEnabled())
                     {
@@ -643,7 +646,7 @@
                 }
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug(debugIdentity() + "Async Delivery Message :" + message + "(" + System.identityHashCode(message) +
+                    _log.debug(debugIdentity() + "Async Delivery Message :" + entry + "(" + System.identityHashCode(entry) +
                                ") by :" + System.identityHashCode(this) +
                                ") to :" + System.identityHashCode(sub));
                 }
@@ -651,10 +654,10 @@
 
                 if (messageQueue == _messages)
                 {
-                    _totalMessageSize.addAndGet(-message.getSize());
+                    _totalMessageSize.addAndGet(-entry.getSize());
                 }
 
-                sub.send(message, _queue);
+                sub.send(entry, _queue);
 
                 //remove sent message from our queue.
                 removed = messageQueue.poll();
@@ -662,14 +665,14 @@
                 // Otherwise the Async send will never end
             }
 
-            if (removed != message)
+            if (removed != entry)
             {
-                _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed);
+                _log.error("Just send message:" + entry.getMessage().debugIdentity() + " BUT removed this from queue:" + removed);
             }
 
             if (_log.isDebugEnabled())
             {
-                _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message +
+                _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.getMessage().debugIdentity() + "d:" + entry +
                            ") by :" + System.identityHashCode(this) +
                            ") to :" + System.identityHashCode(sub));
             }
@@ -704,9 +707,9 @@
         }
         catch (AMQException e)
         {
-            if (message != null)
+            if (entry != null)
             {
-                message.release(_queue);
+                entry.release();
             }
             else
             {
@@ -734,23 +737,23 @@
      * @param storeContext
      * @param movedMessageList
      */
-    public void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> movedMessageList)
+    public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList)
     {
         _lock.lock();
-        for (AMQMessage msg : movedMessageList)
+        for (QueueEntry entry : movedMessageList)
         {
-            addMessageToQueue(msg, false);
+            addMessageToQueue(entry, false);
         }
 
         // enqueue on the pre delivery queues
         for (Subscription sub : _subscriptions.getSubscriptions())
         {
-            for (AMQMessage msg : movedMessageList)
+            for (QueueEntry entry : movedMessageList)
             {
                 // Only give the message to those that want them.
-                if (sub.hasInterest(msg))
+                if (sub.hasInterest(entry))
                 {
-                    sub.enqueueForPreDelivery(msg, true);
+                    sub.enqueueForPreDelivery(entry, true);
                 }
             }
         }
@@ -802,30 +805,30 @@
 
     }
 
-    public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
+    public void deliver(StoreContext context, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws AMQException
     {
 
         final boolean debugEnabled = _log.isDebugEnabled();
         if (debugEnabled)
         {
-            _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
+            _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + entry);
         }
 
         //Check if we have someone to deliver the message to.
         _lock.lock();
         try
         {
-            Subscription s = _subscriptions.nextSubscriber(msg);
+            Subscription s = _subscriptions.nextSubscriber(entry);
 
             if (s == null || (!s.filtersMessages() && hasQueuedMessages())) //no-one can take the message right now or we're queueing
             {
                 if (debugEnabled)
                 {
-                    _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
+                    _log.debug(debugIdentity() + "Testing Message(" + entry + ") for Queued Delivery:" + currentStatus());
                 }
-                if (!msg.getMessagePublishInfo().isImmediate())
+                if (!entry.getMessage().getMessagePublishInfo().isImmediate())
                 {
-                    addMessageToQueue(msg, deliverFirst);
+                    addMessageToQueue(entry, deliverFirst);
 
                     //release lock now message is on queue.
                     _lock.unlock();
@@ -840,25 +843,25 @@
                     {
 
                         // stop if the message gets delivered whilst PreDelivering if we have a shared queue.
-                        if (_queue.isShared() && msg.getDeliveredToConsumer())
+                        if (_queue.isShared() && entry.getDeliveredToConsumer())
                         {
                             if (debugEnabled)
                             {
-                                _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+                                _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(entry) +
                                            ") is already delivered.");
                             }
                             continue;
                         }
 
                         // Only give the message to those that want them.
-                        if (sub.hasInterest(msg))
+                        if (sub.hasInterest(entry))
                         {
                             if (debugEnabled)
                             {
-                                _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) +
+                                _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(entry) +
                                            ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
                             }
-                            sub.enqueueForPreDelivery(msg, deliverFirst);
+                            sub.enqueueForPreDelivery(entry, deliverFirst);
                         }
                     }
 
@@ -893,11 +896,11 @@
                     {
                         if (_log.isTraceEnabled())
                         {
-                            _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
+                            _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
 
-                        if (msg.taken(_queue, s))
+                        if (entry.taken(s))
                         {
                             //Message has been delivered so don't redeliver.
                             // This can currently occur because of the recursive call below
@@ -927,14 +930,14 @@
                             return;
                         }
                         //Deliver the message
-                        s.send(msg, _queue);
+                        s.send(entry, _queue);
                     }
                     else
                     {
                         if (debugEnabled)
                         {
                             _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
-                                       "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
+                                       "suspended between nextSubscriber and send for message:" + entry.getMessage().debugIdentity());
                         }
                     }
                 }
@@ -943,21 +946,21 @@
                 // Why do we do this? What was the reasoning? We should have a better approach
                 // than recursion and rejecting if someone else sends it before we do.
                 //
-                if (!msg.isTaken(_queue))
+                if (!entry.isTaken())
                 {
                     if (debugEnabled)
                     {
-                        _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
+                        _log.debug(debugIdentity() + " Message(" + entry.getMessage().debugIdentity() + ") has not been taken so recursing!:" +
                                    " Subscriber:" + System.identityHashCode(s));
                     }
 
-                    deliver(context, name, msg, deliverFirst);
+                    deliver(context, name, entry, deliverFirst);
                 }
                 else
                 {
                     if (debugEnabled)
                     {
-                        _log.debug(debugIdentity() + " Message(" + msg.toString() +
+                        _log.debug(debugIdentity() + " Message(" + entry.toString() +
                                    ") has been taken so disregarding deliver request to Subscriber:" +
                                    System.identityHashCode(s));
                     }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Tue Dec 18 15:00:40 2007
@@ -64,13 +64,13 @@
      *
      * @param storeContext
      * @param name         the name of the entity on whose behalf we are delivering the message
-     * @param msg          the message to deliver
+     * @param entry          the message to deliver
      * @param deliverFirst
      *
      * @throws org.apache.qpid.server.queue.FailedDequeueException
      *          if the message could not be dequeued
      */
-    void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
+    void deliver(StoreContext storeContext, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws FailedDequeueException, AMQException;
 
     void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException;
 
@@ -78,15 +78,15 @@
 
     void startMovingMessages();
 
-    void enqueueMovedMessages(StoreContext context, List<AMQMessage> messageList);
+    void enqueueMovedMessages(StoreContext context, List<QueueEntry> messageList);
 
     void stopMovingMessages();
 
-    void removeMovedMessages(List<AMQMessage> messageListToRemove);
+    void removeMovedMessages(List<QueueEntry> messageListToRemove);
 
-    List<AMQMessage> getMessages();
+    List<QueueEntry> getMessages();
 
-    List<AMQMessage> getMessages(long fromMessageId, long toMessageId);
+    List<QueueEntry> getMessages(long fromMessageId, long toMessageId);
 
     void populatePreDeliveryQueue(Subscription subscription);
 
@@ -96,5 +96,5 @@
 
     long getOldestMessageArrival();
 
-    void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg);
+    void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry msg);
 }

Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=605352&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue Dec 18 15:00:40 2007
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.log4j.Logger;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+public class QueueEntry
+{
+
+    /**
+     * Used for debugging purposes.
+     */
+    private static final Logger _log = Logger.getLogger(QueueEntry.class);
+
+    private final AMQQueue _queue;
+    private final AMQMessage _message;
+
+    private Set<Subscription> _rejectedBy = null;
+
+    private AtomicReference<Object> _owner = new AtomicReference<Object>();
+
+
+    public QueueEntry(AMQQueue queue, AMQMessage message)
+    {
+        _queue = queue;
+        _message = message;
+    }
+
+
+    public AMQQueue getQueue()
+    {
+        return _queue;
+    }
+
+    public AMQMessage getMessage()
+    {
+        return _message;
+    }
+
+    public long getSize()
+    {
+        return getMessage().getSize();
+    }
+
+    public boolean getDeliveredToConsumer()
+    {
+        return getMessage().getDeliveredToConsumer();
+    }
+
+    public boolean expired() throws AMQException
+    {
+        return getMessage().expired(_queue);
+    }
+
+    public boolean isTaken()
+    {
+        return _owner.get() != null;
+    }
+
+    public boolean taken(Subscription sub)
+    {
+        return !(_owner.compareAndSet(null, sub == null ? this : sub));
+    }
+
+    public void setDeliveredToConsumer()
+    {
+        getMessage().setDeliveredToConsumer();
+    }
+
+    public void release()
+    {
+        _owner.set(null);
+    }
+
+    public String debugIdentity()
+    {
+        return getMessage().debugIdentity();
+    }
+
+    public void process(StoreContext storeContext, boolean deliverFirst) throws AMQException
+    {
+        _queue.process(storeContext, this, deliverFirst);
+    }
+
+    public void checkDeliveredToConsumer() throws NoConsumersException
+    {
+        _message.checkDeliveredToConsumer();
+    }
+
+    public void setRedelivered(boolean b)
+    {
+        getMessage().setRedelivered(true);
+    }
+
+    public Subscription getDeliveredSubscription()
+    {
+        synchronized (this)
+        {
+            Object owner = _owner.get();
+            if (owner instanceof Subscription)
+            {
+                return (Subscription) owner;
+            }
+            else
+            {
+                return null;
+            }
+        }
+    }
+
+    public void reject()
+    {
+        reject(getDeliveredSubscription());
+    }
+
+    public void reject(Subscription subscription)
+    {
+        if (subscription != null)
+        {
+            if (_rejectedBy == null)
+            {
+                _rejectedBy = new HashSet<Subscription>();
+            }
+
+            _rejectedBy.add(subscription);
+        }
+        else
+        {
+            _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+        }
+    }
+
+    public boolean isRejectedBy(Subscription subscription)
+    {
+        boolean rejected = _rejectedBy != null;
+
+        if (rejected) // We have subscriptions that rejected this message
+        {
+            return _rejectedBy.contains(subscription);
+        }
+        else // This messasge hasn't been rejected yet.
+        {
+            return rejected;
+        }
+    }
+
+
+}

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Tue Dec 18 15:00:40 2007
@@ -27,7 +27,7 @@
 
 public interface Subscription
 {
-    void send(AMQMessage msg, AMQQueue queue) throws AMQException;
+    void send(QueueEntry msg, AMQQueue queue) throws AMQException;
 
     boolean isSuspended();
 
@@ -35,15 +35,15 @@
 
     boolean filtersMessages();
 
-    boolean hasInterest(AMQMessage msg);
+    boolean hasInterest(QueueEntry msg);
 
-    Queue<AMQMessage> getPreDeliveryQueue();
+    Queue<QueueEntry> getPreDeliveryQueue();
 
-    Queue<AMQMessage> getResendQueue();
+    Queue<QueueEntry> getResendQueue();
 
-    Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);    
+    Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages);
 
-    void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst);
+    void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst);
 
     boolean isAutoClose();
 
@@ -53,9 +53,9 @@
 
     boolean isBrowser();
 
-    boolean wouldSuspend(AMQMessage msg);
+    boolean wouldSuspend(QueueEntry msg);
 
-    void addToResendQueue(AMQMessage msg);
+    void addToResendQueue(QueueEntry msg);
 
     Object getSendLock();