You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 14:16:10 UTC

svn commit: r1769837 [2/4] - in /qpid/java/trunk: ./ bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/flow/ broker-core...

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 14:16:10 2016
@@ -37,6 +37,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +58,7 @@ import java.util.zip.GZIPOutputStream;
 
 import javax.security.auth.Subject;
 
+import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -113,15 +115,13 @@ import org.apache.qpid.server.util.Conne
 import org.apache.qpid.server.util.Deletable;
 import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.virtualhost.HouseKeepingTask;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.transport.TransportException;
 
 public abstract class AbstractQueue<X extends AbstractQueue<X>>
         extends AbstractConfiguredObject<X>
         implements Queue<X>,
-                   StateChangeListener<QueueConsumer<?>, State>,
                    MessageGroupManager.ConsumerResetHelper
 {
 
@@ -147,18 +147,13 @@ public abstract class AbstractQueue<X ex
     private final QueueManagingVirtualHost<?> _virtualHost;
     private final DeletedChildListener _deletedChildListener = new DeletedChildListener();
 
-    private final AccessControlContext _immediateDeliveryContext;
+    private QueueConsumerManagerImpl _queueConsumerManager;
 
     @ManagedAttributeField( beforeSet = "preSetAlternateExchange", afterSet = "postSetAlternateExchange")
     private Exchange _alternateExchange;
 
-
-    private final QueueConsumerList _consumerList = new QueueConsumerList();
-
     private volatile QueueConsumer<?> _exclusiveSubscriber;
 
-
-
     private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
 
     private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
@@ -206,11 +201,8 @@ public abstract class AbstractQueue<X ex
             Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class));
 
 
-    private volatile int _maxAsyncDeliveries;
     private volatile long _estimatedAverageMessageHeaderSize;
 
-    private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
-
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
     private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
@@ -268,12 +260,52 @@ public abstract class AbstractQueue<X ex
 
     private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
 
-    private final QueueRunner _queueRunner;
     private boolean _closing;
     private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
     private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
     private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
