You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 14:16:10 UTC

svn commit: r1769837 [1/4] - in /qpid/java/trunk: ./ bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/flow/ broker-core...

Author: rgodfrey
Date: Tue Nov 15 14:16:10 2016
New Revision: 1769837

URL: http://svn.apache.org/viewvc?rev=1769837&view=rev
Log:
QPID-7514 : remove QueueRunner and move processing into the IO thread

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
      - copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
      - copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
      - copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
      - copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
      - copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
      - copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
      - copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
      - copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNode.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
Modified:
    qpid/java/trunk/   (props changed)
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
    qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
    qpid/java/trunk/test-profiles/apache-ci.test.overridden.properties

Propchange: qpid/java/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Nov 15 14:16:10 2016
@@ -9,4 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
+/qpid/java/branches/remove-queue-runner:1767741-1769836
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Tue Nov 15 14:16:10 2016
@@ -510,16 +510,18 @@ public class BDBHAVirtualHostNodeImpl ex
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
-        try
-        {
-            super.onClose();
-        }
-        finally
-        {
-            closeEnvironment();
-        }
+        return doAfterAlways(super.onClose(),
+                             new Runnable()
+                             {
+                                 @Override
+                                 public void run()
+                                 {
+                                     closeEnvironment();
+                                 }
+                             });
+
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Nov 15 14:16:10 2016
@@ -23,15 +23,11 @@ package org.apache.qpid.server.consumer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,301 +35,224 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
 import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.util.StateChangeListener;
 
-public abstract class AbstractConsumerTarget implements ConsumerTarget, LogSubject
+public abstract class AbstractConsumerTarget implements ConsumerTarget
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
-    protected static final String PULL_ONLY_CONSUMER = "x-pull-only";
-    private final AtomicReference<State> _state;
-
-    private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
-            CopyOnWriteArraySet<>();
+    private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject()
+    {
+        @Override
+        public String toLogString()
+        {
+            return "[(** Multi-Queue **)] ";
+        }
+    };
+    private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
 
-    private final Lock _stateChangeLock = new ReentrantLock();
-    private final AtomicInteger _stateActivates = new AtomicInteger();
     private final boolean _isMultiQueue;
     private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
-    private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
     private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
 
-    private final boolean _isPullOnly;
     private Iterator<ConsumerImpl> _pullIterator;
+    private boolean _notifyWorkDesired;
 
