You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/23 11:20:45 UTC

svn commit: r510897 [1/2] - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/serv...

Author: ritchiem
Date: Fri Feb 23 02:20:44 2007
New Revision: 510897

URL: http://svn.apache.org/viewvc?view=rev&rev=510897
Log:
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages
QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription 

BROKER
AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver.
BasicRejectMethodHandler - initial place holder.
TxRollbackHandler - Added comment
AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue.
AMQQueue - added the queue reference to the Subscription creation
ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355 
DeliveryManager - adjusted deliver call to allow delivery to the head of the queue.
Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed.
SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription.
SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure.
SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue.
AMQStateManager - Added BasicRejectMethodHandler
TransactionalContext - Added option to deliver the messages to the front of the queue.
LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue.
NonTransactionalContext - Added option to deliver the messages to the front of the queue.

DeliverMessageOperation.java DELELTED AS NOT USED.

CLIENT
AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover.
BasicMessageConsumer - updated the rollback so that it sends reject messages to server.
AbstractJMSMessage - whitespace + added extra message properties to the toString()
AMQProtocolHandler - whitespace + extra debug output
TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on.

CLUSTER
ClusteredQueue - AMQQueue changes for message deliveryFirst.
RemoteSubscriptionImpl - Implementation of Subscription 

SYSTESTS
AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst.
AMQQueueMBeanTest - changes for message deliveryFirst.
ConcurrencyTest - changes for message deliveryFirst. 
DeliveryManagerTest - changes for message deliveryFirst.
SubscriptionTestHelper - Implementation of Subscription 


WhiteSpace only
UnacknowledgedMessageMapImpl.java


Added:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java   (with props)
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java   (with props)
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java   (with props)
Removed:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Feb 23 02:20:44 2007
@@ -46,6 +46,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.Subscription;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -74,28 +75,20 @@
      */
     private AtomicLong _deliveryTag = new AtomicLong(0);
 
-    /**
-     * A channel has a default queue (the last declared) that is used when no queue name is
-     * explictily set
-     */
+    /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
     private AMQQueue _defaultQueue;
 
-    /**
-     * This tag is unique per subscription to a queue. The server returns this in response to a
-     * basic.consume request.
-     */
+    /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
     private int _consumerTag;
 
     /**
-     * The current message - which may be partial in the sense that not all frames have been received yet -
-     * which has been received by this channel. As the frames are received the message gets updated and once all
-     * frames have been received the message can then be routed.
+     * The current message - which may be partial in the sense that not all frames have been received yet - which has
+     * been received by this channel. As the frames are received the message gets updated and once all frames have been
+     * received the message can then be routed.
      */
     private AMQMessage _currentMessage;
 
-    /**
-     * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
-     */
+    /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
     private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
 
     private final MessageStore _messageStore;
@@ -109,8 +102,8 @@
     private TransactionalContext _txnContext;
 
     /**
-     * A context used by the message store enabling it to track context for a given channel even across
-     * thread boundaries
+     * A context used by the message store enabling it to track context for a given channel even across thread
+     * boundaries
      */
     private final StoreContext _storeContext;
 
@@ -123,7 +116,6 @@
     private final AMQProtocolSession _session;
 
 
-
     public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
             throws AMQException
     {
@@ -138,9 +130,7 @@
         _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
     }
 
-    /**
-     * Sets this channel to be part of a local transaction
-     */
+    /** Sets this channel to be part of a local transaction */
     public void setLocalTransactional()
     {
         _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
@@ -293,17 +283,17 @@
     }
 
     /**
-     * Subscribe to a queue. We register all subscriptions in the channel so that
-     * if the channel is closed we can clean up all subscriptions, even if the
-     * client does not explicitly unsubscribe from all queues.
-     *
-     * @param tag     the tag chosen by the client (if null, server will generate one)
-     * @param queue   the queue to subscribe to
-     * @param session the protocol session of the subscriber
+     * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
+     * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
+     *
+     * @param tag       the tag chosen by the client (if null, server will generate one)
+     * @param queue     the queue to subscribe to
+     * @param session   the protocol session of the subscriber
      * @param noLocal
      * @param exclusive
-     * @return the consumer tag. This is returned to the subscriber and used in
-     *         subsequent unsubscribe requests
+     *
+     * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
+     *
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
@@ -335,7 +325,7 @@
     }
 
     /**
-     * Called from the protocol session to close this channel and clean up.
+     * Called from the protocol session to close this channel and clean up. T
      *
      * @throws AMQException if there is an error during closure
      */
@@ -344,8 +334,6 @@
         _txnContext.rollback();
         unsubscribeAllConsumers(session);
         requeue();
-        _txnContext.commit();
-
     }
 
     private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
@@ -362,8 +350,8 @@
      * Add a message to the channel-based list of unacknowledged messages
      *
      * @param message     the message that was delivered
-     * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of
-     *                    the delivery tag)
+     * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
+     *                    delivery tag)
      * @param queue       the queue from which the message was delivered
      */
     public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
@@ -376,8 +364,8 @@
     }
 
     /**
-     * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
-     * May result in delivery to this same channel or to other subscribers.
+     * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to
+     * this same channel or to other subscribers.
      *
      * @throws org.apache.qpid.AMQException if the requeue fails
      */
@@ -386,23 +374,75 @@
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
         Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
 
+        TransactionalContext nontransacted = null;
+        if (!(_txnContext instanceof NonTransactionalContext))
+        {
+            nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+                                                        _returnMessages, _browsedAcks);
+        }
+
+
         for (UnacknowledgedMessage unacked : messagesToBeDelivered)
         {
             if (unacked.queue != null)
             {
-                _txnContext.deliver(unacked.message, unacked.queue);
+                // Deliver these messages out of the transaction as their delivery was never
+                // part of the transaction only the receive.
+                if (!(_txnContext instanceof NonTransactionalContext))
+                {
+                    nontransacted.deliver(unacked.message, unacked.queue, false);
+                }
+                else
+                {
+                    _txnContext.deliver(unacked.message, unacked.queue, false);
+                }
             }
         }
 
     }
 