-    private volatile boolean _hasPullOnlyConsumers;
+    private AdvanceConsumersTask _queueHouseKeepingTask;
+
+    void setNotifyWorkDesired(final QueueConsumer consumer, final boolean desired)
+    {
+        if (_queueConsumerManager.setInterest(consumer, desired))
+        {
+            if (desired)
+            {
+                _activeSubscriberCount.incrementAndGet();
+                notifyConsumer(consumer);
+            }
+            else
+            {
+                _activeSubscriberCount.decrementAndGet();
+
+                // iterate over interested and notify one as long as its priority is higher than any notified
+                final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
+                final int highestNotifiedPriority = _queueConsumerManager.getHighestNotifiedPriority();
+                while (consumerIterator.hasNext())
+                {
+                    QueueConsumer<?> queueConsumer = consumerIterator.next();
+                    if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
+                    {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean notifyConsumer(final QueueConsumer<?> consumer)
+    {
+        if(_queueConsumerManager.setNotified(consumer, true))
+        {
+            consumer.notifyWork();
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
 
     private interface HoldMethod
     {
@@ -284,12 +316,7 @@ public abstract class AbstractQueue<X ex
     {
         super(parentsMap(virtualHost), attributes);
 
-
         _virtualHost = virtualHost;
-        _immediateDeliveryContext = getSystemTaskControllerContext("Immediate Delivery", virtualHost.getPrincipal());
-
-        _queueRunner = new QueueRunner(this, getSystemTaskControllerContext("Queue Delivery",
-                                                                            virtualHost.getPrincipal()));
     }
 
     @Override
@@ -355,8 +382,9 @@ public abstract class AbstractQueue<X ex
 
         _arguments = Collections.synchronizedMap(arguments);
 
+        _queueConsumerManager = new QueueConsumerManagerImpl(this);
         _logSubject = new QueueLogSubject(this);
-
+        _queueHouseKeepingTask = new AdvanceConsumersTask();
         Subject activeSubject = Subject.getSubject(AccessController.getContext());
         Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
         AMQSessionModel<?> sessionModel;
@@ -478,7 +506,6 @@ public abstract class AbstractQueue<X ex
         }
 
         _estimatedAverageMessageHeaderSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
-        _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
         _mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
 
         if(_defaultFilters != null)
@@ -712,12 +739,12 @@ public abstract class AbstractQueue<X ex
                                          final EnumSet<ConsumerImpl.Option> optionSet,
                                          final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
-                   ConsumerAccessRefused
+                   ConsumerAccessRefused, QueueDeleted
     {
 
         try
         {
-            return getTaskExecutor().run(new Task<QueueConsumerImpl, Exception>()
+            final QueueConsumerImpl queueConsumer = getTaskExecutor().run(new Task<QueueConsumerImpl, Exception>()
             {
                 @Override
                 public QueueConsumerImpl execute() throws Exception
@@ -743,9 +770,19 @@ public abstract class AbstractQueue<X ex
                     return "target=" + target + ", consumerName=" + consumerName + ", optionSet=" + optionSet;
                 }
             });
-        }
-        catch (ExistingExclusiveConsumer | ConsumerAccessRefused |
-            ExistingConsumerPreventsExclusive | RuntimeException e)
+
+            target.consumerAdded(queueConsumer);
+            if(isEmpty())
+            {
+                target.queueEmpty();
+            }
+            target.updateNotifyWorkDesired();
+            target.notifyWork();
+            return queueConsumer;
+        }
+        catch (ExistingExclusiveConsumer | ConsumerAccessRefused
+                | ExistingConsumerPreventsExclusive | QueueDeleted
+                | RuntimeException e)
         {
             throw e;
         }
@@ -765,8 +802,13 @@ public abstract class AbstractQueue<X ex
                                                   EnumSet<ConsumerImpl.Option> optionSet,
                                                   final Integer priority)
             throws ExistingExclusiveConsumer, ConsumerAccessRefused,
-                   ExistingConsumerPreventsExclusive
+                   ExistingConsumerPreventsExclusive, QueueDeleted
     {
+        if (isDeleted())
+        {
+            throw new QueueDeleted();
+        }
+
         if (hasExclusiveConsumer())
         {
             throw new ExistingExclusiveConsumer();
@@ -902,20 +944,12 @@ public abstract class AbstractQueue<X ex
                                                            priority);
 
         _exclusiveOwner = exclusiveOwner;
-        target.consumerAdded(consumer);
-
 
         if (exclusive && !isTransient)
         {
             _exclusiveSubscriber = consumer;
         }
 
-        if(consumer.isActive())
-        {
-            _activeSubscriberCount.incrementAndGet();
-        }
-
-        consumer.setStateListener(this);
         QueueContext queueContext;
         if(filters == null || !filters.startAtTail())
         {
@@ -927,36 +961,15 @@ public abstract class AbstractQueue<X ex
         }
         consumer.setQueueContext(queueContext);
 
-        if (!isDeleted())
+        _queueConsumerManager.addConsumer(consumer);
+        if (consumer.isNotifyWorkDesired())
         {
-            if(consumer.isPullOnly())
-            {
-                _hasPullOnlyConsumers = true;
-            }
-            _consumerList.add(consumer);
-
-            if (isDeleted())
-            {
-                consumer.queueDeleted();
-            }
-        }
-        else
-        {
-            // TODO
+            _activeSubscriberCount.incrementAndGet();
         }
 
         childAdded(consumer);
         consumer.addChangeListener(_deletedChildListener);
 
-        if(consumer.isPullOnly())
-        {
-            consumer.getSessionModel().getAMQPConnection().notifyWork();
-        }
-        else
-        {
-            deliverAsync();
-        }
-
         return consumer;
     }
 
@@ -976,7 +989,7 @@ public abstract class AbstractQueue<X ex
             throw new NullPointerException("consumer argument is null");
         }
 
-        boolean removed = _consumerList.remove(consumer);
+        boolean removed = _queueConsumerManager.removeConsumer(consumer);
 
         if (removed)
         {
@@ -996,18 +1009,6 @@ public abstract class AbstractQueue<X ex
                 resetSubPointersForGroups(consumer);
             }
 
-            if(consumer.isPullOnly())
-            {
-                boolean hasOnlyPushConsumers = true;
-                ConsumerNode consumerNode = _consumerList.getHead().findNext();
-                while (consumerNode != null && hasOnlyPushConsumers)
-                {
-                    hasOnlyPushConsumers = !consumerNode.getConsumer().isPullOnly();
-                    consumerNode = consumerNode.findNext();
-                }
-                _hasPullOnlyConsumers = !hasOnlyPushConsumers;
-            }
-
             // auto-delete queues must be deleted if there are no remaining subscribers
 
             if(!consumer.isTransient()
@@ -1042,16 +1043,10 @@ public abstract class AbstractQueue<X ex
     @Override
     public Collection<QueueConsumer<?>> getConsumers()
     {
-        List<QueueConsumer<?>> consumers = new ArrayList<QueueConsumer<?>>();
-        ConsumerNodeIterator iter = _consumerList.iterator();
-        while(iter.advance())
-        {
-            consumers.add(iter.getNode().getConsumer());
-        }
-        return consumers;
-
+        return Lists.newArrayList(_queueConsumerManager.getAllIterator());
     }
 
+
     public void resetSubPointersForGroups(QueueConsumer<?> consumer)
     {
         QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
@@ -1063,24 +1058,6 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    @Override
-    public void resetSubPointersForGroups(final QueueEntry entry)
-    {
-        ConsumerNodeIterator subscriberIter = _consumerList.iterator();
-        // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
-        while (subscriberIter.advance())
-        {
-            QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
-
-            // we don't make browsers send the same stuff twice
-            if (sub.seesRequeues())
-            {
-                updateSubRequeueEntry(sub, entry);
-            }
-        }
-        notifyPullOnlyConsumers();
-        deliverAsync();
-    }
 
     public void addBinding(final Binding<?> binding)
     {
@@ -1152,7 +1129,6 @@ public abstract class AbstractQueue<X ex
 
     public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
     {
-
         doEnqueue(message, null, enqueueRecord);
     }
 
@@ -1197,25 +1173,10 @@ public abstract class AbstractQueue<X ex
 
         try
         {
-            if (action != null || (exclusiveSub == null  && _queueRunner.isIdle()))
-            {
-                AccessController.doPrivileged(
-                        new PrivilegedAction<Void>()
-                        {
-                            @Override
-                            public Void run()
-                            {
-                                tryDeliverStraightThrough(entry);
-                                return null;
-                            }
-                        }, _immediateDeliveryContext);
-            }
-
             if (entry.isAvailable())
             {
                 checkConsumersNotAheadOfDelivery(entry);
-                notifyPullOnlyConsumers();
-                deliverAsync();
+                notifyConsumers(entry);
             }
 
             checkForNotificationOnNewMessage(entry.getMessage());
@@ -1264,125 +1225,6 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    /**
-     * iterate over consumers and if any is at the end of the queue and can deliver this message,
-     * then deliver the message
-     */
-    private void tryDeliverStraightThrough(final QueueEntry entry)
-    {
-        try
-        {
-            ConsumerNode node = _consumerList.getMarkedNode();
-            ConsumerNode nextNode = node.findNext();
-            if (nextNode == null)
-            {
-                nextNode = _consumerList.getHead().findNext();
-            }
-            while (nextNode != null)
-            {
-                if (_consumerList.updateMarkedNode(node, nextNode))
-                {
-                    break;
-                }
-                else
-                {
-                    node = _consumerList.getMarkedNode();
-                    nextNode = node.findNext();
-                    if (nextNode == null)
-                    {
-                        nextNode = _consumerList.getHead().findNext();
-                    }
-                }
-            }
-            // always do one extra loop after we believe we've finished
-            // this catches the case where we *just* miss an update
-            int loops = 2;
-
-            while (entry.isAvailable() && loops != 0)
-            {
-                if (nextNode == null)
-                {
-                    loops--;
-                    nextNode = _consumerList.getHead();
-                }
-                else
-                {
-                    // if consumer at end, and active, offer
-                    final QueueConsumer<?> sub = nextNode.getConsumer();
-
-                    if(sub.getPriority() == Integer.MAX_VALUE)
-                    {
-                        deliverToConsumer(sub, entry);
-                    }
-
-                }
-                nextNode = nextNode.findNext();
-
-            }
-        }
-        catch (ConnectionScopedRuntimeException | TransportException e)
-        {
-            String errorMessage = "Suppressing " + e.getClass().getSimpleName() +
-                              " during straight through delivery, as this" +
-                              " can only indicate an issue with a consumer.";
-            if(_logger.isDebugEnabled())
-            {
-                _logger.debug(errorMessage, e);
-            }
-            else
-            {
-                _logger.info(errorMessage + ' ' + e.getMessage());
-            }
-        }
-    }
-
-    private void deliverToConsumer(final QueueConsumer<?> sub, final QueueEntry entry)
-    {
-
-        if(sub.trySendLock())
-        {
-            try
-            {
-                // get available queue entry first in order to avoid referring old deleted queue entry in sub._queueContext._lastSeen
-                if ((getNextAvailableEntry(sub) == entry)
-                    && !sub.isSuspended()
-                    && sub.hasInterest(entry)
-                    && mightAssign(sub, entry)
-                    && !sub.wouldSuspend(entry))
-                {
-
-                    MessageReference messageReference = null;
-                    try
-                    {
-
-                        if ((sub.acquires() && !assign(sub, entry))
-                            || (!sub.acquires() && (messageReference = entry.newMessageReference()) == null))
-                        {
-                            // restore credit here that would have been taken away by wouldSuspend since we didn't manage
-                            // to acquire the entry for this consumer
-                            sub.restoreCredit(entry);
-                        }
-                        else
-                        {
-                            deliverMessage(sub, entry, false, true);
-                        }
-                    }
-                    finally
-                    {
-                        if (messageReference != null)
-                        {
-                            messageReference.release();
-                        }
-                    }
-                }
-            }
-            finally
-            {
-                sub.releaseSendLock();
-            }
-        }
-    }
-
     private boolean assign(final QueueConsumer<?> sub, final QueueEntry entry)
     {
         if(_messageGroupManager == null)
@@ -1427,20 +1269,6 @@ public abstract class AbstractQueue<X ex
         return _queueStatistics.getEnqueueCount();
     }
 
-    private void deliverMessage(final QueueConsumer<?> sub,
-                                final QueueEntry entry,
-                                boolean batch,
-                                final boolean updateLastSeen)
-    {
-        if(updateLastSeen)
-        {
-            setLastSeenEntry(sub, entry);
-        }
-
-        sub.send(entry, batch);
-    }
-
-
     private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry entry)
     {
         QueueContext subContext = sub.getQueueContext();
@@ -1458,7 +1286,6 @@ public abstract class AbstractQueue<X ex
 
     private void updateSubRequeueEntry(final QueueConsumer<?> sub, final QueueEntry entry)
     {
-
         QueueContext subContext = sub.getQueueContext();
         if(subContext != null)
         {
@@ -1468,19 +1295,33 @@ public abstract class AbstractQueue<X ex
             {
                 if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry))
                 {
+                    notifyConsumer(sub);
                     break;
                 }
             }
         }
     }
 
+
+    @Override
+    public void resetSubPointersForGroups(final QueueEntry entry)
+    {
+        resetSubPointers(entry, true);
+    }
+
+    @Override
     public void requeue(QueueEntry entry)
     {
-        ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+        resetSubPointers(entry, false);
+    }
+
+    private void resetSubPointers(final QueueEntry entry, final boolean ignoreAvailable)
+    {
+        Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
         // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
-        while (subscriberIter.advance() && entry.isAvailable())
+        while (consumerIterator.hasNext() && (ignoreAvailable || entry.isAvailable()))
         {
-            QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
+            QueueConsumer<?> sub = consumerIterator.next();
 
             // we don't make browsers send the same stuff twice
             if (sub.seesRequeues())
@@ -1488,40 +1329,11 @@ public abstract class AbstractQueue<X ex
                 updateSubRequeueEntry(sub, entry);
             }
         }
-        notifyPullOnlyConsumers();
-        deliverAsync();
-
-    }
-
-    public boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer)
-    {
-        /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
-                  entry to resend and move back the consumer pointer. */
-
-        consumer.getSendLock();
-        try
-        {
-            if (!consumer.isClosed())
-            {
-                deliverMessage(consumer, entry, false, false);
-                return true;
-            }
-            else
-            {
-                return false;
-            }
-        }
-        finally
-        {
-            consumer.releaseSendLock();
-        }
     }
 
-
-
     public int getConsumerCount()
     {
-        return _consumerList.size();
+        return _queueConsumerManager.getAllSize();
     }
 
     public int getConsumerCountWithCredit()
@@ -1620,31 +1432,6 @@ public abstract class AbstractQueue<X ex
 
     }
 
-    public void stateChanged(QueueConsumer<?> sub, State oldState, State newState)
-    {
-        if (oldState == State.ACTIVE && newState != State.ACTIVE)
-        {
-            _activeSubscriberCount.decrementAndGet();
-
-        }
-        else if (newState == State.ACTIVE)
-        {
-            if (oldState != State.ACTIVE)
-            {
-                _activeSubscriberCount.incrementAndGet();
-                if(sub.isPullOnly())
-                {
-                    sub.getSessionModel().getAMQPConnection().notifyWork();
-                }
-
-            }
-            if(!sub.isPullOnly())
-            {
-                deliverAsync();
-            }
-        }
-    }
-
     public int compareTo(final X o)
     {
         return getName().compareTo(o.getName());
@@ -1660,11 +1447,6 @@ public abstract class AbstractQueue<X ex
         _exclusiveSubscriber = exclusiveSubscriber;
     }
 
-    long getStateChangeCount()
-    {
-        return _stateChangeCount.get();
-    }
-
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
     abstract QueueEntryList getEntries();
 
@@ -1673,9 +1455,9 @@ public abstract class AbstractQueue<X ex
         return _queueStatistics;
     }
 
-    protected QueueConsumerList getConsumerList()
+    protected final QueueConsumerManagerImpl getQueueConsumerManager()
     {
-        return _consumerList;
+        return _queueConsumerManager;
     }
 
     public EventLogger getEventLogger()
@@ -1898,14 +1680,16 @@ public abstract class AbstractQueue<X ex
                 {
                     try
                     {
-                        final ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
 
-                        while (consumerNodeIterator.advance())
+                        Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+                        while (consumerIterator.hasNext())
                         {
-                            final QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
-                            if (s != null)
+                            QueueConsumer<?> consumer = consumerIterator.next();
+
+                            if (consumer != null)
                             {
-                                s.queueDeleted();
+                                consumer.queueDeleted();
                             }
                         }
 
@@ -1971,11 +1755,12 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
-        super.onClose();
         _stopped.set(true);
         _closing = false;
+        _queueHouseKeepingTask.cancel();
+        return Futures.immediateFuture(null);
     }
 
     @Override
@@ -2038,110 +1823,122 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    public void deliverAsync()
+    void notifyConsumers(QueueEntry entry)
     {
-        _stateChangeCount.incrementAndGet();
-
-        _queueRunner.execute();
 
-    }
+        Iterator<QueueConsumer<?>> nonAcquiringIterator = _queueConsumerManager.getNonAcquiringIterator();
+        while (nonAcquiringIterator.hasNext())
+        {
+            QueueConsumer<?> consumer = nonAcquiringIterator.next();
+            if(consumer.hasInterest(entry))
+            {
+                notifyConsumer(consumer);
+            }
+        }
 
-    void notifyPullOnlyConsumers()
-    {
-        if(_hasPullOnlyConsumers)
+        final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
+        while (entry.isAvailable() && interestedIterator.hasNext())
         {
-            ConsumerNode consumerNode = _consumerList.getHead().findNext();
-            while (consumerNode != null)
+            QueueConsumer<?> consumer = interestedIterator.next();
+            if(consumer.hasInterest(entry))
             {
-                QueueConsumer<?> consumer = consumerNode.getConsumer();
-                if (consumer.isActive() && consumer.isPullOnly() && getNextAvailableEntry(consumer) != null)
+                if(notifyConsumer(consumer))
+                {
+                    break;
+                }
+                else if(!noHigherPriorityWithCredit(consumer, entry))
                 {
-                    consumer.getSessionModel().getAMQPConnection().notifyWork();
+                    // there exists a higher priority consumer that would take this message, therefore no point in
+                    // continuing to loop
+                    break;
                 }
-                consumerNode = consumerNode.findNext();
             }
         }
     }
 
-    void flushConsumer(QueueConsumer<?> sub)
+    void notifyOtherConsumers(final QueueConsumer<?> excludedConsumer)
     {
+        final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
+        while (hasAvailableMessages() && interestedIterator.hasNext())
+        {
+            QueueConsumer<?> consumer = interestedIterator.next();
 
-        flushConsumer(sub, Long.MAX_VALUE);
+            if (excludedConsumer != consumer)
+            {
+                if (notifyConsumer(consumer))
+                {
+                    break;
+                }
+            }
+        }
     }
 
-    boolean flushConsumer(QueueConsumer<?> sub, long iterations)
+
+    MessageContainer deliverSingleMessage(QueueConsumer<?> consumer)
     {
-        boolean atTail = false;
-        final boolean keepSendLockHeld = iterations <=  getMaxAsyncDeliveries();
         boolean queueEmpty = false;
-        boolean deliveryAttempted = false;
+        MessageContainer messageContainer = null;
+
+        _queueConsumerManager.setNotified(consumer, false);
 
         try
         {
-            if(keepSendLockHeld)
+            if (!consumer.isSuspended())
             {
-                sub.getSendLock();
-            }
-            while (!sub.isSuspended() && !atTail && iterations != 0)
-            {
-                try
+                messageContainer = attemptDelivery(consumer);
+                if(messageContainer != null)
                 {
-                    if(!keepSendLockHeld)
-                    {
-                        sub.getSendLock();
-                    }
+                    _queueConsumerManager.setNotified(consumer, true);
+                }
 
-                    atTail = attemptDelivery(sub, true);
-                    deliveryAttempted = true;
-                    if (atTail && getNextAvailableEntry(sub) == null)
-                    {
-                        queueEmpty = true;
-                    }
-                    else if (!atTail)
-                    {
-                        iterations--;
-                    }
+                if (messageContainer == null && getNextAvailableEntry(consumer) == null)
+                {
+                    queueEmpty = true;
                 }
-                finally
+
+                if(messageContainer == null && consumer.acquires())
                 {
-                    if(!keepSendLockHeld)
+                    if(hasAvailableMessages())
                     {
-                        sub.releaseSendLock();
+                        notifyOtherConsumers(consumer);
                     }
                 }
             }
-
-            if (!deliveryAttempted )
+            else
             {
                 // avoid referring old deleted queue entry in sub._queueContext._lastSeen
-                getNextAvailableEntry(sub);
+                getNextAvailableEntry(consumer);
             }
         }
         finally
         {
-            if(keepSendLockHeld)
-            {
-                sub.releaseSendLock();
-            }
             if(queueEmpty)
             {
-                sub.queueEmpty();
+                consumer.queueEmpty();
             }
 
-            sub.flushBatched();
-
+            consumer.flushBatched();
         }
 
+        return messageContainer;
+    }
+
+    private boolean hasAvailableMessages()
+    {
+        return _queueStatistics.getAvailableCount() != 0;
+    }
 
-        // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
-        // next entry they are interested in yet.  This would lead to holding on to references to expired messages, etc
-        // which would give us memory "leak".
+    public static class MessageContainer
+    {
+        public final MessageInstance _messageInstance;
+        public final MessageReference<?> _messageReference;
 
-        if (!hasExclusiveConsumer())
+        public MessageContainer(final MessageInstance messageInstance,
+                                final MessageReference<?> messageReference)
         {
-            advanceAllConsumers();
+            _messageInstance = messageInstance;
+            _messageReference = messageReference;
         }
-        return atTail;
     }
 
     /**
@@ -2151,18 +1948,16 @@ public abstract class AbstractQueue<X ex
      *
      *
      * @param sub the consumer
-     * @param batch true if processing can be batched
      * @return true if we have completed all possible deliveries for this sub.
      */
-    private boolean attemptDelivery(QueueConsumer<?> sub, boolean batch)
+    private MessageContainer attemptDelivery(QueueConsumer<?> sub)
     {
-        boolean atTail = false;
-
+        MessageContainer messageContainer = null;
         // avoid referring old deleted queue entry in sub._queueContext._lastSeen
         QueueEntry node  = getNextAvailableEntry(sub);
         boolean subActive = sub.isActive() && !sub.isSuspended();
 
-        if (subActive && (sub.getPriority() == Integer.MAX_VALUE || noHigherPriorityWithCredit(sub)))
+        if (node != null && subActive && (sub.getPriority() == Integer.MAX_VALUE || noHigherPriorityWithCredit(sub, node)))
         {
 
             if (_virtualHost.getState() != State.ACTIVE)
@@ -2171,11 +1966,11 @@ public abstract class AbstractQueue<X ex
                                                            "virtualhost state " + _virtualHost.getState());
             }
 
-            if (node != null && node.isAvailable())
+            if (node.isAvailable())
             {
                 if (sub.hasInterest(node) && mightAssign(sub, node))
                 {
-                    if (!sub.wouldSuspend(node))
+                    if (sub.allocateCredit(node))
                     {
                         MessageReference messageReference = null;
                         try
@@ -2190,7 +1985,8 @@ public abstract class AbstractQueue<X ex
                             }
                             else
                             {
-                                deliverMessage(sub, node, batch, true);
+                                setLastSeenEntry(sub, node);
+                                messageContainer = new MessageContainer(node, messageReference);
                             }
                         }
                         finally
@@ -2201,59 +1997,43 @@ public abstract class AbstractQueue<X ex
                             }
                         }
                     }
-                    else // Not enough Credit for message and wouldSuspend
+                    else
                     {
-                        //QPID-1187 - Treat the consumer as suspended for this message
-                        // and wait for the message to be removed to continue delivery.
-                        subActive = false;
                         sub.awaitCredit(node);
-
                     }
                 }
-
             }