-
-    protected AbstractConsumerTarget(final State initialState,
-                                     final boolean isPullOnly,
-                                     final boolean isMultiQueue,
+    protected AbstractConsumerTarget(final boolean isMultiQueue,
                                      final AMQPConnection<?> amqpConnection)
     {
-        _state = new AtomicReference<State>(initialState);
-        _isPullOnly = isPullOnly;
         _isMultiQueue = isMultiQueue;
-        _suspendedConsumerLoggingTicker = isMultiQueue
-                ? new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
-                {
-                    @Override
-                    protected void log(final long period)
-                    {
-                        amqpConnection.getEventLogger().message(AbstractConsumerTarget.this, SubscriptionMessages.STATE(period));
-                    }
-                }
-                : null;
 
+        _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
+        {
+            @Override
+            protected void log(final long period)
+            {
+                amqpConnection.getEventLogger().message(AbstractConsumerTarget.this.getLogSubject(), SubscriptionMessages.STATE(period));
+            }
+        };
     }
 
-    public boolean isMultiQueue()
-    {
-        return _isMultiQueue;
-    }
-
-    @Override
-    public boolean processPending()
+    private LogSubject getLogSubject()
     {
-        if (!getSessionModel().getAMQPConnection().isIOThread())
-        {
-            return false;
-        }
-        if(sendNextMessage())
+        if (_consumers.size() == 1 && _consumers.get(0) instanceof LogSubject)
         {
-            return true;
+            return (LogSubject) _consumers.get(0);
         }
         else
         {
-            processStateChanged();
-            processClosed();
-            return false;
+            return MULTI_QUEUE_LOG_SUBJECT;
         }
     }
 
-    @Override
-    public boolean hasPendingWork()
+    public boolean isMultiQueue()
     {
-        return hasMessagesToSend() || hasStateChanged() || hasClosed();
+        return _isMultiQueue;
     }
 
-    protected abstract boolean hasStateChanged();
-
-    protected abstract boolean hasClosed();
-
-    protected abstract void processStateChanged();
-
-    protected abstract void processClosed();
-
     @Override
-    public void consumerAdded(final ConsumerImpl sub)
+    public void notifyWork()
     {
-        _consumers.add(sub);
+        getSessionModel().notifyWork(this);
     }
 
-    @Override
-    public void consumerRemoved(final ConsumerImpl sub)
+    protected final void setNotifyWorkDesired(final boolean desired)
     {
-        _consumers.remove(sub);
-        if(_consumers.isEmpty())
+        if (desired != _notifyWorkDesired)
         {
-            close();
+            if(_suspendedConsumerLoggingTicker != null)
+            {
+                if (desired)
+                {
+                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+                }
+                else
+                {
+                    _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+                    getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+                }
+            }
+
+            for (ConsumerImpl consumer : _consumers)
+            {
+                consumer.setNotifyWorkDesired(desired);
+            }
+
+            _notifyWorkDesired = desired;
         }
     }
 
-    public List<ConsumerImpl> getConsumers()
+    public final boolean isNotifyWorkDesired()
     {
-        return _consumers;
+        return _notifyWorkDesired;
     }
 
-
     @Override
-    public final boolean isSuspended()
+    public boolean processPending()
     {
-        return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || isFlowSuspended();
-    }
+        if (!getSessionModel().getAMQPConnection().isIOThread())
+        {
+            return false;
+        }
 
-    @Override
-    public boolean hasCredit()
-    {
-        return !isFlowSuspended();
+        // TODO - if not closed
+        return sendNextMessage();
     }
 
-    protected abstract boolean isFlowSuspended();
-
-    public final State getState()
+    @Override
+    public void consumerAdded(final ConsumerImpl sub)
     {
-        return _state.get();
+        _consumers.add(sub);
     }
 
-    protected final boolean updateState(State from, State to)
+    @Override
+    public ListenableFuture<Void> consumerRemoved(final ConsumerImpl sub)
     {
-        if(_state.compareAndSet(from, to))
+        if(_consumers.contains(sub))
         {
-            if (to == State.ACTIVE && _stateChangeListeners.size() > 1)
-            {
-                int offset = _stateActivates.incrementAndGet();
-                if (offset >= _stateChangeListeners.size())
-                {
-                    _stateActivates.set(0);
-                    offset = 0;
-                }
-
-                List<StateChangeListener<ConsumerTarget, State>> holdovers = new ArrayList<>();
-                int pos = 0;
-                for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
-                {
-                    if (pos++ < offset)
+            return doOnIoThreadAsync(
+                    new Runnable()
                     {
-                        holdovers.add(listener);
-                    }
-                    else
-                    {
-                        listener.stateChanged(this, from, to);
-                    }
-                }
-                for (StateChangeListener<ConsumerTarget, State> listener : holdovers)
-                {
-                    listener.stateChanged(this, from, to);
-                }
-
-            }
-            else
-            {
-                if(!_stateChangeListeners.isEmpty())
-                {
-                    for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
-                    {
-                        listener.stateChanged(this, from, to);
-                    }
-                }
-            }
-            if(_suspendedConsumerLoggingTicker != null)
-            {
-                if (to == State.SUSPENDED)
-                {
-                    _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
-                    getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
-                }
-                else
-                {
-                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
-                }
-            }
-            return true;
+                        @Override
+                        public void run()
+                        {
+                            consumerRemovedInternal(sub);
+                        }
+                    });
         }
         else
         {
-            return false;
+            return Futures.immediateFuture(null);
         }
     }
 