+    public void requeue(long deliveryTag) throws AMQException
+    {
+        UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
 
-    /**
-     * Called to resend all outstanding unacknowledged messages to this same channel.
-     */
+        if (unacked != null)
+        {
+            TransactionalContext nontransacted = null;
+            if (!(_txnContext instanceof NonTransactionalContext))
+            {
+                nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+                                                            _returnMessages, _browsedAcks);
+            }
+
+            if (!(_txnContext instanceof NonTransactionalContext))
+            {
+                nontransacted.deliver(unacked.message, unacked.queue, false);
+            }
+            else
+            {
+                _txnContext.deliver(unacked.message, unacked.queue, false);
+            }
+            unacked.message.decrementReference(_storeContext);
+        }
+        else
+        {
+            _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists");
+        }
+
+
+    }
+
+
+    /** Called to resend all outstanding unacknowledged messages to this same channel. */
     public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
     {
         final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+        final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
+
+        if (_log.isInfoEnabled())
+        {
+            _log.info("unacked map contains " + _unacknowledgedMessageMap.size());
+        }
 
         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
         {
@@ -412,21 +452,40 @@
                 AMQShortString consumerTag = message.consumerTag;
                 AMQMessage msg = message.message;
                 msg.setRedelivered(true);
-                if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended())
+                if (consumerTag != null)
                 {
-                    msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+                    // Consumer exists
+                    if (_consumerTag2QueueMap.containsKey(consumerTag))
+                    {
+                        msgToResend.add(message);
+                    }
+                    else // consumer has gone
+                    {
+                        msgToRequeue.add(message);
+                    }
                 }
                 else
                 {
                     // Message has no consumer tag, so was "delivered" to a GET
                     // or consumer no longer registered
                     // cannot resend, so re-queue.
-                    if (message.queue != null && (consumerTag == null || requeue))
+                    if (message.queue != null)
+                    {
+                        if (requeue)
+                        {
+                            msgToRequeue.add(message);
+                        }
+                        else
+                        {
+                            _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+                        }
+                    }
+                    else
                     {
-                        msgToRequeue.add(message);                         
+                        _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
                     }
                 }
-                
+
                 // false means continue processing
                 return false;
             }
@@ -436,21 +495,112 @@
             }
         });
 
-        for(UnacknowledgedMessage message : msgToRequeue)
+        // Process Messages to Resend
+        if (_log.isInfoEnabled())
         {
-            _txnContext.deliver(message.message, message.queue);
+            if (!msgToResend.isEmpty())
+            {
+                _log.info("Preparing (" + msgToResend.size() + ") message to resend to.");
+            }
+        }
+        for (UnacknowledgedMessage message : msgToResend)
+        {
+            AMQMessage msg = message.message;
+
+            // Our Java Client will always suspend the channel when resending!!
+//            if (isSuspended())
+//            {
+//                _log.info("Channel is suspended so requeuing");
+//                //move this message to requeue
+//                msgToRequeue.add(message);
+//            }
+//            else
+            {
+                //release to allow it to be delivered
+                msg.release();
+
+                // Without any details from the client about what has been processed we have to mark
+                // all messages in the unacked map as redelivered.
+                msg.setRedelivered(true);
+
+
+                Subscription sub = msg.getDeliveredSubscription();
+
+                if (sub != null)
+                {
+                    synchronized (sub.getSendLock())
+                    {
+                        if (sub.isClosed())
+                        {
+                            _log.info("Subscription closed during resend so requeuing message");
+                            //move this message to requeue
+                            msgToRequeue.add(message);
+                        }
+                        else
+                        {
+                            if (_log.isDebugEnabled())
+                            {
+                                _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend");
+                            }
+                            // Will throw an exception if the sub is closed
+                            sub.addToResendQueue(msg);
+                            _unacknowledgedMessageMap.remove(message.deliveryTag);
+                            // Don't decrement as we are bypassing the normal deliver which increments
+                            // this is what there is a decrement on the Requeue as deliver will increment.
+                            // msg.decrementReference(_storeContext);
+                        }
+                    }
+                }
+                else
+                {
+                    _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+                    //move this message to requeue
+                    msgToRequeue.add(message);
+                }
+            }
+        }
+
+        if (_log.isInfoEnabled())
+        {
+            if (!msgToRequeue.isEmpty())
+            {
+                _log.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
+            }
+        }
+
+        TransactionalContext nontransacted = null;
+        if (!(_txnContext instanceof NonTransactionalContext))
+        {
+            nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+                                                        _returnMessages, _browsedAcks);
+        }
+
+        // Process Messages to Requeue at the front of the queue
+        for (UnacknowledgedMessage message : msgToRequeue)
+        {
+            // Deliver these messages out of the transaction as their delivery was never
+            // part of the transaction only the receive.
+            if (!(_txnContext instanceof NonTransactionalContext))
+            {
+                nontransacted.deliver(message.message, message.queue, true);
+            }
+            else
+            {
+                _txnContext.deliver(message.message, message.queue, true);
+            }
+
             _unacknowledgedMessageMap.remove(message.deliveryTag);
             message.message.decrementReference(_storeContext);
         }
     }
 
     /**
-     * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
-     * messages to remove the queue reference and also decrement any message reference counts, without
-     * actually removing the item since we may get an ack for a delivery tag that was generated from the
-     * deleted queue.
+     * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to
+     * remove the queue reference and also decrement any message reference counts, without actually removing the item
+     * since we may get an ack for a delivery tag that was generated from the deleted queue.
      *
      * @param queue the queue that has been deleted
+     *
      * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
      */
     public void queueDeleted(final AMQQueue queue) throws AMQException
@@ -487,6 +637,7 @@
      * @param deliveryTag the last delivery tag
      * @param multiple    if true will acknowledge all messages up to an including the delivery tag. if false only
      *                    acknowledges the single message specified by the delivery tag
+     *
      * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
