You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/07 15:15:01 UTC

svn commit: r1768530 - in /qpid/java/branches/remove-queue-runner: bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/mod...

Author: rgodfrey
Date: Mon Nov  7 15:15:00 2016
New Revision: 1768530

URL: http://svn.apache.org/viewvc?rev=1768530&view=rev
Log:
Removed sendLock, made onClose return future

Removed:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
Modified:
    qpid/java/branches/remove-queue-runner/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/branches/remove-queue-runner/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java
    qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
    qpid/java/branches/remove-queue-runner/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java

Modified: qpid/java/branches/remove-queue-runner/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Mon Nov  7 15:15:00 2016
@@ -510,16 +510,18 @@ public class BDBHAVirtualHostNodeImpl ex
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
-        try
-        {
-            super.onClose();
-        }
-        finally
-        {
-            closeEnvironment();
-        }
+        return doAfterAlways(super.onClose(),
+                             new Runnable()
+                             {
+                                 @Override
+                                 public void run()
+                                 {
+                                     closeEnvironment();
+                                 }
+                             });
+
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Mon Nov  7 15:15:00 2016
@@ -32,6 +32,8 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
 import org.apache.qpid.server.transport.AMQPConnection;
@@ -90,7 +93,7 @@ public abstract class AbstractConsumerTa
     public void notifyWork()
     {
         _waitingOnStateChange.set(false);
-        getSessionModel().getAMQPConnection().notifyWork();
+        getSessionModel().notifyWork(this);
     }
 
     @Override
@@ -100,32 +103,25 @@ public abstract class AbstractConsumerTa
         {
             return false;
         }
-        if(sendNextMessage())
-        {
-            return true;
-        }
-        else
-        {
-            processStateChanged();
-            processClosed();
-            return false;
-        }
+        return sendNextMessage();
     }
 
     @Override
     public boolean hasPendingWork()
     {
-        return hasMessagesToSend() || hasStateChanged() || hasClosed();
+        if (!_waitingOnStateChange.get() && hasCredit())
+        {
+            for (ConsumerImpl consumer : _consumers)
+            {
+                if (consumer.hasAvailableMessages())
+                {
+                    return true;
+                }
+            }
+        }
+        return false;
     }
 
-    protected abstract boolean hasStateChanged();
-
-    protected abstract boolean hasClosed();
-
-    protected abstract void processStateChanged();
-
-    protected abstract void processClosed();
-
     @Override
     public void consumerAdded(final ConsumerImpl sub)
     {
@@ -133,7 +129,33 @@ public abstract class AbstractConsumerTa
     }
 
     @Override
-    public void consumerRemoved(final ConsumerImpl sub)
+    public ListenableFuture<Void> consumerRemoved(final ConsumerImpl sub)
+    {
+        if(_consumers.contains(sub))
+        {
+            return doOnIoThreadAsync(
+                    new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            consumerRemovedInternal(sub);
+                        }
+                    });
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
+        }
+    }
+
+    private ListenableFuture<Void> doOnIoThreadAsync(final Runnable task)
+    {
+        AMQSessionModel<?> sessionModel = getSessionModel();
+        return sessionModel.getAMQPConnection().doOnIOThreadAsync(task);
+    }
+
+    private void consumerRemovedInternal(final ConsumerImpl sub)
     {
         _consumers.remove(sub);
         if(_consumers.isEmpty())
@@ -250,21 +272,6 @@ public abstract class AbstractConsumerTa
         _stateChangeListeners.remove(listener);
     }
 
-    public final boolean trySendLock()
-    {
-        return _stateChangeLock.tryLock();
-    }
-
-    public final void getSendLock()
-    {
-        _stateChangeLock.lock();
-    }
-
-    public final void releaseSendLock()
-    {
-        _stateChangeLock.unlock();
-    }
-
     @Override
     public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
@@ -279,21 +286,6 @@ public abstract class AbstractConsumerTa
 
     protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
 