-            atTail = (node == null) || (getNextAvailableEntry(sub) == null);
         }
-        return atTail || !subActive;
+        return messageContainer;
     }
 
-    private boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub)
+    boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub, final QueueEntry queueEntry)
     {
-        ConsumerNodeIterator iterator = _consumerList.iterator();
-        while(iterator.advance())
+        Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+        while (consumerIterator.hasNext())
         {
-            final ConsumerNode node = iterator.getNode();
-            QueueConsumer consumer = node.getConsumer();
+            QueueConsumer<?> consumer = consumerIterator.next();
             if(consumer.getPriority() > sub.getPriority())
             {
-                if(getNextAvailableEntry(consumer) != null && consumer.hasCredit())
+                if(consumer.isNotifyWorkDesired()
+                   && consumer.acquires()
+                   && consumer.hasInterest(queueEntry)
+                   && getNextAvailableEntry(consumer) != null)
                 {
                     return false;
                 }
             }
-        }
-        return true;
-    }
-
-    protected void advanceAllConsumers()
-    {
-        ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
-        while (consumerNodeIterator.advance())
-        {
-            ConsumerNode subNode = consumerNodeIterator.getNode();
-            QueueConsumer sub = subNode.getConsumer();
-            if(sub.acquires())
-            {
-                getNextAvailableEntry(sub);
-            }
             else
             {
-                // TODO
+                break;
             }
         }
+        return true;
     }
 
