You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/07/15 13:42:56 UTC

svn commit: r676887 - /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Author: ritchiem
Date: Tue Jul 15 04:42:55 2008
New Revision: 676887

URL: http://svn.apache.org/viewvc?rev=676887&view=rev
Log:
QPID-1172 : Moved unregistration out of the sendLock. Potential refactor possible between processQueue and flushSubscription

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=676887&r1=676886&r2=676887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Jul 15 04:42:55 2008
@@ -82,16 +82,12 @@
 
     private volatile Subscription _exclusiveSubscriber;
 
-
     private final QueueEntryList _entries;
 
-
     private final AMQQueueMBean _managedObject;
     private final Executor _asyncDelivery;
     private final AtomicLong _totalMessagesReceived = new AtomicLong();
 
-
-
     /** max allowed size(KB) of a single message */
     @Configured(path = "maximumMessageSize", defaultValue = "0")
     public long _maximumMessageSize;
@@ -112,14 +108,10 @@
     @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
     public long _minimumAlertRepeatGap;
 
-
-
     private static final int MAX_ASYNC_DELIVERIES = 10;
 
-
     private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
 
-
     private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
     private AtomicReference _asynchronousRunner = new AtomicReference(null);
     private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -127,7 +119,7 @@
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
     {
-        this(name,durable,owner,autoDelete,virtualHost,new SimpleQueueEntryList.Factory());
+        this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
     }
 
     protected SimpleAMQQueue(AMQShortString name,
@@ -136,7 +128,7 @@
                              boolean autoDelete,
                              VirtualHost virtualHost,
                              QueueEntryListFactory entryListFactory)
-                throws AMQException
+            throws AMQException
     {
 
         if (name == null)
@@ -168,7 +160,6 @@
             throw new AMQException("AMQQueue MBean creation has failed ", e);
         }
 
-
         // This ensure that the notification checks for the configured alerts are created.
         setMaximumMessageAge(_maximumMessageAge);
         setMaximumMessageCount(_maximumMessageCount);
@@ -204,7 +195,6 @@
         return _virtualHost;
     }
 
-
     // ------ bind and unbind
 
     public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
@@ -227,27 +217,25 @@
         }
 
         boolean removed = _bindings.remove(routingKey, arguments, exchange);
-        if(!removed)
+        if (!removed)
         {
             _logger.error("Mismatch between queue bindings and exchange record of bindings");
         }
     }
 
-
     // ------ Manage Subscriptions
 
     public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
     {
 
-
-        if(isExclusiveSubscriber())
+        if (isExclusiveSubscriber())
         {
             throw new ExistingExclusiveSubscription();
         }
 
-        if(exclusive)
+        if (exclusive)
         {
-            if(getConsumerCount() != 0)
+            if (getConsumerCount() != 0)
             {
                 throw new ExistingSubscriptionPreventsExclusive();
             }
@@ -258,16 +246,15 @@
             }
         }
 
-
         _activeSubscriberCount.incrementAndGet();
         subscription.setStateListener(this);
-        subscription.setLastSeenEntry(null,_entries.getHead());
+        subscription.setLastSeenEntry(null, _entries.getHead());
 
-        if(!isDeleted())
+        if (!isDeleted())
         {
             subscription.setQueue(this);
             _subscriptionList.add(subscription);
-            if(isDeleted())
+            if (isDeleted())
             {
                 subscription.queueDeleted(this);
             }
@@ -277,39 +264,32 @@
             // TODO
         }
 
-
         deliverAsync(subscription);
 
     }
 
     public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
     {
-        if(subscription == null)
+        if (subscription == null)
         {
             throw new NullPointerException("subscription argument is null");
         }
 
         boolean removed = _subscriptionList.remove(subscription);
 
-
-
-        if(removed)
+        if (removed)
         {
             subscription.close();
             // No longer can the queue have an exclusive consumer
             setExclusiveSubscriber(null);
 
-
             QueueEntry lastSeen;
 
-            while((lastSeen = subscription.getLastSeenEntry()) != null)
+            while ((lastSeen = subscription.getLastSeenEntry()) != null)
             {
                 subscription.setLastSeenEntry(lastSeen, null);
             }
 
-
-
-
             // auto-delete queues must be deleted if there are no remaining subscribers
 
             if (_autoDelete && getConsumerCount() == 0)
@@ -324,30 +304,25 @@
                 // we need to manually fire the event to the removed subscription (which was the last one left for this
                 // queue. This is because the delete method uses the subscription set which has just been cleared
                 subscription.queueDeleted(this);
-             }
+            }
         }
 