-    @Override
-    public boolean hasMessagesToSend()
-    {
-        if (!_waitingOnStateChange.get() && hasCredit())
-        {
-            for (ConsumerImpl consumer : _consumers)
-            {
-                if (consumer.hasAvailableMessages())
-                {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
 
     @Override
     public boolean sendNextMessage()
@@ -351,27 +343,21 @@ public abstract class AbstractConsumerTa
     {
         boolean closed = false;
         State state = getState();
+        List<ConsumerImpl> consumers = new ArrayList<>(_consumers);
+        _consumers.clear();
 
-        getSendLock();
-        try
+        while(!closed && state != State.CLOSED)
         {
-            while(!closed && state != State.CLOSED)
+            closed = updateState(state, State.CLOSED);
+            if(!closed)
             {
-                closed = updateState(state, State.CLOSED);
-                if(!closed)
-                {
-                    state = getState();
-                }
+                state = getState();
             }
-            ConsumerMessageInstancePair instance;
-            doCloseInternal();
-        }
-        finally
-        {
-            releaseSendLock();
         }
 
-        for (ConsumerImpl consumer : _consumers)
+        doCloseInternal();
+
+        for (ConsumerImpl consumer : consumers)
         {
             consumer.close();
         }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Mon Nov  7 15:15:00 2016
@@ -72,13 +72,6 @@ public interface ConsumerImpl
 
     void close();
 
-    boolean trySendLock();
-
-
-    void getSendLock();
-
-    void releaseSendLock();
-
     boolean isActive();
 
     String getName();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Mon Nov  7 15:15:00 2016
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.consumer;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -54,7 +56,7 @@ public interface ConsumerTarget
 
     void consumerAdded(ConsumerImpl sub);
 
-    void consumerRemoved(ConsumerImpl sub);
+    ListenableFuture<Void> consumerRemoved(ConsumerImpl sub);
 
     void notifyCurrentState();
 
@@ -68,8 +70,6 @@ public interface ConsumerTarget
 
     long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
 
-    boolean hasMessagesToSend();
-
     boolean sendNextMessage();
 
     void flushBatched();
@@ -85,11 +85,4 @@ public interface ConsumerTarget
     boolean isSuspended();
 
     boolean close();
-
-    boolean trySendLock();
-
-    void getSendLock();
-
-    void releaseSendLock();
-
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Mon Nov  7 15:15:00 2016
@@ -774,16 +774,25 @@ public abstract class AbstractConfigured
                         {
                             return closeChildren();
                         }
+                    }).then(new Callable<ListenableFuture<Void>>()
+                    {
+                        @Override
+                        public ListenableFuture<Void> call() throws Exception
+                        {
+                            return onClose();
+                        }
                     }).then(new Runnable()
-                            {
-                                @Override
-                                public void run()
-                                {
-                                    onClose();
-                                    unregister(false);
-                                    LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
-                                }
-                            });
+                    {
+                        @Override
+                        public void run()
+                        {
+                            unregister(false);
+                            LOGGER.debug("Closed "
+                                         + AbstractConfiguredObject.this.getClass().getSimpleName()
+                                         + " : "
+                                         + getName());
+                        }
+                    });
                 }
                 else
                 {
@@ -819,8 +828,9 @@ public abstract class AbstractConfigured
         return Futures.immediateFuture(null);
     }
 
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
+        return Futures.immediateFuture(null);
     }
 
     public final void create()

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java Mon Nov  7 15:15:00 2016
@@ -168,7 +168,7 @@ public abstract class AbstractSystemConf
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         final TaskExecutor taskExecutor = getTaskExecutor();
         try
@@ -192,7 +192,7 @@ public abstract class AbstractSystemConf
                 taskExecutor.stopImmediately();
             }
         }
-
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java Mon Nov  7 15:15:00 2016
@@ -671,7 +671,7 @@ public class BrokerImpl extends Abstract
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         if (_reportingTimer != null)
         {
@@ -700,6 +700,8 @@ public class BrokerImpl extends Abstract
                 task.run();
             }
         }
+        return Futures.immediateFuture(null);
+
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Mon Nov  7 15:15:00 2016
@@ -60,9 +60,9 @@ public final class SessionAdapter extend
     // Attributes
     private final AMQSessionModel _session;
     private final Action _deleteModelTask;
-    private final AbstractAMQPConnection<?> _amqpConnection;
+    private final AbstractAMQPConnection<?,?> _amqpConnection;
 
-    public SessionAdapter(final AbstractAMQPConnection<?> amqpConnection,
+    public SessionAdapter(final AbstractAMQPConnection<?,?> amqpConnection,
                           final AMQSessionModel session)
     {
         super(parentsMap(amqpConnection), createAttributes(session));
@@ -206,7 +206,7 @@ public final class SessionAdapter extend
         return Futures.immediateFuture(null);
     }
 
-    private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?> amqpConnection,
+    private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?,?> amqpConnection,
                                                    final AMQSessionModel session)
     {
         NamedAddressSpace addressSpace = amqpConnection.getAddressSpace();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Mon Nov  7 15:15:00 2016
@@ -43,6 +43,7 @@ import javax.net.ssl.X509TrustManager;
 import javax.security.auth.Subject;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
@@ -310,7 +311,7 @@ public class AmqpPortImpl extends Abstra
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         if (_transport != null)
         {
@@ -322,6 +323,7 @@ public class AmqpPortImpl extends Abstra
 
             _transport.close();
         }
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Mon Nov  7 15:15:00 2016
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Queue;
@@ -106,5 +107,5 @@ public interface AMQSessionModel<T exten
     void addTicker(Ticker ticker);
     void removeTicker(Ticker ticker);
 
-    void ensureConsumersNoticedStateChange();
+    void notifyWork(ConsumerTarget target);
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Nov  7 15:15:00 2016
@@ -1851,11 +1851,11 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
-        super.onClose();
         _stopped.set(true);
         _closing = false;
+        return Futures.immediateFuture(null);
     }
 
     public void checkCapacity(AMQSessionModel channel)