@@ -517,10 +668,10 @@
     private void checkSuspension()
     {
         boolean suspend;
-        
-        suspend = ((_prefetch_HighWaterMark != 0) &&  _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
-                 || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
-        
+
+        suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
+                  || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+
         setSuspended(suspend);
     }
 
@@ -570,8 +721,6 @@
     public void rollback() throws AMQException
     {
         _txnContext.rollback();
-
-
     }
 
     public String toString()
@@ -617,8 +766,8 @@
         }
         else
         {
-            boolean willSuspend = ((_prefetch_HighWaterMark != 0) &&  _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
-            if(!willSuspend)
+            boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+            if (!willSuspend)
             {
                 final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
 
@@ -626,12 +775,17 @@
             }
 
 
-            if(willSuspend)
+            if (willSuspend)
             {
                 setSuspended(true);
             }
             return willSuspend;
         }
 
+    }
+
+    public TransactionalContext getTransactionalContext()
+    {
+        return _txnContext;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Fri Feb 23 02:20:44 2007
@@ -85,7 +85,6 @@
             for (UnacknowledgedMessage msg : msgs)
             {
                 remove(msg.deliveryTag);
-
             }
         }
     }

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?view=auto&rev=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Fri Feb 23 02:20:44 2007
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
+public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody>
+{
+    private static final Logger _logger = Logger.getLogger(BasicRejectMethodHandler.class);
+
+    private static BasicRejectMethodHandler _instance = new BasicRejectMethodHandler();
+
+    public static BasicRejectMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private BasicRejectMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRejectBody> evt) throws AMQException
+    {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+
+        _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue);
+
+        int channelId = evt.getChannelId();
+        UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag);
+
+        _logger.info("Need to reject message:" + message);
+//        if (evt.getMethod().requeue)
+//        {
+//          session.getChannel(channelId).requeue(evt.getMethod().deliveryTag);
+//        }
+//        else
+//       {
+//           // session.getChannel(channelId).resend(message);
+//       }
+
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Fri Feb 23 02:20:44 2007
@@ -62,6 +62,7 @@
             session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
+            // Why, are we not allowed to send messages back to client before the ok method?
             channel.resend(session, false);
         }
         catch (AMQException e)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Feb 23 02:20:44 2007
@@ -36,21 +36,15 @@
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.txn.TransactionalContext;
 
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
 public class AMQMessage
 {
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
-    /**
-     * Used in clustering
-     */
+    /** Used in clustering */
     private Set<Object> _tokens;
 
-    /**
-     * Only use in clustering - should ideally be removed?
-     */
+    /** Only use in clustering - should ideally be removed? */
     private AMQProtocolSession _publisher;
 
     private final Long _messageId;
@@ -63,16 +57,14 @@
     private TransactionalContext _txnContext;
 
     /**
-     * Flag to indicate whether message has been delivered to a
-     * consumer. Used in implementing return functionality for
+     * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
      * messages published with the 'immediate' flag.
      */
     private boolean _deliveredToConsumer;
     /**
-     * We need to keep track of whether the message was 'immediate'
-     * as in extreme circumstances, when the checkDelieveredToConsumer
-     * is called, the message may already have been received and acknowledged,
-     * and the body removed from the store.
+     * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the
+     * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body
+     * removed from the store.
      */
     private boolean _immediate;
 
@@ -80,11 +72,16 @@
 
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
+    private Subscription _takenBySubcription;
 
+    public boolean isTaken()
+    {
+        return _taken.get();
+    }
 
     /**
-     * Used to iterate through all the body frames associated with this message. Will not
-     * keep all the data in memory therefore is memory-efficient.
+     * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
+     * therefore is memory-efficient.
      */
     private class BodyFrameIterator implements Iterator<AMQDataBlock>
     {
@@ -103,7 +100,7 @@
         {
             try
             {
-                return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
+                return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
             }
             catch (AMQException e)
             {
@@ -153,7 +150,7 @@
         {
             try
             {
-                return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
+                return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
             }
             catch (AMQException e)
             {
@@ -166,7 +163,7 @@
         {
             try
             {
-                return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index);
+                return _messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index);
             }
             catch (AMQException e)
             {
@@ -196,12 +193,14 @@
     }
 
     /**
-     * Used when recovering, i.e. when the message store is creating references to messages.
-     * In that case, the normal enqueue/routingComplete is not done since the recovery process
-     * is responsible for routing the messages to queues.
+     * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
+     * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
+     * queues.
+     *
      * @param messageId
      * @param store
      * @param factory
+     *
      * @throws AMQException
      */
     public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
@@ -213,8 +212,8 @@
     }
 
     /**
-     * Used in testing only. This allows the passing of the content header immediately
-     * on construction.
+     * Used in testing only. This allows the passing of the content header immediately on construction.
+     *
      * @param messageId
      * @param info
      * @param txnContext
@@ -228,14 +227,15 @@
     }
 
     /**
-     * Used in testing only. This allows the passing of the content header and some body fragments on
-     * construction.
+     * Used in testing only. This allows the passing of the content header and some body fragments on construction.
+     *
      * @param messageId
      * @param info
      * @param txnContext
      * @param contentHeader
      * @param destinationQueues
      * @param contentBodies
+     *
      * @throws AMQException
      */
     public AMQMessage(Long messageId, MessagePublishInfo info,
@@ -280,7 +280,7 @@
         }
         else
         {
-            return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId);
+            return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
         }
     }
 
@@ -338,16 +338,14 @@
         return _messageId;
     }
 
-    /**
-     * Threadsafe. Increment the reference count on the message.
-     */
+    /** Threadsafe. Increment the reference count on the message. */
     public void incrementReference()
     {
         _referenceCount.incrementAndGet();
         if (_log.isDebugEnabled())
         {
 
-            _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + "   " +  Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+            _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + "   " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
 
         }
     }
@@ -355,7 +353,7 @@
     /**
      * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
      * message store.
-     *                                                                                                            
+     *
      * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
      *                                 failed
      */
@@ -371,7 +369,7 @@
             {
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+                    _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
 
 
                 }
@@ -394,13 +392,13 @@
         {
             if (_log.isDebugEnabled())
             {
-                _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" +  Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+                _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
                 if (_referenceCount.get() < 0)
                 {
                     Thread.dumpStack();
                 }
             }
-            if(_referenceCount.get()<0)
+            if (_referenceCount.get() < 0)
             {
                 throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
             }
@@ -419,7 +417,8 @@
 
     /**
      * Called selectors to determin if the message has already been sent
-     * @return   _deliveredToConsumer
+     *
+     * @return _deliveredToConsumer
      */
     public boolean getDeliveredToConsumer()
     {
@@ -427,10 +426,17 @@
     }
 
 
-
-    public boolean taken()
+    public boolean taken(Subscription sub)
     {
-        return _taken.getAndSet(true);
+        if (_taken.getAndSet(true))
+        {
+            return true;
+        }
+        else
+        {
+            _takenBySubcription = sub;
+            return false;
+        }
     }
 
     public void release()
@@ -441,9 +447,9 @@
     public boolean checkToken(Object token)
     {
 
-        if(_tokens==null)
+        if (_tokens == null)
         {
-            _tokens  = new HashSet<Object>();
+            _tokens = new HashSet<Object>();
         }
 
         if (_tokens.contains(token))
@@ -458,11 +464,12 @@
     }
 
     /**
-     * Registers a queue to which this message is to be delivered. This is
-     * called from the exchange when it is routing the message. This will be called before any content bodies have
-     * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria.
+     * Registers a queue to which this message is to be delivered. This is called from the exchange when it is routing
+     * the message. This will be called before any content bodies have been received so that the choice of
+     * AMQMessageHandle implementation can be picked based on various criteria.
      *
      * @param queue the queue
+     *
      * @throws org.apache.qpid.AMQException if there is an error enqueuing the message
      */
     public void enqueue(AMQQueue queue) throws AMQException
@@ -483,16 +490,15 @@
         }
         else
         {
-            return _messageHandle.isPersistent(getStoreContext(),_messageId);
+            return _messageHandle.isPersistent(getStoreContext(), _messageId);
         }
     }
 
     /**
      * Called to enforce the 'immediate' flag.
      *
-     * @throws NoConsumersException if the message is marked for
-     *                              immediate delivery but has not been marked as delivered to a
-     *                              consumer
+     * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
+     *                              to a consumer
      */
     public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
     {
@@ -500,7 +506,7 @@
         if (_immediate && !_deliveredToConsumer)
         {
             throw new NoConsumersException(this);
-        }        
+        }
     }
 
     public MessagePublishInfo getMessagePublishInfo() throws AMQException