-    private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
+
+    QueueEntry getNextAvailableEntry(final QueueConsumer sub)
     {
         QueueContext context = sub.getQueueContext();
         if(context != null)
@@ -2310,156 +2090,6 @@ public abstract class AbstractQueue<X ex
     }
 
 
-    boolean hasAvailableMessages(final QueueConsumer queueConsumer)
-    {
-        return getNextAvailableEntry(queueConsumer) != null;
-    }
-
-    /**
-     * Used by queue Runners to asynchronously deliver messages to consumers.
-     *
-     * A queue Runner is started whenever a state change occurs, e.g when a new
-     * message arrives on the queue and cannot be immediately delivered to a
-     * consumer (i.e. asynchronous delivery is required).
-     *
-     * processQueue should be running while there are messages on the queue AND
-     * there are consumers that can deliver them. If there are no
-     * consumers capable of delivering the remaining messages on the queue
-     * then processQueue should stop to prevent spinning.
-     *
-     * Since processQueue is runs in a fixed size Executor, it should not run
-     * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
-     * incoming messages may not be able to be scheduled in the thread pool
-     * because all threads are working on clearing down large queues). To solve
-     * this problem, after an arbitrary number of message deliveries the
-     * processQueue job stops iterating, resubmits itself to the executor, and
-     * ends the current instance
-     *
-     * @param runner the Runner to schedule
-     */
-    public long processQueue(QueueRunner runner)
-    {
-        long stateChangeCount;
-        long previousStateChangeCount = Long.MIN_VALUE;
-        long rVal = Long.MIN_VALUE;
-        boolean deliveryIncomplete = true;
-
-        boolean lastLoop = false;
-        int iterations = getMaxAsyncDeliveries();
-
-        final int numSubs = _consumerList.size();
-
-        final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
-
-        // For every message enqueue/requeue the we fire deliveryAsync() which
-        // increases _stateChangeCount. If _sCC changes whilst we are in our loop
-        // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
-        // then we will continue to run for a maximum of iterations.
-        // So whilst delivery/rejection is going on a processQueue thread will be running
-        while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
-        {
-            // we want to have one extra loop after every consumer has reached the point where it cannot move
-            // further, just in case the advance of one consumer in the last loop allows a different consumer to
-            // move forward in the next iteration
-
-            if (previousStateChangeCount != stateChangeCount)
-            {
-                //further asynchronous delivery is required since the
-                //previous loop. keep going if iteration slicing allows.
-                lastLoop = false;
-                rVal = stateChangeCount;
-            }
-
-            previousStateChangeCount = stateChangeCount;
-            boolean allConsumersDone = true;
-            boolean consumerDone;
-
-            ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
-            //iterate over the subscribers and try to advance their pointer
-            while (consumerNodeIterator.advance())
-            {
-
-                QueueConsumer<?> sub = consumerNodeIterator.getNode().getConsumer();
-
-                if(!sub.isPullOnly())
-                {
-                    sub.getSendLock();
-
-                    try
-                    {
-                        for (int i = 0; i < perSub; i++)
-                        {
-                            //attempt delivery. returns true if no further delivery currently possible to this sub
-                            consumerDone = attemptDelivery(sub, true);
-                            if (consumerDone)
-                            {
-                                sub.flushBatched();
-                                boolean noMore = getNextAvailableEntry(sub) == null;
-                                if (lastLoop && noMore)
-                                {
-                                    sub.queueEmpty();
-                                }
-                                break;
-                            }
-                            else
-                            {
-                                //this consumer can accept additional deliveries, so we must
-                                //keep going after this (if iteration slicing allows it)
-                                allConsumersDone = false;
-                                lastLoop = false;
-                                if (--iterations == 0)
-                                {
-                                    sub.flushBatched();
-                                    break;
-                                }
-                            }
-                        }
-
-                        sub.flushBatched();
-                    }
-                    finally
-                    {
-                        sub.releaseSendLock();
-                    }
-                }
-            }
-
-            if(allConsumersDone && lastLoop)
-            {
-                //We have done an extra loop already and there are again
-                //again no further delivery attempts possible, only
-                //keep going if state change demands it.
-                deliveryIncomplete = false;
-            }
-            else if(allConsumersDone)
-            {
-                //All consumers reported being done, but we have to do
-                //an extra loop if the iterations are not exhausted and
-                //there is still any work to be done
-                deliveryIncomplete = _consumerList.size() != 0;
-                lastLoop = true;
-            }
-            else
-            {
-                //some consumers can still accept more messages,
-                //keep going if iteration count allows.
-                lastLoop = false;
-                deliveryIncomplete = true;
-            }
-
-        }
-
-        // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
-        // therefore we should schedule this runner again (unless someone beats us to it :-) ).
-        if (iterations == 0)
-        {
-            _logger.debug("Rescheduling runner: {}", runner);
-            return 0L;
-        }
-        return rVal;
-
-    }
-
     public void checkMessageStatus()
     {
         QueueEntryIterator queueListIterator = getEntries().iterator();
@@ -3067,7 +2697,12 @@ public abstract class AbstractQueue<X ex
         switch (getConsumerCount())
         {
             case 1:
-                _exclusiveSubscriber = getConsumerList().getHead().getConsumer();
+                Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+                if (consumerIterator.hasNext())
+                {
+                    _exclusiveSubscriber = consumerIterator.next();
+                }
                 // deliberate fall through
             case 0:
                 _exclusiveOwner = null;
@@ -3088,8 +2723,11 @@ public abstract class AbstractQueue<X ex
             case CONTAINER:
             case CONNECTION:
                 AMQSessionModel session = null;
-                for(ConsumerImpl c : getConsumers())
+                Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+                while(queueConsumerIterator.hasNext())
                 {
+                    QueueConsumer<?> c = queueConsumerIterator.next();
+
                     if(session == null)
                     {
                         session = c.getSessionModel();
@@ -3114,8 +2752,10 @@ public abstract class AbstractQueue<X ex
             case CONTAINER:
             case PRINCIPAL:
                 AMQPConnection con = null;
-                for(ConsumerImpl c : getConsumers())
+                Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+                while(queueConsumerIterator.hasNext())
                 {
+                    QueueConsumer<?> c = queueConsumerIterator.next();
                     if(con == null)
                     {
                         con = c.getSessionModel().getAMQPConnection();
@@ -3142,8 +2782,10 @@ public abstract class AbstractQueue<X ex
             case NONE:
             case PRINCIPAL:
                 String containerID = null;
-                for(ConsumerImpl c : getConsumers())
+                Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+                while(queueConsumerIterator.hasNext())
                 {
+                    QueueConsumer<?> c = queueConsumerIterator.next();
                     if(containerID == null)
                     {
                         containerID = c.getSessionModel().getAMQPConnection().getRemoteContainerName();
@@ -3173,8 +2815,10 @@ public abstract class AbstractQueue<X ex
             case NONE:
             case CONTAINER:
                 Principal principal = null;
-                for(ConsumerImpl c : getConsumers())
+                Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+                while(queueConsumerIterator.hasNext())
                 {
+                    QueueConsumer<?> c = queueConsumerIterator.next();
                     if(principal == null)
                     {
                         principal = c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
@@ -3232,6 +2876,7 @@ public abstract class AbstractQueue<X ex
     @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
     private ListenableFuture<Void> activate()
     {
+        _virtualHost.scheduleHouseKeepingTask(_virtualHost.getHousekeepingCheckPeriod(), _queueHouseKeepingTask);
         setState(State.ACTIVE);
         return Futures.immediateFuture(null);
     }
@@ -3311,7 +2956,9 @@ public abstract class AbstractQueue<X ex
         }
         else if(clazz == org.apache.qpid.server.model.Consumer.class)
         {
-            return (Collection<C>) getConsumers();
+            return _queueConsumerManager == null
+                    ? Collections.<C>emptySet()
+                    : (Collection<C>) Lists.newArrayList(_queueConsumerManager.getAllIterator());
         }
         else return Collections.emptySet();
     }
@@ -3368,13 +3015,6 @@ public abstract class AbstractQueue<X ex
 
     }
 
-    int getMaxAsyncDeliveries()
-    {
-        return _maxAsyncDeliveries;
-    }
-
-
-
     private static final String[] NON_NEGATIVE_NUMBERS = {
         ALERT_REPEAT_GAP,
         ALERT_THRESHOLD_MESSAGE_AGE,
@@ -3763,4 +3403,33 @@ public abstract class AbstractQueue<X ex
             }
         }
     }
+
+    private class AdvanceConsumersTask extends HouseKeepingTask
+    {
+
+        AdvanceConsumersTask()
+        {
+            super("Queue Housekeeping: " + AbstractQueue.this.getName(),
+                  _virtualHost, getSystemTaskControllerContext("Queue Housekeeping", _virtualHost.getPrincipal()));
+        }
+
+        @Override
+        public void execute()
+        {
+            // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
+            // next entry they are interested in yet.  This would lead to holding on to references to expired messages, etc
+            // which would give us memory "leak".
+
+            Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+            while (consumerIterator.hasNext() && !isDeleted())
+            {
+                QueueConsumer<?> sub = consumerIterator.next();
+                if(sub.acquires())
+                {
+                    getNextAvailableEntry(sub);
+                }
+            }
+        }
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Tue Nov 15 14:16:10 2016
@@ -20,6 +20,8 @@
 */
 package org.apache.qpid.server.queue;
 
+import static org.apache.qpid.server.model.Queue.QUEUE_SCAVANGE_COUNT;
+
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -47,7 +49,7 @@ public abstract class OrderedQueueEntryL
                 _nextUpdater = OrderedQueueEntry._nextUpdater;
 
     private AtomicLong _scavenges = new AtomicLong(0L);
-    private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
+    private final long _scavengeCount;
     private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>();
 
 
@@ -57,6 +59,7 @@ public abstract class OrderedQueueEntryL
     {
         super(queue, queueStatistics);
         _queue = queue;
+        _scavengeCount = _queue.getContextValue(Integer.class, QUEUE_SCAVANGE_COUNT);
         _head = headCreator.createHead(this);
         _tail = _head;
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Tue Nov 15 14:16:10 2016
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -36,10 +37,12 @@ public abstract class OutOfOrderQueue<X
     protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
     {
         // check that all consumers are not in advance of the entry
-        ConsumerNodeIterator subIter = getConsumerList().iterator();
-        while(subIter.advance() && !entry.isAcquired())
+        Iterator<QueueConsumer<?>> consumerIterator = getQueueConsumerManager().getAllIterator();
+
+        while (consumerIterator.hasNext() && !entry.isAcquired())
         {
-            final QueueConsumer<?> consumer = subIter.getNode().getConsumer();
+            QueueConsumer<?> consumer = consumerIterator.next();
+
             if(!consumer.isClosed())
             {
                 QueueContext context = consumer.getQueueContext();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Tue Nov 15 14:16:10 2016
@@ -32,27 +32,27 @@ public interface QueueConsumer<X extends
 
     boolean hasInterest(QueueEntry node);
 
-    boolean wouldSuspend(QueueEntry entry);
+    boolean allocateCredit(QueueEntry entry);
 
     void restoreCredit(QueueEntry entry);
 
-    void send(QueueEntry entry, boolean batch);
-
     void acquisitionRemoved(QueueEntry node);
 
+    QueueConsumerNode getQueueConsumerNode();
+
     void queueDeleted();
 
     Queue<?> getQueue();
 
-    boolean resend(QueueEntry e);
-
     MessageInstance.StealableConsumerAcquiredState<X> getOwningState();
 
     QueueContext getQueueContext();
 
     void awaitCredit(QueueEntry entry);
 
-    boolean hasCredit();
+    boolean isNotifyWorkDesired();
+
+    void notifyWork();
 
-    boolean isPullOnly();
+    void setQueueConsumerNode(QueueConsumerNode node);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Tue Nov 15 14:16:10 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
 
 import java.text.MessageFormat;
-import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -33,6 +32,8 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,15 +52,12 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.ManagedAttributeField;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.StateChangeListener;
 
 class QueueConsumerImpl
@@ -67,7 +65,6 @@ class QueueConsumerImpl
         implements QueueConsumer<QueueConsumerImpl>, LogSubject
 {
     private final static Logger LOGGER = LoggerFactory.getLogger(QueueConsumerImpl.class);
-    private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final long _consumerNumber;
     private final long _createTime = System.currentTimeMillis();
@@ -84,28 +81,9 @@ class QueueConsumerImpl
     private final Object _sessionReference;
     private final AbstractQueue _queue;
 
-    private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
-
-    static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
-            new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
-
-    static
-    {
-        STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
-        STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.QUIESCED);
-        STATE_MAP.put(ConsumerTarget.State.CLOSED, State.DELETED);
-    }
-
     private final ConsumerTarget _target;
-    private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _listener;
     private volatile QueueContext _queueContext;
-    private volatile StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
-    {
-        public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
-        {
-            // no-op
-        }
-    };
+
     @ManagedAttributeField
     private boolean _exclusive;
     @ManagedAttributeField
@@ -119,6 +97,8 @@ class QueueConsumerImpl
     @ManagedAttributeField
     private int _priority;
 
+    private QueueConsumerNode _queueConsumerNode;
+
     QueueConsumerImpl(final AbstractQueue<?> queue,
                       ConsumerTarget target,
                       final String consumerName,
@@ -145,29 +125,6 @@ class QueueConsumerImpl
         open();
 
         setupLogging();
-
-        _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
-        {
-            @Override
-            public void stateChanged(final ConsumerTarget object,
-                                     final ConsumerTarget.State oldState,
-                                     final ConsumerTarget.State newState)
-            {
-                targetStateChanged(oldState, newState);
-            }
-        };
-        _target.addStateListener(_listener);
-
-        _suspendedConsumerLoggingTicker = target.isMultiQueue()
-                ? null
-                : new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
-                    {
-                        @Override
-                        protected void log(final long period)
-                        {
-                            getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
-                        }
-                    };
     }
 
     private static Map<String, Object> createAttributeMap(String name,
@@ -204,43 +161,6 @@ class QueueConsumerImpl
         return attributes;
     }
 
-    private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState)
-    {
-        if(oldState != newState)
-        {
-            if(newState == ConsumerTarget.State.CLOSED)
-            {
-                if(_targetClosed.compareAndSet(false,true))
-                {
-                    getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
-                }
-            }
-
-            if(_suspendedConsumerLoggingTicker != null)
-            {
-                if (newState == ConsumerTarget.State.SUSPENDED)
-                {
-                    _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
-                    getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
-                }
-                else
-                {
-                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
-                }
-            }
-        }
-
-        if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
-        {
-            closeAsync();
-        }
-        final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
-        if(stateListener != null)
-        {
-            stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
-        }
-    }
-
     @Override
     public ConsumerTarget getTarget()
     {
@@ -254,28 +174,15 @@ class QueueConsumerImpl
     }
 
     @Override
-    public boolean hasCredit()
+    public boolean isNotifyWorkDesired()
     {
-        return _target.hasCredit();
+        return _target.isNotifyWorkDesired();
     }
 
     @Override
     public void externalStateChange()
     {
-        if(isPullOnly())
-        {
-            getSessionModel().getAMQPConnection().notifyWork();
-        }
-        else
-        {
-            _queue.deliverAsync();
-        }
-    }
-
-    @Override
-    public boolean hasAvailableMessages()
-    {
-        return !_queue.isEmpty() && _queue.hasAvailableMessages(this);
+        _target.notifyWork();
     }
 
     @Override
@@ -309,28 +216,29 @@ class QueueConsumerImpl
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         if(_closed.compareAndSet(false,true))
         {
-            _target.getSendLock();
-            try
-            {
-                _waitingOnCreditMessageListener.remove();
-                _target.consumerRemoved(this);
-                _target.removeStateChangeListener(_listener);
-                _queue.unregisterConsumer(this);
-                if(_suspendedConsumerLoggingTicker != null)
-                {
-                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
-                }
-                deleted();
-            }
-            finally
-            {
-                _target.releaseSendLock();
-            }
+            getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
+
+            _waitingOnCreditMessageListener.remove();
 
+            return doAfter(_target.consumerRemoved(this),
+                           new Runnable()
+                           {
+                               @Override
+                               public void run()
+                               {
+                                   _queue.unregisterConsumer(QueueConsumerImpl.this);
+
+                                   deleted();
+                               }
+                           });
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
         }
     }
 
@@ -346,19 +254,31 @@ class QueueConsumerImpl
     }
 
     @Override
-    public boolean isPullOnly()
+    public void notifyWork()
+    {
+        _target.notifyWork();
+    }
+
+    @Override
+    public void setQueueConsumerNode(final QueueConsumerNode node)
+    {
+        _queueConsumerNode = node;
+    }
+
+    @Override
+    public QueueConsumerNode getQueueConsumerNode()
     {
-        return _target.isPullOnly();
+        return _queueConsumerNode;
     }
 
     public void queueDeleted()
     {
-        _target.queueDeleted();
+        _target.consumerRemoved(this);
     }
 
-    public boolean wouldSuspend(final QueueEntry msg)
+    public boolean allocateCredit(final QueueEntry msg)
     {
-        return !_target.allocateCredit(msg.getMessage());
+        return _target.allocateCredit(msg.getMessage());
     }
 
     public void restoreCredit(final QueueEntry queueEntry)
@@ -372,12 +292,6 @@ class QueueConsumerImpl
     }
 
     @Override
-    public State getState()
-    {
-        return STATE_MAP.get(_target.getState());
-    }
-
-    @Override
     public final Queue<?> getQueue()
     {
         return _queue;
@@ -397,46 +311,21 @@ class QueueConsumerImpl
     }
 
     @Override
-    public final void flush()
+    public AbstractQueue.MessageContainer pullMessage()
     {
-        AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
-        try
+        AbstractQueue.MessageContainer messageContainer = _queue.deliverSingleMessage(this);
+        if (messageContainer != null)
         {
-            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
-            _queue.flushConsumer(this);
-            _target.processPending();
+            _deliveredCount.incrementAndGet();
+            _deliveredBytes.addAndGet(messageContainer._messageInstance.getMessage().getSize());
         }
-        finally
-        {
-            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
-        }
-
+        return messageContainer;
     }
 
     @Override
-    public void pullMessage()
+    public void setNotifyWorkDesired(final boolean desired)
     {
-        AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
-        try
-        {
-            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
-            _queue.flushConsumer(this, 1);
-        }
-        finally
-        {
-            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
-        }
-
-    }
-
-    public boolean resend(final QueueEntry entry)
-    {
-        boolean messageWasResent = getQueue().resend(entry, this);
-        if (messageWasResent)
-        {
-            _target.processPending();
-        }
-        return messageWasResent;
+        _queue.setNotifyWorkDesired(this, desired);
     }
 
     public final long getConsumerNumber()
@@ -444,16 +333,6 @@ class QueueConsumerImpl
         return _consumerNumber;
     }
 
-    public final StateChangeListener<? super QueueConsumerImpl, State> getStateListener()
-    {
-        return _stateListener;
-    }
-
-    public final void setStateListener(StateChangeListener<? super QueueConsumerImpl, State> listener)
-    {
-        _stateListener = listener;
-    }
-
     public final QueueContext getQueueContext()
     {
         return _queueContext;
@@ -466,12 +345,12 @@ class QueueConsumerImpl
 
     public final boolean isActive()
     {
-        return getState() == State.ACTIVE;
+        return _target.getState() == ConsumerTarget.State.OPEN;
     }
 
     public final boolean isClosed()
     {
-        return getState() == State.DELETED;
+        return _target.getState() == ConsumerTarget.State.CLOSED;
     }
 
     public final boolean hasInterest(QueueEntry entry)
@@ -563,21 +442,6 @@ class QueueConsumerImpl
         return filterLogString.toString();
     }
 
-    public final boolean trySendLock()
-    {
-        return getTarget().trySendLock();
-    }
-
-    public final void getSendLock()
-    {
-        getTarget().getSendLock();
-    }
-
-    public final void releaseSendLock()
-    {
-        getTarget().releaseSendLock();
-    }
-
     public final long getCreateTime()
     {
         return _createTime;
@@ -613,13 +477,6 @@ class QueueConsumerImpl
         return _deliveredCount.longValue();
     }
 
-    public final void send(final QueueEntry entry, final boolean batch)
-    {
-        _deliveredCount.incrementAndGet();
-        long size = _target.send(this, entry, batch);
-        _deliveredBytes.addAndGet(size);
-    }
-
     @Override
     public void acquisitionRemoved(final QueueEntry node)
     {
@@ -704,14 +561,7 @@ class QueueConsumerImpl
             entry.addStateChangeListener(this);
             if(!entry.isAvailable())
             {
-                if(isPullOnly())
-                {
-                    getSessionModel().getAMQPConnection().notifyWork();
-                }
-                else
-                {
-                    _queue.deliverAsync();
-                }
+                _target.notifyWork();
                 remove();
             }
         }
@@ -731,14 +581,7 @@ class QueueConsumerImpl
         {
             entry.removeStateChangeListener(this);
             _entry.compareAndSet(entry, null);
-            if(isPullOnly())
-            {
-                getSessionModel().getAMQPConnection().notifyWork();
-            }
-            else
-            {
-                _queue.deliverAsync();
-            }
+            _target.notifyWork();
         }
 
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Nov 15 14:16:10 2016
@@ -709,17 +709,6 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public boolean resend()
-    {
-        QueueConsumer sub = getDeliveredConsumer();
-        if(sub != null)
-        {
-            return sub.resend(this);
-        }
-        return false;
-    }
-
-    @Override
     public TransactionLogResource getOwningResource()
     {
         return getQueue();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java Tue Nov 15 14:16:10 2016
@@ -79,14 +79,14 @@ public abstract class AbstractKeyStore<X
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
-        super.onClose();
         if(_checkExpiryTaskFuture != null)
         {
             _checkExpiryTaskFuture.cancel(false);
             _checkExpiryTaskFuture = null;
         }
+        return Futures.immediateFuture(null);
     }
 
     protected void initializeExpiryChecking()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Tue Nov 15 14:16:10 2016
@@ -97,7 +97,7 @@ public class TrustStoreMessageSource ext
                                 final String consumerName,
                                 final EnumSet<ConsumerImpl.Option> options, final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
-                   ConsumerAccessRefused
+                   ConsumerAccessRefused, QueueDeleted
     {
         final Consumer consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
         consumer.send(createMessage());

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Tue Nov 15 14:16:10 2016
@@ -27,6 +27,8 @@ import java.util.Collection;
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.model.Connection;
@@ -35,9 +37,6 @@ import org.apache.qpid.server.util.Delet
 
 public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>, EventLoggerProvider
 {
-    boolean isMessageAssignmentSuspended();
-
-    void alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(boolean override);
 
     AccessControlContext getAccessControlContextFromSubject(Subject subject);
 
@@ -75,9 +74,8 @@ public interface AMQPConnection<C extend
 
     void sendConnectionCloseAsync(AMQConstant connectionForced, String reason);
 
-    void reserveOutboundMessageSpace(long size);
-
     boolean isIOThread();
+    ListenableFuture<Void> doOnIOThreadAsync(final Runnable task);
 
     void checkAuthorizedMessagePrincipal(String messageUserId);
 
@@ -91,4 +89,8 @@ public interface AMQPConnection<C extend
     Collection<? extends AMQSessionModel<?>> getSessionModels();
 
     void resetStatistics();
+
+    void notifyWork(AMQSessionModel<?> sessionModel);
+
+    boolean isTransportBlockedForWriting();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Tue Nov 15 14:16:10 2016
@@ -35,11 +35,11 @@ import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
 import javax.security.auth.SubjectDomainCombiner;
 
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
@@ -76,7 +76,7 @@ import org.apache.qpid.server.util.Fixed
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
 
-public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C>>
+public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,T>, T>
         extends AbstractConfiguredObject<C>
         implements ProtocolEngine, AMQPConnection<C>, EventLoggerProvider
 
@@ -96,8 +96,6 @@ public abstract class AbstractAMQPConnec
             new CopyOnWriteArrayList<>();
 
     private final LogSubject _logSubject;
-    private final AtomicReference<Thread> _messageAssignmentAllowedThread = new AtomicReference<>();
-    private final AtomicBoolean _messageAssignmentSuspended = new AtomicBoolean();
     private volatile ContextProvider _contextProvider;
     private volatile EventLoggerProvider _eventLoggerProvider;
     private String _clientProduct;
@@ -489,60 +487,52 @@ public abstract class AbstractAMQPConnec
     }
 
     @Override
-    public boolean isMessageAssignmentSuspended()
+    public void setIOThread(final Thread ioThread)
     {
-        Thread currentThread = Thread.currentThread();
-        if (_messageAssignmentAllowedThread.get() == currentThread && currentThread == _ioThread)
-        {
-            return false;
-        }
-        return _messageAssignmentSuspended.get();
+        _ioThread = ioThread;
     }
 
     @Override
-    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended, final boolean notifyConsumers)
+    public boolean isIOThread()
     {
-        _messageAssignmentSuspended.set(messageAssignmentSuspended);
-        if(notifyConsumers)
-        {
-            for (AMQSessionModel<?> session : getSessionModels())
-            {
-                if (messageAssignmentSuspended)
-                {
-                    session.ensureConsumersNoticedStateChange();
-                }
-                else
-                {
-                    session.notifyConsumerTargetCurrentStates();
-                }
-            }
-        }
+        return Thread.currentThread() == _ioThread;
     }
 
     @Override
-    public void alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(boolean allowed)
+    public ListenableFuture<Void> doOnIOThreadAsync(final Runnable task)
     {
-        if (allowed)
+        if (isIOThread())
         {
-            _messageAssignmentAllowedThread.set(Thread.currentThread());
+            task.run();
+            return Futures.immediateFuture(null);
         }
         else
         {
-            _messageAssignmentAllowedThread.set(null);
+            final SettableFuture<Void> future = SettableFuture.create();
+
+            addAsyncTask(
+                    new Action<Object>()
+                    {
+                        @Override
+                        public void performAction(final Object object)
+                        {
+                            try
+                            {
+                                task.run();
+                                future.set(null);
+                            }
+                            catch (RuntimeException e)
+                            {
+                                future.setException(e);
+                            }
+                        }
+                    });
+            return future;
         }
     }
 
-    @Override
-    public void setIOThread(final Thread ioThread)
-    {
-        _ioThread = ioThread;
-    }
+    protected abstract void addAsyncTask(final Action<? super T> action);
 
-    @Override
-    public boolean isIOThread()
-    {
-        return Thread.currentThread() == _ioThread;
-    }
 
     protected <T> T runAsSubject(PrivilegedAction<T> action)
     {
@@ -765,12 +755,6 @@ public abstract class AbstractAMQPConnec
         return getSessionModels().size();
     }
 
-    @Override
-    public void reserveOutboundMessageSpace(final long size)
-    {
-        _network.reserveOutboundMessageSpace(size);
-    }
-
     protected void markTransportClosed()
     {
         _transportClosedFuture.set(null);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Tue Nov 15 14:16:10 2016
@@ -91,18 +91,6 @@ public class MultiVersionProtocolEngine
         _onCloseTask = onCloseTask;
     }
 
-    @Override
-    public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
-    {
-        _delegate.setMessageAssignmentSuspended(value, notifyConsumers);
-    }
-
-    @Override
-    public boolean isMessageAssignmentSuspended()
-    {
-        return _delegate.isMessageAssignmentSuspended();
-    }
-
     public void closed()
     {
         _logger.debug("Closed");
@@ -244,18 +232,6 @@ public class MultiVersionProtocolEngine
     {
 
         @Override
-        public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
-        {
-
-        }
-
-        @Override
-        public boolean isMessageAssignmentSuspended()
-        {
-            return false;
-        }
-
-        @Override
         public Iterator<Runnable> processPendingIterator()
         {
             return Collections.emptyIterator();
@@ -366,17 +342,6 @@ public class MultiVersionProtocolEngine
         private final AtomicBoolean _hasWork = new AtomicBoolean();
 
         @Override
-        public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
-        {
-        }
-
-        @Override
-        public boolean isMessageAssignmentSuspended()
-        {
-            return false;
-        }
-
-        @Override
         public Iterator<Runnable> processPendingIterator()
         {
             return Collections.emptyIterator();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Tue Nov 15 14:16:10 2016
@@ -61,8 +61,6 @@ public class NonBlockingConnection imple
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final ProtocolEngine _protocolEngine;
     private final Runnable _onTransportEncryptionAction;
-    private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
-    private final long _outboundMessageBufferLimit;
 
     private volatile boolean _fullyWritten = true;
 
@@ -97,9 +95,6 @@ public class NonBlockingConnection imple
         _port = port;
         _threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteSocketAddress.toString();
 
-        _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
-                                                                   AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
-
         protocolEngine.setWorkListener(new Action<ProtocolEngine>()
         {
             @Override
@@ -214,15 +209,6 @@ public class NonBlockingConnection imple
     }
 
     @Override
-    public void reserveOutboundMessageSpace(long size)
-    {
-        if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
-        {
-            _protocolEngine.setMessageAssignmentSuspended(true, false);
-        }
-    }
-
-    @Override
     public String getTransportInfo()
     {
         return _delegate.getTransportInfo();
@@ -274,7 +260,6 @@ public class NonBlockingConnection imple
                 }
 
                 _protocolEngine.setIOThread(Thread.currentThread());
-                _protocolEngine.setMessageAssignmentSuspended(true, true);
 
                 boolean processPendingComplete = processPending();
 
@@ -290,10 +275,6 @@ public class NonBlockingConnection imple
                         _protocolEngine.notifyWork();
                     }
 
-                    if (_fullyWritten)
-                    {
-                        _protocolEngine.setMessageAssignmentSuspended(false, true);
-                    }
                 }
                 else
                 {
@@ -545,12 +526,7 @@ public class NonBlockingConnection imple
             _buffers.poll();
             buf.dispose();
         }
-        if (_fullyWritten)
-        {
-            _usedOutboundMessageSpace.set(0);
-        }
         return _fullyWritten;
-
     }
 
     protected int readFromNetwork() throws IOException

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Tue Nov 15 14:16:10 2016
@@ -53,10 +53,6 @@ public interface ProtocolEngine extends
 
     void setTransportBlockedForWriting(boolean blocked);
 
-    void setMessageAssignmentSuspended(boolean value, final boolean notifyConsumers);
-
-    boolean isMessageAssignmentSuspended();
-
     Iterator<Runnable> processPendingIterator();
 
     boolean hasWork();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java Tue Nov 15 14:16:10 2016
@@ -23,8 +23,6 @@ import org.apache.qpid.transport.network
 
 public interface ServerNetworkConnection extends NetworkConnection
 {
-    void reserveOutboundMessageSpace(long size);
-
     String getTransportInfo();
 
     long getScheduledTime();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org