@@ -1931,7 +1931,6 @@ public abstract class AbstractQueue<X ex
         boolean queueEmpty = false;
         MessageContainer messageContainer = null;
 
-        sub.getSendLock();
         try
         {
             if (!sub.isSuspended())
@@ -1950,7 +1949,6 @@ public abstract class AbstractQueue<X ex
         }
         finally
         {
-            sub.releaseSendLock();
             if(queueEmpty)
             {
                 sub.queueEmpty();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Mon Nov  7 15:15:00 2016
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -301,28 +303,33 @@ class QueueConsumerImpl
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         if(_closed.compareAndSet(false,true))
         {
-            _target.getSendLock();
-            try
-            {
-                _waitingOnCreditMessageListener.remove();
-                _target.consumerRemoved(this);
-                _target.removeStateChangeListener(_listener);
-                _queue.unregisterConsumer(this);
-                if(_suspendedConsumerLoggingTicker != null)
-                {
-                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
-                }
-                deleted();
-            }
-            finally
-            {
-                _target.releaseSendLock();
-            }
+            _waitingOnCreditMessageListener.remove();
 
+            return doAfter(_target.consumerRemoved(this),
+                           new Runnable()
+                           {
+                               @Override
+                               public void run()
+                               {
+                                   _target.removeStateChangeListener(_listener);
+
+                                   _queue.unregisterConsumer(QueueConsumerImpl.this);
+
+                                   if (_suspendedConsumerLoggingTicker != null)
+                                   {
+                                       getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+                                   }
+                                   deleted();
+                               }
+                           });
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
         }
     }
 
@@ -524,21 +531,6 @@ class QueueConsumerImpl
         return filterLogString.toString();
     }
 
-    public final boolean trySendLock()
-    {
-        return getTarget().trySendLock();
-    }
-
-    public final void getSendLock()
-    {
-        getTarget().getSendLock();
-    }
-
-    public final void releaseSendLock()
-    {
-        getTarget().releaseSendLock();
-    }
-
     public final long getCreateTime()
     {
         return _createTime;

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java Mon Nov  7 15:15:00 2016
@@ -79,14 +79,14 @@ public abstract class AbstractKeyStore<X
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
-        super.onClose();
         if(_checkExpiryTaskFuture != null)
         {
             _checkExpiryTaskFuture.cancel(false);
             _checkExpiryTaskFuture = null;
         }
+        return Futures.immediateFuture(null);
     }
 
     protected void initializeExpiryChecking()

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Mon Nov  7 15:15:00 2016
@@ -27,6 +27,8 @@ import java.util.Collection;
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.model.Connection;
@@ -73,6 +75,7 @@ public interface AMQPConnection<C extend
     void sendConnectionCloseAsync(AMQConstant connectionForced, String reason);
 
     boolean isIOThread();
+    ListenableFuture<Void> doOnIOThreadAsync(final Runnable task);
 
     void checkAuthorizedMessagePrincipal(String messageUserId);
 
@@ -86,4 +89,6 @@ public interface AMQPConnection<C extend
     Collection<? extends AMQSessionModel<?>> getSessionModels();
 
     void resetStatistics();
+
+    void notifyWork(AMQSessionModel<?> sessionModel);
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Mon Nov  7 15:15:00 2016
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.Atomi
 import javax.security.auth.Subject;
 import javax.security.auth.SubjectDomainCombiner;
 
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
@@ -75,7 +76,7 @@ import org.apache.qpid.server.util.Fixed
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
 
-public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C>>
+public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,T>, T>
         extends AbstractConfiguredObject<C>
         implements ProtocolEngine, AMQPConnection<C>, EventLoggerProvider
 
@@ -497,6 +498,42 @@ public abstract class AbstractAMQPConnec
         return Thread.currentThread() == _ioThread;
     }
 