@@ -512,7 +518,7 @@
         }
         else
         {
-            pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId);
+            pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
         }
         return pb;
     }
@@ -533,10 +539,7 @@
     }
 
 
-    /**
-     * Called when this message is delivered to a consumer. (used to
-     * implement the 'immediate' flag functionality).
-     */
+    /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
     public void setDeliveredToConsumer()
     {
         _deliveredToConsumer = true;
@@ -566,7 +569,7 @@
 
             for (AMQQueue q : destinationQueues)
             {
-                _txnContext.deliver(this, q);
+                _txnContext.deliver(this, q, true);
             }
         }
         finally
@@ -583,23 +586,22 @@
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       getContentHeaderBody());
 
-        final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
-        if(bodyCount == 0)
+        final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+        if (bodyCount == 0)
         {
             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
+                                                                                       contentHeader);
 
             protocolSession.writeFrame(compositeBlock);
         }
         else
         {
 
-
             //
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
+            ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
 
             AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
             AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -609,9 +611,9 @@
             //
             // Now start writing out the other content bodies
             //
-            for(int i = 1; i < bodyCount; i++)
+            for (int i = 1; i < bodyCount; i++)
             {
-                cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+                cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
                 protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
@@ -627,22 +629,21 @@
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       getContentHeaderBody());
 
-        final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
-        if(bodyCount == 0)
+        final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+        if (bodyCount == 0)
         {
             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
+                                                                                       contentHeader);
             protocolSession.writeFrame(compositeBlock);
         }
         else
         {
 
-
             //
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
+            ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
 
             AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
             AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -652,9 +653,9 @@
             //
             // Now start writing out the other content bodies
             //
-            for(int i = 1; i < bodyCount; i++)
+            for (int i = 1; i < bodyCount; i++)
             {
-                cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+                cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
                 protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
@@ -685,10 +686,10 @@
         AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
                                                             protocolSession.getProtocolMajorVersion(),
                                                             protocolSession.getProtocolMinorVersion(),
-                                                                deliveryTag, pb.getExchange(),
-                                                                queueSize,
-                                                                _messageHandle.isRedelivered(),
-                                                                pb.getRoutingKey());
+                                                            deliveryTag, pb.getExchange(),
+                                                            queueSize,
+                                                            _messageHandle.isRedelivered(),
+                                                            pb.getRoutingKey());
         ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
         getOkFrame.writePayload(buf);
         buf.flip();
@@ -699,7 +700,7 @@
     {
         AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
                                                               protocolSession.getProtocolMajorVersion(),
-                                                              protocolSession.getProtocolMinorVersion(), 
+                                                              protocolSession.getProtocolMinorVersion(),
                                                               getMessagePublishInfo().getExchange(),
                                                               replyCode, replyText,
                                                               getMessagePublishInfo().getRoutingKey());
@@ -757,12 +758,11 @@
         }
         catch (AMQException e)
         {
-            _log.error(e.toString(),e);
+            _log.error(e.toString(), e);
             return 0;
         }
 
-    }    
-
+    }
 
 
     public void restoreTransientMessageData() throws AMQException
@@ -771,7 +771,7 @@
         transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
         transientMessageData.setContentHeaderBody(getContentHeaderBody());
         transientMessageData.addBodyLength(getContentHeaderBody().getSize());
-        _transientMessageData = transientMessageData; 
+        _transientMessageData = transientMessageData;
     }
 
 