-    @Override
-    public final void notifyCurrentState()
-    {
-
-        for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
-        {
-            State state = getState();
-            listener.stateChanged(this, state, state);
-        }
-    }
-    public final void addStateListener(StateChangeListener<ConsumerTarget, State> listener)
+    private ListenableFuture<Void> doOnIoThreadAsync(final Runnable task)
     {
-        _stateChangeListeners.add(listener);
+        AMQSessionModel<?> sessionModel = getSessionModel();
+        return sessionModel.getAMQPConnection().doOnIOThreadAsync(task);
     }
 
-    @Override
-    public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
+    private void consumerRemovedInternal(final ConsumerImpl sub)
     {
-        _stateChangeListeners.remove(listener);
+        _consumers.remove(sub);
+        if(_consumers.isEmpty())
+        {
+            close();
+        }
     }
 
-    public final boolean trySendLock()
+    public List<ConsumerImpl> getConsumers()
     {
-        return _stateChangeLock.tryLock();
+        return _consumers;
     }
 
-    public final void getSendLock()
-    {
-        _stateChangeLock.lock();
-    }
 
-    public final void releaseSendLock()
+    @Override
+    public final boolean isSuspended()
     {
-        _stateChangeLock.unlock();
+        return !isNotifyWorkDesired();
     }
 
-    @Override
-    public boolean isPullOnly()
+    public final State getState()
     {
-        return _isPullOnly;
+        return _state.get();
     }
 
     @Override
     public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
-        AMQPConnection<?> amqpConnection = getSessionModel().getAMQPConnection();
-        amqpConnection.reserveOutboundMessageSpace(entry.getMessage().getSize());
-        _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
-        amqpConnection.notifyWork();
+        doSend(consumer, entry, batch);
+
+        if (consumer.acquires())
+        {
+            entry.makeAcquisitionStealable();
+        }
         return entry.getMessage().getSize();
     }
 
     protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
 
-    @Override
-    public boolean hasMessagesToSend()
-    {
-        return !_queue.isEmpty() || (isPullOnly() && messagesAvailable());
-    }
 
-    private boolean messagesAvailable()
+    @Override
+    public boolean sendNextMessage()
     {
-        if(hasCredit())
+        AbstractQueue.MessageContainer messageContainer = null;
+        ConsumerImpl consumer = null;
+        boolean iteratedCompleteList = false;
+        while (messageContainer == null)
         {
-            for (ConsumerImpl consumer : _consumers)
+            if (_pullIterator == null || !_pullIterator.hasNext())
             {
-                if (consumer.hasAvailableMessages())
+                if (iteratedCompleteList)
                 {
-                    return true;
+                    break;
                 }
-            }
-        }
-        return false;
-    }
+                iteratedCompleteList = true;
 