+    @Override
+    public ListenableFuture<Void> doOnIOThreadAsync(final Runnable task)
+    {
+        if (isIOThread())
+        {
+            task.run();
+            return Futures.immediateFuture(null);
+        }
+        else
+        {
+            final SettableFuture<Void> future = SettableFuture.create();
+
+            addAsyncTask(
+                    new Action<Object>()
+                    {
+                        @Override
+                        public void performAction(final Object object)
+                        {
+                            try
+                            {
+                                task.run();
+                                future.set(null);
+                            }
+                            catch (RuntimeException e)
+                            {
+                                future.setException(e);
+                            }
+                        }
+                    });
+            return future;
+        }
+    }
+
+    protected abstract void addAsyncTask(final Action<? super T> action);
+
+
     protected <T> T runAsSubject(PrivilegedAction<T> action)
     {
         return Subject.doAs(_subject, action);

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Mon Nov  7 15:15:00 2016
@@ -44,7 +44,6 @@ import org.apache.qpid.server.queue.Abst
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -146,23 +145,15 @@ public abstract class AbstractSystemMess
         @Override
         public AbstractQueue.MessageContainer pullMessage()
         {
-            _target.getSendLock();
-            try
+            if (!_queue.isEmpty())
             {
-                if (!_queue.isEmpty())
+                final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
+                if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
                 {
-                    final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
-                    if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
-                    {
-                        _queue.remove(0);
-                        return new AbstractQueue.MessageContainer(propertiesMessageInstance, null);
-                    }
+                    _queue.remove(0);
+                    return new AbstractQueue.MessageContainer(propertiesMessageInstance, null);
                 }
             }
-            finally
-            {
-                _target.releaseSendLock();
-            }
             return null;
         }
 
@@ -239,24 +230,6 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean trySendLock()
-        {
-            return _target.trySendLock();
-        }
-
-        @Override
-        public void getSendLock()
-        {
-            _target.getSendLock();
-        }
-
-        @Override
-        public void releaseSendLock()
-        {
-            _target.releaseSendLock();
-        }
-
-        @Override
         public boolean isActive()
         {
             return false;

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Nov  7 15:15:00 2016
@@ -1444,7 +1444,7 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         _dtxRegistry.close();
         shutdownHouseKeeping();
@@ -1455,6 +1455,7 @@ public abstract class AbstractVirtualHos
         _eventLogger.message(VirtualHostMessages.CLOSED(getName()));
 
         stopLogging(_virtualHostLoggersToClose);
+        return Futures.immediateFuture(null);
     }
 
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java Mon Nov  7 15:15:00 2016
@@ -405,10 +405,11 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         closeConfigurationStore();
         _virtualHostExecutor.stop();
+        return Futures.immediateFuture(null);
     }
 
     private void closeConfigurationStore()

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java Mon Nov  7 15:15:00 2016
@@ -142,8 +142,9 @@ public class ManagementModeStoreHandlerT
             }
 
             @Override
-            protected void onClose()
+            protected ListenableFuture<Void> onClose()
             {
+                return Futures.immediateFuture(null);
             }
 
             @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Mon Nov  7 15:15:00 2016
@@ -31,6 +31,9 @@ import java.util.UUID;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -153,12 +156,6 @@ public class TestConsumerTarget implemen
     }
 
     @Override
-    public boolean hasMessagesToSend()
-    {
-        return false;
-    }
-
-    @Override
     public boolean sendNextMessage()
     {
         return false;
@@ -199,9 +196,10 @@ public class TestConsumerTarget implemen
     }
 
     @Override
-    public void consumerRemoved(final ConsumerImpl sub)
+    public ListenableFuture<Void> consumerRemoved(final ConsumerImpl sub)
     {
        close();
+        return Futures.immediateFuture(null);
     }
 
     @Override
@@ -276,21 +274,6 @@ public class TestConsumerTarget implemen
     }
 
 
-    public final boolean trySendLock()
-    {
-        return _stateChangeLock.tryLock();
-    }
-
-    public final void getSendLock()
-    {
-        _stateChangeLock.lock();
-    }
-
-    public final void releaseSendLock()
-    {
-        _stateChangeLock.unlock();
-    }
-
     @Override
     public boolean isMultiQueue()
     {
@@ -508,9 +491,9 @@ public class TestConsumerTarget implemen
         }
 
         @Override
-        public void ensureConsumersNoticedStateChange()
+        public void notifyWork(final ConsumerTarget target)
         {
-
+            _connection.notifyWork(this);
         }
 
         @Override

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Mon Nov  7 15:15:00 2016
@@ -55,7 +55,7 @@ import org.apache.qpid.transport.Constan
 import org.apache.qpid.server.transport.AggregateTicker;
 
 
-public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10>
+public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
 {
     private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
     private final ServerInputHandler _inputHandler;
@@ -277,6 +277,12 @@ public class AMQPConnection_0_10 extends
         }
     }
 
+    @Override
+    public void notifyWork(final AMQSessionModel<?> sessionModel)
+    {
+        notifyWork();
+    }
+
     public void clearWork()
     {
         _stateChanged.set(false);
@@ -303,6 +309,12 @@ public class AMQPConnection_0_10 extends
         _connection.closeSessionAsync((ServerSession) session, cause, message);
     }
 