@@ -784,6 +784,11 @@
     public String toString()
     {
         return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
-                _taken;
+               _taken + " by:" + _takenBySubcription;
+    }
+
+    public Subscription getDeliveredSubscription()
+    {
+        return _takenBySubcription;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Feb 23 02:20:44 2007
@@ -45,13 +45,11 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 /**
- * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
- * that. It is described fully in RFC 006.
+ * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
+ * fully in RFC 006.
  */
 public class AMQQueue implements Managable, Comparable
 {
-
-
     public static final class ExistingExclusiveSubscription extends AMQException
     {
 
@@ -74,26 +72,19 @@
     private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive();
 
 
-
     private static final Logger _logger = Logger.getLogger(AMQQueue.class);
 
     private final AMQShortString _name;
 
-    /**
-     * null means shared
-     */
+    /** null means shared */
     private final AMQShortString _owner;
 
     private final boolean _durable;
 
-    /**
-     * If true, this queue is deleted when the last subscriber is removed
-     */
+    /** If true, this queue is deleted when the last subscriber is removed */
     private final boolean _autoDelete;
 
-    /**
-     * Holds subscribers to the queue.
-     */
+    /** Holds subscribers to the queue. */
     private final SubscriptionSet _subscribers;
 
     private final SubscriptionFactory _subscriptionFactory;
@@ -106,20 +97,13 @@
 
     private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
 
-    /**
-     * Manages message delivery.
-     */
+    /** Manages message delivery. */
     private final DeliveryManager _deliveryMgr;
 
-    /**
-     * Used to track bindings to exchanges so that on deletion they can easily
-     * be cancelled.
-     */
+    /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
     private final ExchangeBindings _bindings = new ExchangeBindings(this);
 
-    /**
-     * Executor on which asynchronous delivery will be carriedout where required
-     */
+    /** Executor on which asynchronous delivery will be carriedout where required */
     private final Executor _asyncDelivery;
 
     private final AMQQueueMBean _managedObject;
@@ -127,39 +111,27 @@
     private final VirtualHost _virtualHost;
 
 
-    /**
-     * max allowed size(KB) of a single message
-     */
+    /** max allowed size(KB) of a single message */
     @Configured(path = "maximumMessageSize", defaultValue = "0")
     public long _maximumMessageSize;
 
-    /**
-     * max allowed number of messages on a queue.
-     */
+    /** max allowed number of messages on a queue. */
     @Configured(path = "maximumMessageCount", defaultValue = "0")
     public int _maximumMessageCount;
 
-    /**
-     * max queue depth for the queue
-     */
+    /** max queue depth for the queue */
     @Configured(path = "maximumQueueDepth", defaultValue = "0")
     public long _maximumQueueDepth;
 
-    /**
-     * maximum message age before alerts occur
-     */
+    /** maximum message age before alerts occur */
     @Configured(path = "maximumMessageAge", defaultValue = "0")
     public long _maximumMessageAge;
 
-    /**
-     * the minimum interval between sending out consequetive alerts of the same type
-     */
+    /** the minimum interval between sending out consequetive alerts of the same type */
     @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
     public long _minimumAlertRepeatGap;
 
-    /**
-     * total messages received by the queue since startup.
-     */
+    /** total messages received by the queue since startup. */
     public AtomicLong _totalMessagesReceived = new AtomicLong();
 
     public int compareTo(Object o)
@@ -176,7 +148,6 @@
     }
 
 
-
     protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
                        boolean autoDelete, VirtualHost virtualHost,
                        SubscriptionSet subscribers)
@@ -211,7 +182,7 @@
 
         _subscribers = subscribers;
         _subscriptionFactory = subscriptionFactory;
-		_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+        _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
     }
 
     private AMQQueueMBean createMBean() throws AMQException
@@ -251,17 +222,13 @@
         return _autoDelete;
     }
 
-    /**
-     * @return no of messages(undelivered) on the queue.
-     */
+    /** @return no of messages(undelivered) on the queue. */
     public int getMessageCount()
     {
         return _deliveryMgr.getQueueMessageCount();
     }
 