-    
     }
 
-
     // ------ Enqueue / Dequeue
 
     public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
     {
 
-
-
         incrementQueueCount();
         incrementQueueSize(message);
 
         _totalMessagesReceived.incrementAndGet();
 
-
-        QueueEntry entry; 
+        QueueEntry entry;
         Subscription exclusiveSub = _exclusiveSubscriber;
 
-        if(exclusiveSub != null)
+        if (exclusiveSub != null)
         {
             exclusiveSub.getSendLock();
 
@@ -357,11 +332,10 @@
 
                 deliverToSubscription(exclusiveSub, entry);
 
-
                 // where there is more than one producer there's a reasonable chance that even though there is
                 // no "queueing" we do not deliver because we get an interleving of _entries.add and
                 // deliverToSubscription between threads.  Therefore have one more try. 
-                if(!(entry.isAcquired()  || entry.isDeleted()))
+                if (!(entry.isAcquired() || entry.isDeleted()))
                 {
                     deliverToSubscription(exclusiveSub, entry);
                 }
@@ -381,13 +355,13 @@
              */
             SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
             SubscriptionList.SubscriptionNode nextNode = node.getNext();
-            if(nextNode == null)
+            if (nextNode == null)
             {
                 nextNode = _subscriptionList.getHead().getNext();
             }
-            while(nextNode != null)
+            while (nextNode != null)
             {
-                if(_lastSubscriptionNode.compareAndSet(node, nextNode))
+                if (_lastSubscriptionNode.compareAndSet(node, nextNode))
                 {
                     break;
                 }
@@ -395,21 +369,20 @@
                 {
                     node = _lastSubscriptionNode.get();
                     nextNode = node.getNext();
-                    if(nextNode == null)
+                    if (nextNode == null)
                     {
                         nextNode = _subscriptionList.getHead().getNext();
                     }
                 }
             }
 
-
             // always do one extra loop after we believe we've finished
             // this catches the case where we *just* miss an update
             int loops = 2;
 
-            while(!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+            while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
             {
-                if(nextNode == null)
+                if (nextNode == null)
                 {
                     loops--;
                     nextNode = _subscriptionList.getHead();
@@ -425,20 +398,18 @@
             }
         }
 
-
-        if(entry.immediateAndNotDelivered())
+        if (entry.immediateAndNotDelivered())
         {
             dequeue(storeContext, entry);
             entry.dispose(storeContext);
         }
-        else if(!(entry.isAcquired()  || entry.isDeleted()))
+        else if (!(entry.isAcquired() || entry.isDeleted()))
         {
             checkSubscriptionsNotAheadOfDelivery(entry);
 
             deliverAsync();
         }
 
-
         try
         {
             _managedObject.checkForNotification(entry.getMessage());
@@ -448,7 +419,6 @@
             throw new AMQException("Unable to get notification from manage queue: " + e, e);
         }
 
-
         return entry;
 
     }
@@ -460,12 +430,12 @@
         sub.getSendLock();
         try
         {
-            if(subscriptionReadyAndHasInterest(sub, entry)
-               && !sub.isSuspended())
+            if (subscriptionReadyAndHasInterest(sub, entry)
+                && !sub.isSuspended())
             {
-                if( !sub.wouldSuspend(entry))
+                if (!sub.wouldSuspend(entry))
                 {
-                    if(!sub.isBrowser() && !entry.acquire(sub))
+                    if (!sub.isBrowser() && !entry.acquire(sub))
                     {
                         // restore credit here that would have been taken away by wouldSuspend since we didn't manage
                         // to acquire the entry for this subscription
@@ -516,11 +486,11 @@
         // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
         // interest in.
         QueueEntry node = sub.getLastSeenEntry();
-        while(node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)) )
+        while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
         {
 
             QueueEntry newNode = _entries.next(node);
-            if(newNode != null)
+            if (newNode != null)
             {
                 sub.setLastSeenEntry(node, newNode);
                 node = sub.getLastSeenEntry();
@@ -533,8 +503,7 @@
 
         }
 
-
-        if(node == entry)
+        if (node == entry)
         {
             // If the first entry that subscription can process is the one we are trying to deliver to it, then we are
             // good
@@ -555,11 +524,11 @@
     {
         QueueEntry node = sub.getLastSeenEntry();
 
-        if(node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
+        if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
         {
             do
             {
-                if(sub.setLastSeenEntry(node,entry))
+                if (sub.setLastSeenEntry(node, entry))
                 {
                     return;
                 }
@@ -567,7 +536,8 @@
                 {
                     node = sub.getLastSeenEntry();
                 }
-            } while (node != null && entry.compareTo(node) < 0);
+            }
+            while (node != null && entry.compareTo(node) < 0);
         }
 
     }
@@ -577,28 +547,26 @@
 
         SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
         // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
-        while(subscriberIter.advance())
+        while (subscriberIter.advance())
         {
             Subscription sub = subscriberIter.getNode().getSubscription();
 
             // we don't make browsers send the same stuff twice
-            if(!sub.isBrowser())
+            if (!sub.isBrowser())
             {
                 updateLastSeenEntry(sub, entry);
             }
         }
 
-
         deliverAsync();
 
-
     }
 
     public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
     {
         decrementQueueCount();
         decrementQueueSize(entry);
-        if(entry.acquiredBySubscription())
+        if (entry.acquiredBySubscription())
         {
             _deliveredMessages.decrementAndGet();
         }
@@ -606,7 +574,7 @@
         try
         {
             AMQMessage msg = entry.getMessage();
-            if(isDurable() && msg.isPersistent())
+            if (isDurable() && msg.isPersistent())
             {
                 _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
             }
@@ -626,7 +594,6 @@
             throw new FailedDequeueException(_name.toString(), e);
         }
 
-
     }
 
     private void decrementQueueSize(final QueueEntry entry)
@@ -647,7 +614,7 @@
         subscription.getSendLock();
         try
         {
-            if(!subscription.isClosed())
+            if (!subscription.isClosed())
             {
                 deliverMessage(subscription, entry);
                 return true;
@@ -663,10 +630,6 @@
         }
     }
 
-
-
-
-
     public int getConsumerCount()
     {
         return _subscriptionList.size();
@@ -700,7 +663,7 @@
     public int getUndeliveredMessageCount()
     {
         int count = getMessageCount() - _deliveredMessages.get();
-        if(count < 0)
+        if (count < 0)
         {
             return 0;
         }
@@ -710,7 +673,6 @@
         }
     }
 
-
     public long getReceivedMessageCount()
     {
         return _totalMessagesReceived.get();
@@ -732,16 +694,14 @@
         return _deleted.get();
     }
 
-
-
     public List<QueueEntry> getMessagesOnTheQueue()
     {
         ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
         QueueEntryIterator queueListIterator = _entries.iterator();
-        while(queueListIterator.advance())
+        while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if(node != null && !node.isDeleted())
+            if (node != null && !node.isDeleted())
             {
                 entryList.add(node);
             }
@@ -752,14 +712,14 @@
 
     public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
     {
-        if(oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+        if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
         {
             _activeSubscriberCount.decrementAndGet();
 
         }
-        else if(newState == Subscription.State.ACTIVE)
-        {                                                                  
-            if(oldState != Subscription.State.ACTIVE)
+        else if (newState == Subscription.State.ACTIVE)
+        {
+            if (oldState != Subscription.State.ACTIVE)
             {
                 _activeSubscriberCount.incrementAndGet();
 
@@ -800,55 +760,52 @@
         public boolean filterComplete();
     }
 
-
-
     public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
     {
         return getMessagesOnTheQueue(new QueueEntryFilter()
-                                        {
+        {
 
-                                            public boolean accept(QueueEntry entry)
-                                            {
-                                                final long messageId = entry.getMessage().getMessageId();
-                                                return messageId >= fromMessageId && messageId <= toMessageId;
-                                            }
+            public boolean accept(QueueEntry entry)
+            {
+                final long messageId = entry.getMessage().getMessageId();
+                return messageId >= fromMessageId && messageId <= toMessageId;
+            }
 
-                                            public boolean filterComplete()
-                                            {
-                                                return false;
-                                            }
-                                        });
+            public boolean filterComplete()
+            {
+                return false;
+            }
+        });
     }
 
     public QueueEntry getMessageOnTheQueue(final long messageId)
     {
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
-                                        {
-                                            private boolean _complete;
+        {
+            private boolean _complete;
 
-                                            public boolean accept(QueueEntry entry)
-                                            {
-                                                _complete = entry.getMessage().getMessageId() == messageId;
-                                                return _complete;
-                                            }
+            public boolean accept(QueueEntry entry)
+            {
+                _complete = entry.getMessage().getMessageId() == messageId;
+                return _complete;
+            }
 
-                                            public boolean filterComplete()
-                                            {
-                                                return _complete;
-                                            }
-                                        });
+            public boolean filterComplete()
+            {
+                return _complete;
+            }
+        });
         return entries.isEmpty() ? null : entries.get(0);
     }
 
-
     public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
     {
         ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
         QueueEntryIterator queueListIterator = _entries.iterator();
-        while(queueListIterator.advance() && !filter.filterComplete())
+        while (queueListIterator.advance() && !filter.filterComplete())
         {
             QueueEntry node = queueListIterator.getNode();
-            if(!node.isDeleted() && filter.accept(node))
+            if (!node.isDeleted() && filter.accept(node))
             {
                 entryList.add(node);
             }
@@ -857,7 +814,6 @@
 
     }
 
-
     public void moveMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,
                                            String queueName,
@@ -867,24 +823,22 @@
         AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
         MessageStore store = getVirtualHost().getMessageStore();
 
-
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
-                                        {
-
-                                            public boolean accept(QueueEntry entry)
-                                            {
-                                                final long messageId = entry.getMessage().getMessageId();
-                                                return (messageId >= fromMessageId)
-                                                       && (messageId <= toMessageId)
-                                                       && entry.acquire();
-                                            }
+        {
 
-                                            public boolean filterComplete()
-                                            {
-                                                return false;
-                                            }
-                                        });
+            public boolean accept(QueueEntry entry)
+            {
+                final long messageId = entry.getMessage().getMessageId();
+                return (messageId >= fromMessageId)
+                       && (messageId <= toMessageId)
+                       && entry.acquire();
+            }
 
+            public boolean filterComplete()
+            {
+                return false;
+            }
+        });
 
         try
         {
@@ -895,7 +849,7 @@
             {
                 AMQMessage message = entry.getMessage();
 
-                if(message.isPersistent() && toQueue.isDurable())
+                if (message.isPersistent() && toQueue.isDurable())
                 {
                     store.enqueueMessage(storeContext, toQueue, message.getMessageId());
                 }
@@ -943,7 +897,6 @@
             throw new RuntimeException(e);
         }
 
-
     }
 
     public void copyMessagesToAnotherQueue(final long fromMessageId,
@@ -954,30 +907,29 @@
         AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
         MessageStore store = getVirtualHost().getMessageStore();
 
-
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
-                                        {
+        {
 
-                                            public boolean accept(QueueEntry entry)
-                                            {
-                                                final long messageId = entry.getMessage().getMessageId();
-                                                if((messageId >= fromMessageId)
-                                                       && (messageId <= toMessageId))
-                                                {
-                                                    if(!entry.isDeleted())
-                                                    {
-                                                        return entry.getMessage().incrementReference();
-                                                    }
-                                                }
+            public boolean accept(QueueEntry entry)
+            {
+                final long messageId = entry.getMessage().getMessageId();
+                if ((messageId >= fromMessageId)
+                    && (messageId <= toMessageId))
+                {
+                    if (!entry.isDeleted())
+                    {
+                        return entry.getMessage().incrementReference();
+                    }
+                }
 
-                                                return false;
-                                            }
+                return false;
+            }
 
-                                            public boolean filterComplete()
-                                            {
-                                                return false;
-                                            }
-                                        });
+            public boolean filterComplete()
+            {
+                return false;
+            }
+        });
 
         try
         {
@@ -988,7 +940,7 @@
             {
                 AMQMessage message = entry.getMessage();
 
-                if(message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+                if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
                 {
                     store.enqueueMessage(storeContext, toQueue, message.getMessageId());
                 }
@@ -1021,7 +973,7 @@
         {
             for (QueueEntry entry : entries)
             {
-                if(entry.getMessage().isReferenced())
+                if (entry.getMessage().isReferenced())
                 {
                     toQueue.enqueue(storeContext, entry.getMessage());
                 }
@@ -1036,7 +988,6 @@
             throw new RuntimeException(e);
         }
 
-
     }
 
     public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
@@ -1046,17 +997,16 @@
         {
             QueueEntryIterator queueListIterator = _entries.iterator();
 
-
-            while(queueListIterator.advance())
+            while (queueListIterator.advance())
             {
                 QueueEntry node = queueListIterator.getNode();
 
                 final long messageId = node.getMessage().getMessageId();
 
-                if((messageId >= fromMessageId)
-                           && (messageId <= toMessageId)
-                           && !node.isDeleted()
-                           && node.acquire())
+                if ((messageId >= fromMessageId)
+                    && (messageId <= toMessageId)
+                    && !node.isDeleted()
+                    && node.acquire())
                 {
                     node.discard(storeContext);
                 }
@@ -1072,16 +1022,15 @@
 
     // ------ Management functions
 
-
     public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
     {
         QueueEntryIterator queueListIterator = _entries.iterator();
         boolean noDeletes = true;
 
-        while(noDeletes && queueListIterator.advance() )
+        while (noDeletes && queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if(!node.isDeleted() && node.acquire())
+            if (!node.isDeleted() && node.acquire())
             {
                 node.discard(storeContext);
                 noDeletes = false;
@@ -1096,10 +1045,10 @@
         QueueEntryIterator queueListIterator = _entries.iterator();
         long count = 0;
 
-        while(queueListIterator.advance())
+        while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if(!node.isDeleted() && node.acquire())
+            if (!node.isDeleted() && node.acquire())
             {
                 node.discard(storeContext);
                 count++;
@@ -1110,7 +1059,6 @@
 
     }
 
-
     public void addQueueDeleteTask(final Task task)
     {
         _deleteTaskList.add(task);
@@ -1126,7 +1074,7 @@
             while (subscriptionIter.advance())
             {
                 Subscription s = subscriptionIter.getNode().getSubscription();
-                if(s != null)
+                if (s != null)
                 {
                     s.queueDeleted(this);
                 }
@@ -1135,7 +1083,6 @@
             _bindings.deregister();
             _virtualHost.getQueueRegistry().unregisterQueue(_name);
 
-
             _managedObject.unregister();
             for (Task task : _deleteTaskList)
             {
@@ -1149,15 +1096,14 @@
 
     }
 
-
     public void deliverAsync()
     {
         _stateChangeCount.incrementAndGet();
 
         Runner runner = new Runner();
 
-        if(_asynchronousRunner.compareAndSet(null,runner))
-        {            
+        if (_asynchronousRunner.compareAndSet(null, runner))
+        {
             _asyncDelivery.execute(runner);
         }
     }
@@ -1193,12 +1139,10 @@
         }
     }
 
-
     private class SubFlushRunner implements ReadWriteRunnable
     {
         private final Subscription _sub;
 
-
         public SubFlushRunner(Subscription sub)
         {
             _sub = sub;
@@ -1216,7 +1160,7 @@
             {
                 _logger.error(e);
             }
-            if(!complete && !_sub.isSuspended())
+            if (!complete && !_sub.isSuspended())
             {
                 _asyncDelivery.execute(this);
             }
@@ -1244,25 +1188,25 @@
         boolean atTail = false;
         boolean advanced;
 
-        while(!sub.isSuspended() && !atTail && deliveries != 0)
+        while (!sub.isSuspended() && !atTail && deliveries != 0)
         {
 
             advanced = false;
             sub.getSendLock();
             try
             {
-                if(sub.isActive())
+                if (sub.isActive())
                 {
                     QueueEntry node = moveSubscriptionToNextNode(sub);
-                    if(!(node.isAcquired() || node.isDeleted()))
+                    if (!(node.isAcquired() || node.isDeleted()))
                     {
-                        if(!sub.isSuspended())
+                        if (!sub.isSuspended())
                         {
-                            if(sub.hasInterest(node))
+                            if (sub.hasInterest(node))
                             {
-                                if(!sub.wouldSuspend(node))
+                                if (!sub.wouldSuspend(node))
                                 {
-                                    if(!sub.isBrowser() && !node.acquire(sub))
+                                    if (!sub.isBrowser() && !node.acquire(sub))
                                     {
                                         sub.restoreCredit(node);
 
@@ -1272,11 +1216,11 @@
                                         deliveries--;
                                         deliverMessage(sub, node);
 
-                                        if(sub.isBrowser())
+                                        if (sub.isBrowser())
                                         {
                                             QueueEntry newNode = _entries.next(node);
 
-                                            if(newNode != null)
+                                            if (newNode != null)
                                             {
                                                 advanced = true;
                                                 sub.setLastSeenEntry(node, newNode);
@@ -1295,7 +1239,7 @@
                             {
                                 // this subscription is not interested in this node so we can skip over it
                                 QueueEntry newNode = _entries.next(node);
-                                if(newNode != null)
+                                if (newNode != null)
                                 {
                                     sub.setLastSeenEntry(node, newNode);
                                 }
@@ -1318,12 +1262,12 @@
         // next entry they are interested in yet.  This would lead to holding on to references to expired messages, etc
         // which would give us memory "leak".
 
-        if(!isExclusiveSubscriber())
+        if (!isExclusiveSubscriber())
         {
             advanceAllSubscriptions();
         }
 
-        if(atTail && sub.isAutoClose())
+        if (atTail && sub.isAutoClose())
         {
             unregisterSubscription(sub);
 
@@ -1337,7 +1281,7 @@
     protected void advanceAllSubscriptions() throws AMQException
     {
         SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
-        while(subscriberIter.advance())
+        while (subscriberIter.advance())
         {
             SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
             Subscription sub = subNode.getSubscription();
@@ -1349,19 +1293,19 @@
             throws AMQException
     {
         QueueEntry node = sub.getLastSeenEntry();
-        
-        while(node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
+
+        while (node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
         {
-            if(!node.isAcquired() && !node.isDeleted() && node.expired())
+            if (!node.isAcquired() && !node.isDeleted() && node.expired())
             {
-                if(node.acquire())
+                if (node.acquire())
                 {
                     final StoreContext reapingStoreContext = new StoreContext();
                     node.discard(reapingStoreContext);
                 }
             }
             QueueEntry newNode = _entries.next(node);
-            if(newNode != null)
+            if (newNode != null)
             {
                 sub.setLastSeenEntry(node, newNode);
                 node = sub.getLastSeenEntry();
@@ -1375,7 +1319,6 @@
         return node;
     }
 
-
     private void processQueue(Runnable runner) throws AMQException
     {
         long stateChangeCount;
@@ -1385,51 +1328,51 @@
         int extraLoops = 1;
         int deliveries = MAX_ASYNC_DELIVERIES;
 
-        _asynchronousRunner.compareAndSet(runner,null);
+        _asynchronousRunner.compareAndSet(runner, null);
 
-        while(deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
+        while (deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
         {
             // we want to have one extra loop after every subscription has reached the point where it cannot move
             // further, just in case the advance of one subscription in the last loop allows a different subscription to
             // move forward in the next iteration
 
-            if(previousStateChangeCount != stateChangeCount)
+            if (previousStateChangeCount != stateChangeCount)
             {
                 extraLoops = 1;
             }
-            
+
             previousStateChangeCount = stateChangeCount;
             deliveryIncomplete = _subscriptionList.size() != 0;
             boolean done = true;
 
-
             SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
             //iterate over the subscribers and try to advance their pointer
-            while(subscriptionIter.advance())
+            while (subscriptionIter.advance())
             {
+                boolean closeConsumer = false;
                 Subscription sub = subscriptionIter.getNode().getSubscription();
-                if(sub != null)
+                if (sub != null)
                 {
                     sub.getSendLock();
                     try
                     {
                         QueueEntry node = moveSubscriptionToNextNode(sub);
 
-                        if(node != null && sub.isActive())
+                        if (node != null && sub.isActive())
                         {
                             boolean advanced = false;
                             boolean subActive = false;
 
-                            if(!(node.isAcquired() || node.isDeleted()))
+                            if (!(node.isAcquired() || node.isDeleted()))
                             {
-                                if(!sub.isSuspended())
+                                if (!sub.isSuspended())
                                 {
                                     subActive = true;
-                                    if(sub.hasInterest(node))
+                                    if (sub.hasInterest(node))
                                     {
-                                        if(!sub.wouldSuspend(node))
+                                        if (!sub.wouldSuspend(node))
                                         {
-                                            if(!sub.isBrowser() && !node.acquire(sub))
+                                            if (!sub.isBrowser() && !node.acquire(sub))
                                             {
                                                 sub.restoreCredit(node);
 
@@ -1439,32 +1382,31 @@
                                                 deliverMessage(sub, node);
                                                 deliveries--;
 
-                                                if(sub.isBrowser())
+                                                if (sub.isBrowser())
                                                 {
                                                     QueueEntry newNode = _entries.next(node);
 
-                                                    if(newNode != null)
+                                                    if (newNode != null)
                                                     {
                                                         sub.setLastSeenEntry(node, newNode);
                                                         node = sub.getLastSeenEntry();
                                                         advanced = true;
                                                     }
 
-
                                                 }
                                             }
                                             done = false;
                                         }
                                         else
                                         {
-                                            node.addStateChangeListener(new QueueEntryListener(sub,node));
+                                            node.addStateChangeListener(new QueueEntryListener(sub, node));
                                         }
                                     }
                                     else
                                     {
                                         // this subscription is not interested in this node so we can skip over it
                                         QueueEntry newNode = _entries.next(node);
-                                        if(newNode != null)
+                                        if (newNode != null)
                                         {
                                             sub.setLastSeenEntry(node, newNode);
                                         }
@@ -1475,25 +1417,26 @@
 
                             done = done && (!subActive || atTail);
 
-                            if(atTail && !advanced && sub.isAutoClose())
-                            {
-                                unregisterSubscription(sub);
-
-                                ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-                                converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
-
-                            }
-
+                            closeConsumer = (atTail && !advanced && sub.isAutoClose());
                         }
                     }
                     finally
                     {
                         sub.releaseSendLock();
                     }
+
+                    if (closeConsumer)
+                    {
+                        unregisterSubscription(sub);
+
+                        ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+                        converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
+                    }
+
                 }
-                if(done)
+                if (done)
                 {
-                    if(extraLoops == 0)
+                    if (extraLoops == 0)
                     {
                         deliveryIncomplete = false;
                     }
@@ -1508,20 +1451,17 @@
                 }
             }
 
-
-
             _asynchronousRunner.set(null);
         }
 
         // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
         // therefore we should schedule this runner again (unless someone beats us to it :-) ).
-        if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner))
+        if (deliveries == 0 && _asynchronousRunner.compareAndSet(null, runner))
         {
             _asyncDelivery.execute(runner);
         }
     }
 
-
     public void removeExpiredIfNoSubscribers() throws AMQException
     {
 
@@ -1529,10 +1469,10 @@
 
         QueueEntryIterator queueListIterator = _entries.iterator();
 
-        while(queueListIterator.advance())
+        while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if(!node.isDeleted() && node.expired() && node.acquire())
+            if (!node.isDeleted() && node.expired() && node.acquire())
             {
 
                 node.discard(storeContext);
@@ -1542,7 +1482,6 @@
 
     }
 
-
     public long getMinimumAlertRepeatGap()
     {
         return _minimumAlertRepeatGap;
@@ -1561,7 +1500,7 @@
     public void setMaximumMessageAge(long maximumMessageAge)
     {
         _maximumMessageAge = maximumMessageAge;
-        if(maximumMessageAge == 0L)
+        if (maximumMessageAge == 0L)
         {
             _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
         }
@@ -1579,7 +1518,7 @@
     public void setMaximumMessageCount(final long maximumMessageCount)
     {
         _maximumMessageCount = maximumMessageCount;
-        if(maximumMessageCount == 0L)
+        if (maximumMessageCount == 0L)
         {
             _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
         }
@@ -1588,8 +1527,6 @@
             _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
         }
 
-
-
     }
 
     public long getMaximumQueueDepth()
@@ -1601,7 +1538,7 @@
     public void setMaximumQueueDepth(final long maximumQueueDepth)
     {
         _maximumQueueDepth = maximumQueueDepth;
-        if(maximumQueueDepth == 0L)
+        if (maximumQueueDepth == 0L)
         {
             _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
         }
@@ -1620,7 +1557,7 @@
     public void setMaximumMessageSize(final long maximumMessageSize)
     {
         _maximumMessageSize = maximumMessageSize;
-        if(maximumMessageSize == 0L)
+        if (maximumMessageSize == 0L)
         {
             _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
         }
@@ -1630,7 +1567,6 @@
         }
     }
 
-
     public Set<NotificationCheck> getNotificationChecks()
     {
         return _notificationChecks;
@@ -1654,7 +1590,7 @@
 
         public boolean equals(Object o)
         {
-            return _entry == ((QueueEntryListener)o)._entry && _sub == ((QueueEntryListener)o)._sub;
+            return _entry == ((QueueEntryListener) o)._entry && _sub == ((QueueEntryListener) o)._sub;
         }
 
         public int hashCode()