-    @Override
-    public boolean sendNextMessage()
-    {
-        if(isPullOnly())
-        {
-            if(_pullIterator == null || !_pullIterator.hasNext())
-            {
                 _pullIterator = getConsumers().iterator();
             }
-            if(_pullIterator.hasNext())
+            if (_pullIterator.hasNext())
             {
-                ConsumerImpl consumer = _pullIterator.next();
-                consumer.pullMessage();
+                consumer = _pullIterator.next();
+                messageContainer = consumer.pullMessage();
             }
         }
-        ConsumerMessageInstancePair consumerMessage = _queue.poll();
-        if (consumerMessage != null)
+
+        if (messageContainer != null)
         {
+            MessageInstance entry = messageContainer._messageInstance;
             try
             {
-
-                ConsumerImpl consumer = consumerMessage.getConsumer();
-                MessageInstance entry = consumerMessage.getEntry();
-                boolean batch = consumerMessage.isBatch();
-                doSend(consumer, entry, batch);
-
-                if (consumer.acquires())
-                {
-                    entry.makeAcquisitionStealable();
-                }
+                send(consumer, entry, false);
             }
             finally
             {
-                consumerMessage.release();
+                if (messageContainer._messageReference != null)
+                {
+                    messageContainer._messageReference.release();
+                }
             }
             return true;
         }
@@ -347,54 +266,27 @@ public abstract class AbstractConsumerTa
 
     final public boolean close()
     {
-        boolean closed = false;
-        State state = getState();
-
-        getSendLock();
-        try
+        if (_state.compareAndSet(State.OPEN, State.CLOSED))
         {
-            while(!closed && state != State.CLOSED)
+            List<ConsumerImpl> consumers = new ArrayList<>(_consumers);
+            _consumers.clear();
+
+            setNotifyWorkDesired(false);
+
+            for (ConsumerImpl consumer : consumers)
             {
-                closed = updateState(state, State.CLOSED);
-                if(!closed)
-                {
-                    state = getState();
-                }
+                consumer.close();
             }
-            ConsumerMessageInstancePair instance;
-            while((instance = _queue.poll()) != null)
+            if (_suspendedConsumerLoggingTicker != null)
             {
-                MessageInstance entry = instance.getEntry();
-                entry.release(instance.getConsumer());
-                instance.release();
+                getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
             }
-            doCloseInternal();
-        }
-        finally
-        {
-            releaseSendLock();
-        }
 
-        for (ConsumerImpl consumer : _consumers)
-        {
-            consumer.close();
+            return true;
         }
-        if(_suspendedConsumerLoggingTicker != null)
+        else
         {
-            getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+            return false;
         }
-
-        return closed;
-
-    }
-
-    @Override
-    public String toLogString()
-    {
-
-        return "[(** Multi-Queue **)] ";
     }
-
-
-    protected abstract void doCloseInternal();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Tue Nov 15 14:16:10 2016
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
 
 public interface ConsumerImpl
 {
@@ -33,9 +34,9 @@ public interface ConsumerImpl
 
     ConsumerTarget getTarget();
 
-    boolean hasAvailableMessages();
+    AbstractQueue.MessageContainer pullMessage();
 
-    void pullMessage();
+    void setNotifyWorkDesired(boolean desired);
 
     enum Option
     {
@@ -71,16 +72,7 @@ public interface ConsumerImpl
 
     void close();
 
-    boolean trySendLock();
-
-
-    void getSendLock();
-
-    void releaseSendLock();
-
     boolean isActive();
 
     String getName();
-
-    void flush();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Tue Nov 15 14:16:10 2016
@@ -20,43 +20,38 @@
  */
 package org.apache.qpid.server.consumer;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.StateChangeListener;
 
 public interface ConsumerTarget
 {
-
-
     void acquisitionRemoved(MessageInstance node);
 
-    void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
-
     boolean processPending();
 
-    boolean hasPendingWork();
-
     String getTargetAddress();
 
-    boolean hasCredit();
-
     boolean isMultiQueue();
 
+    void notifyWork();
+
+    void updateNotifyWorkDesired();
+
+    boolean isNotifyWorkDesired();
+
     enum State
     {
-        ACTIVE, SUSPENDED, CLOSED
+        OPEN, CLOSED
     }
 
     State getState();
 
     void consumerAdded(ConsumerImpl sub);
 
-    void consumerRemoved(ConsumerImpl sub);
-
-    void notifyCurrentState();
-
-    void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
+    ListenableFuture<Void> consumerRemoved(ConsumerImpl sub);
 
     long getUnacknowledgedBytes();
 
@@ -66,14 +61,10 @@ public interface ConsumerTarget
 
     long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
 
-    boolean hasMessagesToSend();
-
     boolean sendNextMessage();
 
     void flushBatched();
 
-    void queueDeleted();
-
     void queueEmpty();
 
     boolean allocateCredit(ServerMessage msg);
@@ -83,13 +74,4 @@ public interface ConsumerTarget
     boolean isSuspended();
 
     boolean close();
-
-    boolean trySendLock();
-
-    void getSendLock();
-
-    void releaseSendLock();
-
-    boolean isPullOnly();
-
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Tue Nov 15 14:16:10 2016
@@ -24,19 +24,10 @@ package org.apache.qpid.server.flow;
 
 public interface FlowCreditManager
 {
-    public static interface FlowCreditManagerListener
-    {
-        void creditStateChanged(boolean hasCredit);
-    }
+    void restoreCredit(long messageCredit, long bytesCredit);
 
-    void addStateListener(FlowCreditManagerListener listener);
+    boolean hasCredit();
 
-    boolean removeListener(FlowCreditManagerListener listener);
-
-    public void restoreCredit(long messageCredit, long bytesCredit);
-
-    public boolean hasCredit();
-
-    public boolean useCreditForMessage(long msgSize);
+    boolean useCreditForMessage(long msgSize);
 
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Tue Nov 15 14:16:10 2016
@@ -248,8 +248,6 @@ public interface MessageInstance
 
     void release(ConsumerImpl release);
 
-    boolean resend();
-
     void delete();
 
     boolean isDeleted();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Tue Nov 15 14:16:10 2016
@@ -37,7 +37,7 @@ public interface MessageSource extends T
                               EnumSet<ConsumerImpl.Option> options,
                               Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
-                   ConsumerAccessRefused;
+                   ConsumerAccessRefused, QueueDeleted;
 
     Collection<? extends ConsumerImpl> getConsumers();
 
@@ -56,6 +56,19 @@ public interface MessageSource extends T
         {
         }
     }
+
+    /**
+     * QueueDeleted signals a failure to create a consumer, because the queue has been deleted.
+     * <p>
+     * TODO Move to top level, used outside this class.
+     */
+    final class QueueDeleted extends Exception
+    {
+
+        public QueueDeleted()
+        {
+        }
+    }
 
     /**
      * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Tue Nov 15 14:16:10 2016
@@ -774,16 +774,25 @@ public abstract class AbstractConfigured
                         {
                             return closeChildren();
                         }
+                    }).then(new Callable<ListenableFuture<Void>>()
+                    {
+                        @Override
+                        public ListenableFuture<Void> call() throws Exception
+                        {
+                            return onClose();
+                        }
                     }).then(new Runnable()
-                            {
-                                @Override
-                                public void run()
-                                {
-                                    onClose();
-                                    unregister(false);
-                                    LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
-                                }
-                            });
+                    {
+                        @Override
+                        public void run()
+                        {
+                            unregister(false);
+                            LOGGER.debug("Closed "
+                                         + AbstractConfiguredObject.this.getClass().getSimpleName()
+                                         + " : "
+                                         + getName());
+                        }
+                    });
                 }
                 else
                 {
@@ -819,8 +828,9 @@ public abstract class AbstractConfigured
         return Futures.immediateFuture(null);
     }
 
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
+        return Futures.immediateFuture(null);
     }
 
     public final void create()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java Tue Nov 15 14:16:10 2016
@@ -168,7 +168,7 @@ public abstract class AbstractSystemConf
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         final TaskExecutor taskExecutor = getTaskExecutor();
         try
@@ -192,7 +192,7 @@ public abstract class AbstractSystemConf
                 taskExecutor.stopImmediately();
             }
         }
-
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java Tue Nov 15 14:16:10 2016
@@ -671,7 +671,7 @@ public class BrokerImpl extends Abstract
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         if (_reportingTimer != null)
         {
@@ -700,6 +700,8 @@ public class BrokerImpl extends Abstract
                 task.run();
             }
         }
+        return Futures.immediateFuture(null);
+
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Nov 15 14:16:10 2016
@@ -90,9 +90,10 @@ public interface Queue<X extends Queue<X
     @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
     long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l;
 
-    String MAX_ASYNCHRONOUS_DELIVERIES = "queue.maxAsynchronousDeliveries";
-    @ManagedContextDefault(name = MAX_ASYNCHRONOUS_DELIVERIES )
-    int DEFAULT_MAX_ASYNCHRONOUS_DELIVERIES = 80;
+    String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
+    @ManagedContextDefault( name = QUEUE_SCAVANGE_COUNT)
+    int DEFAULT_QUEUE_SCAVANGE_COUNT = 50;
+
 
     String MIME_TYPE_TO_FILE_EXTENSION = "qpid.mimeTypeToFileExtension";
     @SuppressWarnings("unused")
@@ -366,8 +367,6 @@ public interface Queue<X extends Queue<X
 
     void requeue(QueueEntry entry);
 
-    boolean resend(QueueEntry entry, QueueConsumer<?> consumer);
-
     List<? extends QueueEntry> getMessagesOnTheQueue();
 
     List<Long> getMessagesOnTheQueue(int num);
@@ -394,8 +393,6 @@ public interface Queue<X extends Queue<X
 
     Set<NotificationCheck> getNotificationChecks();
 
-    void deliverAsync();
-
     Collection<String> getAvailableAttributes();
 
     void completeRecovery();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Tue Nov 15 14:16:10 2016
@@ -60,9 +60,9 @@ public final class SessionAdapter extend
     // Attributes
     private final AMQSessionModel _session;
     private final Action _deleteModelTask;
-    private final AbstractAMQPConnection<?> _amqpConnection;
+    private final AbstractAMQPConnection<?,?> _amqpConnection;
 
-    public SessionAdapter(final AbstractAMQPConnection<?> amqpConnection,
+    public SessionAdapter(final AbstractAMQPConnection<?,?> amqpConnection,
                           final AMQSessionModel session)
     {
         super(parentsMap(amqpConnection), createAttributes(session));
@@ -206,7 +206,7 @@ public final class SessionAdapter extend
         return Futures.immediateFuture(null);
     }
 
-    private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?> amqpConnection,
+    private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?,?> amqpConnection,
                                                    final AMQSessionModel session)
     {
         NamedAddressSpace addressSpace = amqpConnection.getAddressSpace();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Tue Nov 15 14:16:10 2016
@@ -57,7 +57,6 @@ public interface AmqpPort<X extends Amqp
 
     String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
     String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog";
-    String PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = "qpid.port.amqp.outboundMessageBufferSize";
 
     @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
     String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
@@ -88,11 +87,6 @@ public interface AmqpPort<X extends Amqp
     @ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT)
     int DEFAULT_OPEN_CONNECTIONS_WARN_PERCENT = 80;
 
-    @SuppressWarnings("unused")
-    @ManagedContextDefault(name = PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)
-    long DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = 1024 * 1024;
-
-
     String PROTOCOL_HANDSHAKE_TIMEOUT = "qpid.port.protocol_handshake_timeout";
 
     @SuppressWarnings("unused")

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Tue Nov 15 14:16:10 2016
@@ -43,6 +43,7 @@ import javax.net.ssl.X509TrustManager;
 import javax.security.auth.Subject;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
@@ -310,7 +311,7 @@ public class AmqpPortImpl extends Abstra
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         if (_transport != null)
         {
@@ -322,6 +323,7 @@ public class AmqpPortImpl extends Abstra
 
             _transport.close();
         }
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Tue Nov 15 14:16:10 2016
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Queue;
@@ -106,6 +107,5 @@ public interface AMQSessionModel<T exten
     void addTicker(Ticker ticker);
     void removeTicker(Ticker ticker);
 
-    void notifyConsumerTargetCurrentStates();
-    void ensureConsumersNoticedStateChange();
+    void notifyWork(ConsumerTarget target);
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org