-    /**
-     * @return List of messages(undelivered) on the queue.
-     */
+    /** @return List of messages(undelivered) on the queue. */
     public List<AMQMessage> getMessagesOnTheQueue()
     {
         return _deliveryMgr.getMessages();
@@ -275,6 +242,7 @@
 
     /**
      * @param messageId
+     *
      * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
      */
     public AMQMessage getMessageOnTheQueue(long messageId)
@@ -294,13 +262,12 @@
     }
 
     /**
-     * moves messages from this queue to another queue. to do this the approach is following-
-     * - setup the queue for moving messages (hold the lock and stop the async delivery)
-     * - get all the messages available in the given message id range
-     * - setup the other queue for moving messages (hold the lock and stop the async delivery)
-     * - send these available messages to the other queue (enqueue in other queue)
-     * - Once sending to other Queue is successful, remove messages from this queue
-     * - remove locks from both queues and start async delivery
+     * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
+     * moving messages (hold the lock and stop the async delivery) - get all the messages available in the given message
+     * id range - setup the other queue for moving messages (hold the lock and stop the async delivery) - send these
+     * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful,
+     * remove messages from this queue - remove locks from both queues and start async delivery
+     *
      * @param fromMessageId
      * @param toMessageId
      * @param queueName
@@ -316,7 +283,7 @@
             startMovingMessages();
             List<AMQMessage> list = getMessagesOnTheQueue();
             List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
-            int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
+            int maxMessageCountToBeMoved = (int) (toMessageId - fromMessageId + 1);
 
             // Run this loop till you find all the messages or the list has no more messages
             for (AMQMessage message : list)
@@ -344,7 +311,7 @@
         {
             // remove the lock and start the async delivery
             anotherQueue.stopMovingMessages();
-            stopMovingMessages();   
+            stopMovingMessages();
         }
     }
 
@@ -364,10 +331,8 @@
         _deliveryMgr.stopMovingMessages();
         _deliveryMgr.processAsync(_asyncDelivery);
     }
-    
-    /**
-     * @return MBean object associated with this Queue
-     */
+
+    /** @return MBean object associated with this Queue */
     public ManagedObject getManagedObject()
     {
         return _managedObject;
@@ -422,20 +387,16 @@
     public long getOldestMessageArrivalTime()
     {
         return _deliveryMgr.getOldestMessageArrival();
-        
+
     }
 
-    /**
-     * Removes the AMQMessage from the top of the queue.
-     */
+    /** Removes the AMQMessage from the top of the queue. */
     public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
     {
         _deliveryMgr.removeAMessageFromTop(storeContext);
     }
 
-    /**
-     * removes all the messages from the queue.
-     */
+    /** removes all the messages from the queue. */
     public synchronized long clearQueue(StoreContext storeContext) throws AMQException
     {
         return _deliveryMgr.clearAllMessages(storeContext);
@@ -443,10 +404,10 @@
 
     public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
     {
-        exchange.registerQueue(routingKey, this, arguments);        
-        if(isDurable() && exchange.isDurable())
+        exchange.registerQueue(routingKey, this, arguments);
+        if (isDurable() && exchange.isDurable())
         {
-            _virtualHost.getMessageStore().bindQueue(exchange,routingKey,this,arguments);
+            _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
         }
         _bindings.addBinding(routingKey, arguments, exchange);
     }
@@ -454,9 +415,9 @@
     public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
     {
         exchange.deregisterQueue(routingKey, this, arguments);
-        if(isDurable() && exchange.isDurable())
+        if (isDurable() && exchange.isDurable())
         {
-            _virtualHost.getMessageStore().unbindQueue(exchange,routingKey,this,arguments);
+            _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
         }
         _bindings.remove(routingKey, arguments, exchange);
     }
@@ -466,30 +427,31 @@
                                         FieldTable filters, boolean noLocal, boolean exclusive)
             throws AMQException
     {
-        if(incrementSubscriberCount() > 1)
+        if (incrementSubscriberCount() > 1)
         {
-            if(isExclusive())
+            if (isExclusive())
             {
                 decrementSubscriberCount();
                 throw EXISTING_EXCLUSIVE;
             }
-            else if(exclusive)
+            else if (exclusive)
             {
                 decrementSubscriberCount();
                 throw EXISTING_SUBSCRIPTION;
             }
 
         }
-        else if(exclusive)
+        else if (exclusive)
         {
             setExclusive(true);
         }
 
         debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
 
-        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
+                                                                            filters, noLocal, this);
 
-        if(subscription.hasFilters())
+        if (subscription.hasFilters())
         {
             if (_deliveryMgr.hasQueuedMessages())
             {
@@ -537,10 +499,10 @@
                                    " and protocol session key " + ps.getKey() + " not registered with queue " + this);
         }
 
+        removedSubscription.close();
         setExclusive(false);
         decrementSubscriberCount();
 
-
         // if we are eligible for auto deletion, unregister from the queue registry
         if (_autoDelete && _subscribers.isEmpty())
         {
@@ -583,13 +545,13 @@
 
     public void delete() throws AMQException
     {
-        if(!_deleted.getAndSet(true))
+        if (!_deleted.getAndSet(true))
         {
             _subscribers.queueDeleted(this);
             _bindings.deregister();
             _virtualHost.getQueueRegistry().unregisterQueue(_name);
             _managedObject.unregister();
-            for(Task task : _deleteTaskList)
+            for (Task task : _deleteTaskList)
             {
                 task.doTask(this);
             }
@@ -605,7 +567,8 @@
 
     public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException
     {
-        _deliveryMgr.deliver(storeContext, getName(), msg);
+        //fixme not sure what this is doing. should we be passing deliverFirst through here?
+        _deliveryMgr.deliver(storeContext, getName(), msg, false);
         try
         {
             msg.checkDeliveredToConsumer();
@@ -620,9 +583,9 @@
     }
 
 
-    public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
+    public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
     {
-        _deliveryMgr.deliver(storeContext, getName(), msg);
+        _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
         try
         {
             msg.checkDeliveredToConsumer();
@@ -731,7 +694,7 @@
 
     public static interface Task
     {
-        public void doTask(AMQQueue queue) throws AMQException;        
+        public void doTask(AMQQueue queue) throws AMQException;
     }
 
     public void addQueueDeleteTask(Task task)
@@ -759,4 +722,8 @@
         _maximumMessageAge = maximumMessageAge;
     }
 
+    public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, AMQMessage msg)
+    {
+        _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Feb 23 02:20:44 2007
@@ -24,9 +24,14 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
+import java.util.Set;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.log4j.Logger;
@@ -38,12 +43,12 @@
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.util.MessageQueue;
+import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
 import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 
 
-/**
- * Manages delivery of messages on behalf of a queue
- */
+/** Manages delivery of messages on behalf of a queue */
 public class ConcurrentSelectorDeliveryManager implements DeliveryManager
 {
     private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
@@ -51,47 +56,36 @@
     @Configured(path = "advanced.compressBufferOnQueue",
                 defaultValue = "false")
     public boolean compressBufferOnQueue;
-    /**
-     * Holds any queued messages
-     */
-    private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+    /** Holds any queued messages */
+    private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
 
-    private final ReentrantLock _messageAccessLock = new ReentrantLock();
-
-    //private int _messageCount;
-    /**
-     * Ensures that only one asynchronous task is running for this manager at
-     * any time.
-     */
+    /** Ensures that only one asynchronous task is running for this manager at any time. */
     private final AtomicBoolean _processing = new AtomicBoolean();
-    /**
-     * The subscriptions on the queue to whom messages are delivered
-     */
+    /** The subscriptions on the queue to whom messages are delivered */
     private final SubscriptionManager _subscriptions;
 
     /**
-     * A reference to the queue we are delivering messages for. We need this to be able
-     * to pass the code that handles acknowledgements a handle on the queue.
+     * A reference to the queue we are delivering messages for. We need this to be able to pass the code that handles
+     * acknowledgements a handle on the queue.
      */
     private final AMQQueue _queue;
 
     /**
-     * Flag used while moving messages from this queue to another. For moving messages the async delivery
-     * should also stop. This flat should be set to true to stop async delivery and set to false to enable
-     * async delivery again.
+     * Flag used while moving messages from this queue to another. For moving messages the async delivery should also
+     * stop. This flat should be set to true to stop async delivery and set to false to enable async delivery again.
      */
     private AtomicBoolean _movingMessages = new AtomicBoolean();
-    
+
     /**
      * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
-     * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
-     * via the async thread.
-     * <p/>
-     * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
+     * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be
+     * delivered via the async thread. <p/> Lock is used to control access to hasQueuedMessages() and over the addition
+     * of messages to the queue.
      */
     private ReentrantLock _lock = new ReentrantLock();
     private AtomicLong _totalMessageSize = new AtomicLong();
-
+    private AtomicInteger _extraMessages = new AtomicInteger();
+    private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
 
     ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
     {
@@ -109,7 +103,7 @@
     }
 
 
-    private boolean addMessageToQueue(AMQMessage msg)
+    private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst)
     {
         // Shrink the ContentBodies to their actual size to save memory.
         if (compressBufferOnQueue)
@@ -122,7 +116,14 @@
             }
         }
 
-        _messages.offer(msg);
+        if (deliverFirst)
+        {
+            _messages.pushHead(msg);
+        }
+        else
+        {
+            _messages.offer(msg);
+        }
 
         _totalMessageSize.addAndGet(msg.getSize());
 
@@ -135,7 +136,7 @@
         _lock.lock();
         try
         {
-            return !_messages.isEmpty();
+            return !(_messages.isEmpty() && _hasContent.isEmpty());
         }
         finally
         {
@@ -149,18 +150,17 @@
     }
 
     /**
-     * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
-     * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
+     * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine
+     * size. The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
      *
      * @return int the number of messages in the delivery queue.
      */
     private int getMessageCount()
     {
-        return _messages.size();
+        return _messages.size() + _extraMessages.get();
     }
 
 
-
     public long getTotalMessageSize()
     {
         return _totalMessageSize.get();
@@ -172,6 +172,38 @@
         return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
     }
 
+    public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg)
+    {
+        _lock.lock();
+        try
+        {
+            if (hasContent)
+            {
+                _log.debug("Queue has adding subscriber content");
+                _hasContent.add(subscription);
+                _totalMessageSize.addAndGet(msg.getSize());
+                _extraMessages.addAndGet(1);
+            }
+            else
+            {
+                _log.debug("Queue has removing subscriber content");
+                if (msg == null)
+                {
+                    _hasContent.remove(subscription);
+                }
+                else
+                {
+                    _totalMessageSize.addAndGet(-msg.getSize());
+                    _extraMessages.addAndGet(-1);
+                }
+            }
+        }
+        finally
+        {
+            _lock.unlock();
+        }
+    }
+
 
     public List<AMQMessage> getMessages()
     {
@@ -195,7 +227,7 @@
             AMQMessage message = currentQueue.next();
             if (subscription.hasInterest(message))
             {
-                subscription.enqueueForPreDelivery(message);
+                subscription.enqueueForPreDelivery(message, false);
             }
         }
     }
@@ -203,7 +235,7 @@
     public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
     {
         AMQMessage msg = getNextMessage();
-        if(msg == null)
+        if (msg == null)
         {
             return false;
         }
@@ -229,7 +261,7 @@
                     }
                     _queue.dequeue(channel.getStoreContext(), msg);
                 }
-                synchronized(channel)
+                synchronized (channel)
                 {
                     long deliveryTag = channel.getNextDeliveryTag();
 
@@ -252,8 +284,8 @@
     }
 
     /**
-     * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag,
-     * so that the asyn delivery is also stopped.
+     * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, so that
+     * the asyn delivery is also stopped.
      */
     public void startMovingMessages()
     {
@@ -262,8 +294,8 @@
     }
 
     /**
-     * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag,
-     * so that the async delivery can start again.
+     * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, so that
+     * the async delivery can start again.
      */
     public void stopMovingMessages()
     {
@@ -276,6 +308,7 @@
 
     /**
      * Messages will be removed from this queue and all preDeliveryQueues
+     *
      * @param messageList
      */
     public void removeMovedMessages(List<AMQMessage> messageList)
@@ -308,7 +341,9 @@
 
     /**
      * Now with implementation of predelivery queues, this method will mark the message on the top as taken.
+     *
      * @param storeContext
+     *
      * @throws AMQException
      */
     public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
@@ -318,11 +353,11 @@
         if (msg != null)
         {
             // mark this message as taken and get it removed
-            msg.taken();
+            msg.taken(null);
             _queue.dequeue(storeContext, msg);
             getNextMessage();
         }
-        
+
         _lock.unlock();
     }
 
@@ -335,7 +370,7 @@
         while (msg != null)
         {
             //mark this message as taken and get it removed
-            msg.taken();
+            msg.taken(null);
             _queue.dequeue(storeContext, msg);
             msg = getNextMessage();
             count++;
@@ -347,20 +382,15 @@
 
     public synchronized AMQMessage getNextMessage() throws AMQException
     {
-        return getNextMessage(_messages);
+        return getNextMessage(_messages, null);
     }
 
-
-    private AMQMessage getNextMessage(Queue<AMQMessage> messages)
-    {
-        return getNextMessage(messages, false);
-    }
-
-    private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing)
+    private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
     {
         AMQMessage message = messages.peek();
 
-        while (message != null && (browsing || message.taken()))
+
+        while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub)))
         {
             //remove the already taken message
             messages.poll();
@@ -371,27 +401,76 @@
         return message;
     }
 