+    @Override
+    protected void addAsyncTask(final Action<? super ServerConnection> action)
+    {
+        _connection.addAsyncTask(action);
+    }
+
     public void block()
     {
         _connection.block();

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Mon Nov  7 15:15:00 2016
@@ -536,19 +536,10 @@ public class ConsumerTarget_0_10 extends
 
     public void stop()
     {
-        try
-        {
-            getSendLock();
-
-            updateState(State.ACTIVE, State.SUSPENDED);
-            _stopped.set(true);
-            FlowCreditManager_0_10 creditManager = getCreditManager();
-            creditManager.clearCredit();
-        }
-        finally
-        {
-            releaseSendLock();
-        }
+        updateState(State.ACTIVE, State.SUSPENDED);
+        _stopped.set(true);
+        FlowCreditManager_0_10 creditManager = getCreditManager();
+        creditManager.clearCredit();
     }
 
     public void addCredit(MessageCreditUnit unit, long value)
@@ -647,30 +638,6 @@ public class ConsumerTarget_0_10 extends
     }
 
     @Override
-    protected void processClosed()
-    {
-
-    }
-
-    @Override
-    protected void processStateChanged()
-    {
-
-    }
-
-    @Override
-    protected boolean hasStateChanged()
-    {
-        return false;
-    }
-
-    @Override
-    protected boolean hasClosed()
-    {
-        return false;
-    }
-
-    @Override
     public String toString()
     {
         return "ConsumerTarget_0_10[name=" + _name + ", session=" + _session.toLogString() + "]";

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Nov  7 15:15:00 2016
@@ -31,7 +31,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -133,7 +132,7 @@ public class ServerConnection extends Co
             getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, (ServerNetworkConnection) getNetworkConnection()));
 
             // trigger a wakeup to ensure the ticker will be taken into account
-            notifyWork();
+            getAmqpConnection().notifyWork();
         }
     }
 
@@ -468,10 +467,10 @@ public class ServerConnection extends Co
         super.doHeartBeat();
     }
 
-    private void addAsyncTask(final Action<ServerConnection> action)
+    void addAsyncTask(final Action<? super ServerConnection> action)
     {
         _asyncTaskList.add(action);
-        notifyWork();
+        getAmqpConnection().notifyWork();
     }
 
     public int getMessageCompressionThreshold()
@@ -492,11 +491,6 @@ public class ServerConnection extends Co
         }
     }
 
-    public void notifyWork()
-    {
-        _amqpConnection.notifyWork();
-    }
-
     public Iterator<Runnable> processPendingIterator()
     {
         return new ProcessPendingIterator();

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Nov  7 15:15:00 2016
@@ -206,7 +206,7 @@ public class ServerSession extends Sessi
         }
 
         _blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
-        _maxUncommittedInMemorySize = getConnection().getAmqpConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+        _maxUncommittedInMemorySize = getAMQPConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
         _publishAuthCahe = new PublishAuthorisationCache(_token,
                                                          amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT),
                                                          amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE));
@@ -226,7 +226,7 @@ public class ServerSession extends Sessi
 
             if (state == State.OPEN)
             {
-                getConnection().getAmqpConnection().getEventLogger().message(ChannelMessages.CREATE());
+                getAMQPConnection().getEventLogger().message(ChannelMessages.CREATE());
             }
         }
         else
@@ -320,8 +320,8 @@ public class ServerSession extends Sessi
                 handle.flowToDisk();
                 if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
                 {
-                    getConnection().getAmqpConnection().getEventLogger()
-                            .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+                    getAMQPConnection().getEventLogger()
+                                       .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
                 }
 
                 if(!_uncommittedMessages.isEmpty())
@@ -528,7 +528,7 @@ public class ServerSession extends Sessi
         {
             operationalLoggingMessage = ChannelMessages.CLOSE();
         }
-        getConnection().getAmqpConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
+        getAMQPConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
     }
 
     @Override
@@ -853,10 +853,10 @@ public class ServerSession extends Sessi
 
                 if(_blocking.compareAndSet(false,true))
                 {
-                    getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+                    getAMQPConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
                     if(getState() == State.OPEN)
                     {
-                        getConnection().notifyWork();
+                        getAMQPConnection().notifyWork(this);
                     }
                 }
 
@@ -881,8 +881,8 @@ public class ServerSession extends Sessi
         {
             if(_blocking.compareAndSet(true,false) && !isClosing())
             {
-                getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
-                getConnection().notifyWork();
+                getAMQPConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
+                getAMQPConnection().notifyWork(this);
             }
         }
     }