-    public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
+    public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
     {
+
+        Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+                       ") from queue (" + System.identityHashCode(messageQueue) +
+                       ") AMQQueue (" + System.identityHashCode(queue) + ")");
+        }
+
+        if (messageQueue == null)
+        {
+            // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
+            if (_log.isDebugEnabled())
+            {
+                _log.debug(sub + ": asked to send messages but has none on given queue:" + queue);
+            }
+            return;
+        }
+
         AMQMessage message = null;
         try
         {
-            message = getNextMessage(messageQueue, sub.isBrowser());
+            message = getNextMessage(messageQueue, sub);
 
             // message will be null if we have no messages in the messageQueue.
             if (message == null)
             {
+                if (_log.isTraceEnabled())
+                {
+                    _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+                }
                 return;
             }
             if (_log.isDebugEnabled())
             {
-                _log.debug("Async Delivery Message:" + message + " to :" + sub);
+                _log.debug("Async Delivery Message (" + System.identityHashCode(message) +
+                           ") by :" + System.identityHashCode(this) +
+                           ") to :" + System.identityHashCode(sub));
             }
 
             sub.send(message, _queue);
 
             //remove sent message from our queue.
             messageQueue.poll();
+            //If we don't remove the message from _messages
+            // Otherwise the Async send will never end
+
+            if (messageQueue == sub.getResendQueue())
+            {
+                if (_log.isTraceEnabled())
+                {
+                    _log.trace("All messages sent from resendQueue for " + sub);
+                }
+                if (messageQueue.isEmpty())
+                {
+                    subscriberHasPendingResend(false, sub, null);
+                    //better to use the above method as this keeps all the tracking in one location.
+//                    _hasContent.remove(sub);
+                }
+
+                _extraMessages.decrementAndGet();
+            }
+            else if (messageQueue == sub.getPreDeliveryQueue())
+            {
+                _log.info("We could do clean up of the main _message queue here");
+            }
+
             _totalMessageSize.addAndGet(-message.getSize());
         }
         catch (AMQException e)
@@ -403,6 +482,7 @@
 
     /**
      * enqueues the messages in the list on the queue and all required predelivery queues
+     *
      * @param storeContext
      * @param movedMessageList
      */
@@ -411,7 +491,7 @@
         _lock.lock();
         for (AMQMessage msg : movedMessageList)
         {
-            addMessageToQueue(msg);
+            addMessageToQueue(msg, true);
         }
 
         // enqueue on the pre delivery queues
@@ -422,7 +502,7 @@
                 // Only give the message to those that want them.
                 if (sub.hasInterest(msg))
                 {
-                    sub.enqueueForPreDelivery(msg);
+                    sub.enqueueForPreDelivery(msg, true);
                 }
             }
         }
@@ -430,8 +510,8 @@
     }
 
     /**
-     * Only one thread should ever execute this method concurrently, but
-     * it can do so while other threads invoke deliver().
+     * Only one thread should ever execute this method concurrently, but it can do so while other threads invoke
+     * deliver().
      */
     private void processQueue()
     {
@@ -444,40 +524,43 @@
 
             for (Subscription sub : _subscriptions.getSubscriptions())
             {
-                if (!sub.isSuspended())
+                synchronized (sub.getSendLock())
                 {
-                    sendNextMessage(sub);
+                    if (!sub.isSuspended())
+                    {
+                        sendNextMessage(sub, _queue);
 
-                    hasSubscribers = true;
+                        hasSubscribers = true;
+                    }
                 }
             }
         }
     }
 
-    private void sendNextMessage(Subscription sub)
-    {
-        if (sub.hasFilters())
-        {
-            sendNextMessage(sub, sub.getPreDeliveryQueue());
-            if (sub.isAutoClose())
-            {
-                if (sub.getPreDeliveryQueue().isEmpty())
-                {
-                    sub.close();
-                }
-            }
-        }
-        else
-        {
-            sendNextMessage(sub, _messages);
-        }
-    }
+//    private void sendNextMessage(Subscription sub)
+//    {
+//        if (sub.hasFilters())
+//        {
+//            sendNextMessage(sub, sub.getPreDeliveryQueue());
+//            if (sub.isAutoClose())
+//            {
+//                if (sub.getPreDeliveryQueue().isEmpty())
+//                {
+//                    sub.close();
+//                }
+//            }
+//        }
+//        else
+//        {
+//            sendNextMessage(sub, _messages);
+//        }
+//    }
 
-    public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException
+    public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug(id() + "deliver :" + msg);
+            _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg);
         }
         msg.release();
 
@@ -491,11 +574,11 @@
             {
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery");
+                    _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
                 }
                 if (!msg.getMessagePublishInfo().isImmediate())
                 {
-                    addMessageToQueue(msg);
+                    addMessageToQueue(msg, deliverFirst);
 
                     //release lock now message is on queue.
                     _lock.unlock();
@@ -504,7 +587,7 @@
                     if (_log.isDebugEnabled())
                     {
                         _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
-                                   " subscribers to give the message to.");
+                                   " subscribers to give the message to:" + currentStatus());
                     }
                     for (Subscription sub : _subscriptions.getSubscriptions())
                     {
@@ -528,7 +611,7 @@
                                 _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
                                            ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
                             }
-                            sub.enqueueForPreDelivery(msg);
+                            sub.enqueueForPreDelivery(msg, deliverFirst);
                         }
                     }
                 }
@@ -537,14 +620,47 @@
             {
                 //release lock now
                 _lock.unlock();
-
-                if (_log.isDebugEnabled())
+                synchronized (s.getSendLock())
                 {
-                    _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
-                               System.identityHashCode(s) + ") :" + s);
+                    if (!s.isSuspended())
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+                                       System.identityHashCode(s) + ") :" + s);
+                        }
+                        msg.taken(s);
+                        //Deliver the message
+                        s.send(msg, _queue);
+                    }
+                    else
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send");
+                        }
+                    }
+
+                    if (!msg.isTaken())
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" +
+                                       " Subscriber:" + System.identityHashCode(s));
+                        }
+
+                        deliver(context, name, msg, deliverFirst);
+                    }
+                    else
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug(id() + " Message(" + System.identityHashCode(msg) +
+                                       ") has been taken so disregarding deliver request to Subscriber:" +
+                                       System.identityHashCode(s));
+                        }
+                    }
                 }
-                //Deliver the message
-                s.send(msg, _queue);
             }
         }
         finally
@@ -593,9 +709,7 @@
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
-                       " Active:" + _subscriptions.hasActiveSubscribers() +
-                       " Processing:" + _processing.get());
+            _log.debug("Processing Async." + currentStatus());
         }
 
         if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
@@ -606,6 +720,18 @@
                 executor.execute(asyncDelivery);
             }
         }
+    }
+
+    private String currentStatus()
+    {
+        return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
+               "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " +
+               " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
+               "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
+               " Active:" + _subscriptions.hasActiveSubscribers() +
+               " Processing:" + _processing.get() +
+               " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
+               "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") ";
     }
 
 }