@@ -1239,7 +1239,7 @@ public class ServerSession extends Sessi
     @Override
     public void addTicker(final Ticker ticker)
     {
-        getConnection().getAmqpConnection().getAggregateTicker().addTicker(ticker);
+        getAMQPConnection().getAggregateTicker().addTicker(ticker);
         // trigger a wakeup to ensure the ticker will be taken into account
         getAMQPConnection().notifyWork();
     }
@@ -1247,24 +1247,13 @@ public class ServerSession extends Sessi
     @Override
     public void removeTicker(final Ticker ticker)
     {
-        getConnection().getAmqpConnection().getAggregateTicker().removeTicker(ticker);
+        getAMQPConnection().getAggregateTicker().removeTicker(ticker);
     }
 
     @Override
-    public void ensureConsumersNoticedStateChange()
+    public void notifyWork(final ConsumerTarget target)
     {
-        Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
-        for(ConsumerTarget_0_10 consumerTarget: consumerTargets)
-        {
-            try
-            {
-                consumerTarget.getSendLock();
-            }
-            finally
-            {
-                consumerTarget.releaseSendLock();
-            }
-        }
+        getAMQPConnection().notifyWork(this);
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Nov  7 15:15:00 2016
@@ -99,8 +99,6 @@ import org.apache.qpid.server.protocol.C
 import org.apache.qpid.server.protocol.ConsumerListener;
 import org.apache.qpid.server.protocol.PublishAuthorisationCache;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.queue.QueueConsumer;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.MessageStore;
@@ -879,12 +877,12 @@ public class AMQChannel
         {
             for(ConsumerImpl sub : subs)
             {
-                sub.close();
                 if (sub instanceof Consumer<?>)
                 {
                     _consumers.remove(sub);
                 }
             }
+            target.close();
             return true;
         }
         else
@@ -1075,19 +1073,12 @@ public class AMQChannel
         if (subscriber != null && subscriber.getSessionModel() == this)
         {
             ConsumerTarget target = subscriber.getTarget();
-            target.getSendLock();
-            try
-            {
-                if (target.getState() != ConsumerTarget.State.CLOSED)
-                {
-                    target.send(subscriber, messageInstance, false);
-                    return true;
-                }
-            }
-            finally
+            if (target.getState() != ConsumerTarget.State.CLOSED)
             {
-                target.releaseSendLock();
+                target.send(subscriber, messageInstance, false);
+                return true;
             }
+
         }
         return false;
     }
@@ -1221,11 +1212,6 @@ public class AMQChannel
                 messageWithSubject(ChannelMessages.FLOW("Started"));
             }
 
-
-            // This section takes two different approaches to perform to perform
-            // the same function. Ensuring that the Subscription has taken note
-            // of the change in Channel State
-
             // Here we have become unsuspended and so we ask each the queue to
             // perform an Async delivery for each of the subscriptions in this
             // Channel. The alternative would be to ensure that the subscription
@@ -1244,20 +1230,6 @@ public class AMQChannel
                 }
             }
 
-
-            // Here we have become suspended so we need to ensure that each of
-            // the Subscriptions has noticed this change so that we can be sure
-            // they are not still sending messages. Again the code here is a
-            // very simplistic approach to ensure that the change of suspension
-            // has been noticed by each of the Subscriptions. Unlike the above
-            // case we don't actually need to do anything else.
-            if (!wasSuspended)
-            {
-                // may need to deliver queued messages
-                ensureConsumersNoticedStateChange();
-            }
-
-
             // Log Suspension only after we have confirmed all suspensions are
             // stopped.
             if (suspended && _logChannelFlowMessages)
@@ -1324,9 +1296,6 @@ public class AMQChannel
         boolean requiresSuspend = _suspended.compareAndSet(false,true);  // TODO This is probably superfluous owing to the
         // message assignment suspended logic in NBC.
 
-        // ensure all subscriptions have seen the change to the channel state
-        ensureConsumersNoticedStateChange();
-
         try
         {
             _transaction.rollback();
@@ -1381,7 +1350,7 @@ public class AMQChannel
         return _closing.get();
     }
 
-    public AMQPConnection_0_8 getConnection()
+    public AMQPConnection_0_8<?> getConnection()
     {
         return _connection;
     }
@@ -1441,7 +1410,7 @@ public class AMQChannel
     }
 
     @Override
-    public AMQPConnection_0_8 getAMQPConnection()
+    public AMQPConnection_0_8<?> getAMQPConnection()
     {
         return _connection;
     }
@@ -1731,7 +1700,7 @@ public class AMQChannel
                 messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **"));
 
 
-                getConnection().notifyWork();
+                getConnection().notifyWork(this);
             }
         }
     }
@@ -1743,7 +1712,7 @@ public class AMQChannel
             if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
             {
                 messageWithSubject(ChannelMessages.FLOW_REMOVED());
-                getConnection().notifyWork();
+                getConnection().notifyWork(this);
             }
         }
     }
@@ -1757,7 +1726,7 @@ public class AMQChannel
             if(_blocking.compareAndSet(false,true))
             {
                 messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName()));
-                getConnection().notifyWork();
+                getConnection().notifyWork(this);
 
             }
         }
@@ -1770,7 +1739,7 @@ public class AMQChannel
             if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
             {
                 messageWithSubject(ChannelMessages.FLOW_REMOVED());
-                getConnection().notifyWork();
+                getConnection().notifyWork(this);
             }
         }
     }
@@ -3815,19 +3784,9 @@ public class AMQChannel
     }
 
     @Override
-    public void ensureConsumersNoticedStateChange()
+    public void notifyWork(final ConsumerTarget target)
     {
-        for (ConsumerTarget_0_8 consumerTarget : getConsumerTargets())
-        {
-            try
-            {
-                consumerTarget.getSendLock();
-            }
-            finally
-            {
-                consumerTarget.releaseSendLock();
-            }
-        }
+        getAMQPConnection().notifyWork(this);
     }
 
     private Collection<ConsumerTarget_0_8> getConsumerTargets()

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Mon Nov  7 15:15:00 2016
@@ -86,7 +86,7 @@ import org.apache.qpid.transport.ByteBuf
 import org.apache.qpid.transport.TransportException;
 
 public class AMQPConnection_0_8Impl
-        extends AbstractAMQPConnection<AMQPConnection_0_8Impl>
+        extends AbstractAMQPConnection<AMQPConnection_0_8Impl, AMQPConnection_0_8Impl>
         implements ServerMethodProcessor<ServerChannelMethodProcessor>, AMQPConnection_0_8<AMQPConnection_0_8Impl>
 {
 
@@ -812,12 +812,14 @@ public class AMQPConnection_0_8Impl
         addAsyncTask(action);
     }
 
-    private void addAsyncTask(final Action<AMQPConnection_0_8Impl> action)
+    @Override
+    protected void addAsyncTask(final Action<? super AMQPConnection_0_8Impl> action)
     {
         _asyncTaskList.add(action);
         notifyWork();
     }
 
+    @Override
     public void block()
     {
         synchronized (_channelAddRemoveLock)
@@ -1391,6 +1393,12 @@ public class AMQPConnection_0_8Impl
     }
 
     @Override
+    public void notifyWork(final AMQSessionModel<?> sessionModel)
+    {
+        notifyWork();
+    }
+
+    @Override
     public void clearWork()
     {
         _stateChanged.set(false);

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Mon Nov  7 15:15:00 2016
@@ -54,7 +54,6 @@ public abstract class ConsumerTarget_0_8
 
     private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
     private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-    private final AtomicBoolean _needToClose = new AtomicBoolean();
     private final String _targetAddress;
 
 
@@ -461,41 +460,13 @@ public abstract class ConsumerTarget_0_8
 
     public void queueEmpty()
     {
-        if (isAutoClose())
-        {
-            _needToClose.set(true);
-            getChannel().getConnection().notifyWork();
-        }
-    }
-
-    @Override
-    protected void processClosed()
-    {
-        if (hasClosed())
+        if(isAutoClose() && getState() != State.CLOSED)
         {
             close();
             confirmAutoClose();
         }
     }
 
-    @Override
-    protected void processStateChanged()
-    {
-
-    }
-
-    @Override
-    protected boolean hasStateChanged()
-    {
-        return false;
-    }
-
-    @Override
-    protected boolean hasClosed()
-    {
-        return (_needToClose.get() && getState() != State.CLOSED);
-    }
-
     public void flushBatched()
     {
         _channel.getConnection().setDeferFlush(false);

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Mon Nov  7 15:15:00 2016
@@ -110,7 +110,7 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.server.transport.AggregateTicker;
 
-public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0>
+public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0, ConnectionHandler>
         implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
                    ValueWriter.Registry.Source,
                    ErrorHandler,
@@ -1377,6 +1377,12 @@ public class AMQPConnection_1_0 extends
     }
 
     @Override
+    public void notifyWork(final AMQSessionModel<?> sessionModel)
+    {
+        notifyWork();
+    }
+
+    @Override
     public void clearWork()
     {
         _stateChanged.set(false);
@@ -1444,7 +1450,8 @@ public class AMQPConnection_1_0 extends
         return _orderlyClose.get();
     }
 
-    private void addAsyncTask(final Action<ConnectionHandler> action)
+    @Override
+    protected void addAsyncTask(final Action<? super ConnectionHandler> action)
     {
         _asyncTaskList.add(action);
         notifyWork();

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Mon Nov  7 15:15:00 2016
@@ -21,12 +21,20 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collection;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
@@ -34,7 +42,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.Target;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
@@ -42,20 +49,10 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.LinkRegistry;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
@@ -303,7 +300,11 @@ class ConsumerTarget_1_0 extends Abstrac
 
     public void queueEmpty()
     {
-        _queueEmpty = true;
+        if(_link.drained())
+        {
+            updateState(State.ACTIVE, State.SUSPENDED);
+        }
+
     }
 
     public void flowStateChanged()
@@ -528,38 +529,6 @@ class ConsumerTarget_1_0 extends Abstrac
     }
 
     @Override
-    protected void processClosed()
-    {
-
-    }
-
-    @Override
-    protected void processStateChanged()
-    {
-        if(_queueEmpty)
-        {
-            _queueEmpty = false;
-
-            if(_link.drained())
-            {
-                updateState(State.ACTIVE, State.SUSPENDED);
-            }
-        }
-    }
-
-    @Override
-    protected boolean hasStateChanged()
-    {
-        return _queueEmpty;
-    }
-
-    @Override
-    protected boolean hasClosed()
-    {
-        return false;
-    }
-
-    @Override
     public String toString()
     {
         return "ConsumerTarget_1_0[linkSession=" + _link.getSession().toLogString() + "]";

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Mon Nov  7 15:15:00 2016
@@ -1578,20 +1578,9 @@ public class Session_1_0 implements AMQS
     }
 
     @Override
-    public void ensureConsumersNoticedStateChange()
+    public void notifyWork(final ConsumerTarget target)
     {
-        for(SendingLink_1_0 link : _sendingLinks)
-        {
-            ConsumerTarget_1_0 consumerTarget = link.getConsumerTarget();
-            try
-            {
-                consumerTarget.getSendLock();
-            }
-            finally
-            {
-                consumerTarget.releaseSendLock();
-            }
-        }
+        getAMQPConnection().notifyWork(this);
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java Mon Nov  7 15:15:00 2016
@@ -34,23 +34,24 @@ import ch.qos.logback.classic.spi.ILoggi
 import ch.qos.logback.core.Appender;
 import ch.qos.logback.core.Context;
 import ch.qos.logback.core.rolling.RollingFileAppender;
-
 import ch.qos.logback.core.status.Status;
 import ch.qos.logback.core.status.StatusListener;
 import ch.qos.logback.core.status.StatusManager;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.logging.LogFileDetails;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Content;
 import org.apache.qpid.server.model.ManagedAttributeField;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.Content;
 import org.apache.qpid.server.model.Param;
 import org.apache.qpid.server.model.SystemConfig;
 import org.apache.qpid.server.util.DaemonThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class BrokerFileLoggerImpl extends AbstractBrokerLogger<BrokerFileLoggerImpl>
         implements BrokerFileLogger<BrokerFileLoggerImpl>, FileLoggerSettings
@@ -209,14 +210,14 @@ public class BrokerFileLoggerImpl extend
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
-        super.onClose();
-
         if (_statusManager != null)
         {
             _statusManager.remove(_logbackStatusListener);
         }
+        return Futures.immediateFuture(null);
+
     }
 
     static class BrokerFileLoggerStatusListener implements StatusListener

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Mon Nov  7 15:15:00 2016
@@ -73,24 +73,16 @@ class ManagementNodeConsumer implements
     @Override
     public AbstractQueue.MessageContainer pullMessage()
     {
-        _target.getSendLock();
-        try
+        if (!_queue.isEmpty())
         {
-            if (!_queue.isEmpty())
-            {
 
-                final ManagementResponse managementResponse = _queue.get(0);
-                if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
-                {
-                    _queue.remove(0);
-                    return new AbstractQueue.MessageContainer(managementResponse, null);
-                }
+            final ManagementResponse managementResponse = _queue.get(0);
+            if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
+            {
+                _queue.remove(0);
+                return new AbstractQueue.MessageContainer(managementResponse, null);
             }
         }
-        finally
-        {
-            _target.releaseSendLock();
-        }
         return null;
     }
 
@@ -165,24 +157,6 @@ class ManagementNodeConsumer implements
     {
     }
 
-    @Override
-    public boolean trySendLock()
-    {
-        return _target.trySendLock();
-    }
-
-    @Override
-    public void getSendLock()
-    {
-        _target.getSendLock();
-    }
-
-    @Override
-    public void releaseSendLock()
-    {
-        _target.releaseSendLock();
-    }
-
 
     @Override
     public boolean isActive()

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java Mon Nov  7 15:15:00 2016
@@ -226,7 +226,7 @@ public class HttpManagement extends Abst
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         if (_server != null)
         {
@@ -242,6 +242,7 @@ public class HttpManagement extends Abst
         }
 
         getBroker().getEventLogger().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME));
+        return Futures.immediateFuture(null);
     }
 
     public int getSessionTimeout()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org