You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 14:16:10 UTC

svn commit: r1769837 [3/4] - in /qpid/java/trunk: ./ bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/flow/ broker-core...

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Tue Nov 15 14:16:10 2016
@@ -40,10 +40,10 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -88,7 +88,7 @@ public abstract class AbstractSystemMess
                                 final String consumerName,
                                 final EnumSet<ConsumerImpl.Option> options, final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
-                   ConsumerAccessRefused
+                   ConsumerAccessRefused, QueueDeleted
     {
         final Consumer consumer = new Consumer(consumerName, target);
         target.consumerAdded(consumer);
@@ -116,21 +116,21 @@ public abstract class AbstractSystemMess
                 Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
         private final ConsumerTarget _target;
         private final String _name;
-        private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener =
-                new Consumer.TargetChangeListener();
 
 
         public Consumer(final String consumerName, ConsumerTarget target)
         {
             _name = consumerName;
             _target = target;
-            target.addStateListener(_targetChangeListener);
         }
 
         @Override
         public void externalStateChange()
         {
-
+            if(!_queue.isEmpty())
+            {
+                _target.notifyWork();
+            }
         }
 
         @Override
@@ -140,43 +140,27 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean hasAvailableMessages()
+        public AbstractQueue.MessageContainer pullMessage()
         {
-            return !_queue.isEmpty();
-        }
-
-        @Override
-        public void pullMessage()
-        {
-            AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
-            _target.getSendLock();
-            try
+            if (!_queue.isEmpty())
             {
-                connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
-
-                try
-                {
-                    if (!_queue.isEmpty())
-                    {
-                        final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
-                        if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
-                        {
-                            _queue.remove(0);
-                            _target.send(this, propertiesMessageInstance, false);
-                        }
-                    }
-                }
-                finally
+                final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
+                if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
                 {
-                    connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
+                    _queue.remove(0);
+                    return new AbstractQueue.MessageContainer(propertiesMessageInstance, null);
                 }
             }
-            finally
+            return null;
+        }
+
+        @Override
+        public void setNotifyWorkDesired(final boolean desired)
+        {
+            if (desired && !_queue.isEmpty())
             {
-                _target.releaseSendLock();
+                _target.notifyWork();
             }
-
-
         }
 
         @Override
@@ -224,7 +208,7 @@ public abstract class AbstractSystemMess
         @Override
         public boolean isSuspended()
         {
-            return false;
+            return !isActive();
         }
 
         @Override
@@ -252,27 +236,9 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean trySendLock()
-        {
-            return _target.trySendLock();
-        }
-
-        @Override
-        public void getSendLock()
-        {
-            _target.getSendLock();
-        }
-
-        @Override
-        public void releaseSendLock()
-        {
-            _target.releaseSendLock();
-        }
-
-        @Override
         public boolean isActive()
         {
-            return false;
+            return _target.isNotifyWorkDesired();
         }
 
         @Override
@@ -281,85 +247,11 @@ public abstract class AbstractSystemMess
             return _name;
         }
 
-        @Override
-        public void flush()
-        {
-            AMQPConnection<?> connection = getSessionModel().getAMQPConnection();
-            try
-            {
-                connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
-                deliverMessages();
-                _target.processPending();
-            }
-            finally
-            {
-                connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
-            }
-        }
-
-
         public void send(final InternalMessage response)
         {
-            _target.getSendLock();
-            try
-            {
-                final PropertiesMessageInstance
-                        responseEntry = new PropertiesMessageInstance(this, response);
-                if (_queue.isEmpty() && _target.allocateCredit(response))
-                {
-                    _target.send(this, responseEntry, false);
-                }
-                else
-                {
-                    _queue.add(responseEntry);
-                }
-            }
-            finally
-            {
-                _target.releaseSendLock();
-            }
+            _queue.add(new PropertiesMessageInstance(this, response));
+            _target.notifyWork();
         }
-
-        private class TargetChangeListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State>
-        {
-            @Override
-            public void stateChanged(final ConsumerTarget object,
-                                     final ConsumerTarget.State oldState,
-                                     final ConsumerTarget.State newState)
-            {
-                if (newState == ConsumerTarget.State.ACTIVE)
-                {
-                    deliverMessages();
-                }
-            }
-        }
-
-        private void deliverMessages()
-        {
-            _target.getSendLock();
-            try
-            {
-                while (!_queue.isEmpty())
-                {
-
-                    final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
-                    if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
-                    {
-                        _queue.remove(0);
-                        _target.send(this, propertiesMessageInstance, false);
-                    }
-                    else
-                    {
-                        break;
-                    }
-                }
-            }
-            finally
-            {
-                _target.releaseSendLock();
-            }
-        }
-
     }
 
     class PropertiesMessageInstance implements MessageInstance
@@ -549,12 +441,6 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean resend()
-        {
-            return false;
-        }
-
-        @Override
         public void delete()
         {
             _isDeleted = true;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Tue Nov 15 14:16:10 2016
@@ -1173,9 +1173,10 @@ public abstract class AbstractVirtualHos
      */
     public void scheduleHouseKeepingTask(long period, HouseKeepingTask task)
     {
-        _houseKeepingTaskExecutor.scheduleAtFixedRate(task, period / 2, period, TimeUnit.MILLISECONDS);
+        task.setFuture(_houseKeepingTaskExecutor.scheduleAtFixedRate(task, period / 2, period, TimeUnit.MILLISECONDS));
     }
 
+
     public ScheduledFuture<?> scheduleTask(long delay, Runnable task)
     {
         return _houseKeepingTaskExecutor.schedule(task, delay, TimeUnit.MILLISECONDS);
@@ -1444,7 +1445,7 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         _dtxRegistry.close();
         shutdownHouseKeeping();
@@ -1455,6 +1456,7 @@ public abstract class AbstractVirtualHos
         _eventLogger.message(VirtualHostMessages.CLOSED(getName()));
 
         stopLogging(_virtualHostLoggersToClose);
+        return Futures.immediateFuture(null);
     }
 
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java Tue Nov 15 14:16:10 2016
@@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualho
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.util.concurrent.ScheduledFuture;
 
 import org.apache.qpid.server.model.VirtualHost;
 
@@ -30,6 +31,7 @@ public abstract class HouseKeepingTask i
 {
     private final String _name;
     private final AccessControlContext _accessControlContext;
+    private ScheduledFuture<?> _future;
 
     public HouseKeepingTask(String name, VirtualHost vhost, AccessControlContext context)
     {
@@ -65,4 +67,17 @@ public abstract class HouseKeepingTask i
     /** Execute the plugin. */
     public abstract void execute();
 
+    void setFuture(final ScheduledFuture<?> future)
+    {
+        _future = future;
+    }
+
+    public synchronized void cancel()
+    {
+        if(_future != null)
+        {
+            _future.cancel(false);
+            _future = null;
+        }
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java Tue Nov 15 14:16:10 2016
@@ -53,7 +53,7 @@ public class VirtualHostPropertiesNode e
                                 final String consumerName,
                                 final EnumSet<ConsumerImpl.Option> options, final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
-                   ConsumerAccessRefused
+                   ConsumerAccessRefused, QueueDeleted
     {
         final Consumer consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
         consumer.send(createMessage());
@@ -66,10 +66,10 @@ public class VirtualHostPropertiesNode e
 
         Map<String, Object> headers = new HashMap<>();
 
-        final List<String> globalAddresseDomains = _addressSpace.getGlobalAddressDomains();
-        if (globalAddresseDomains != null && !globalAddresseDomains.isEmpty())
+        final List<String> globalAddressDomains = _addressSpace.getGlobalAddressDomains();
+        if (globalAddressDomains != null && !globalAddressDomains.isEmpty())
         {
-            String primaryDomain = globalAddresseDomains.get(0);
+            String primaryDomain = globalAddressDomains.get(0);
             if(primaryDomain != null)
             {
                 primaryDomain = primaryDomain.trim();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java Tue Nov 15 14:16:10 2016
@@ -405,10 +405,11 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
-    protected void onClose()
+    protected ListenableFuture<Void> onClose()
     {
         closeConfigurationStore();
         _virtualHostExecutor.stop();
+        return Futures.immediateFuture(null);
     }
 
     private void closeConfigurationStore()

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java Tue Nov 15 14:16:10 2016
@@ -142,8 +142,9 @@ public class ManagementModeStoreHandlerT
             }
 
             @Override
-            protected void onClose()
+            protected ListenableFuture<Void> onClose()
             {
+                return Futures.immediateFuture(null);
             }
 
             @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Tue Nov 15 14:16:10 2016
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.TestConsumerTarget;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
@@ -61,6 +61,7 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.QueueNotificationListener;
+import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.Action;
@@ -71,14 +72,13 @@ import org.apache.qpid.test.utils.QpidTe
 abstract class AbstractQueueTestBase extends QpidTestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(AbstractQueueTestBase.class);
-    private long _queueRunnerWaitTime;
     private Queue<?> _queue;
     private QueueManagingVirtualHost<?> _virtualHost;
     private String _qname = "qname";
     private String _owner = "owner";
     private String _routingKey = "routing key";
     private DirectExchange _exchange;
-    private MockConsumer _consumerTarget = new MockConsumer();
+    private TestConsumerTarget _consumerTarget = new TestConsumerTarget();
     private QueueConsumer<?> _consumer;
     private Map<String,Object> _arguments = Collections.emptyMap();
 
@@ -97,8 +97,6 @@ abstract class AbstractQueueTestBase ext
         _queue = _virtualHost.createChild(Queue.class, attributes);
 
         _exchange = (DirectExchange) _virtualHost.getChildByName(Exchange.class, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
-        _queueRunnerWaitTime = Long.getLong("AbstractQueueTestBase.queueRunnerWaitTime", 150L);
-        _logger.debug("Using AbstractQueueTestBase.queueRunnerWaitTime {}", _queueRunnerWaitTime);
     }
 
     @Override
@@ -182,7 +180,7 @@ abstract class AbstractQueueTestBase ext
 
         // Check sending a message ends up with the subscriber
         _queue.enqueue(messageA, null, null);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
 
         assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull(_consumer.getQueueContext().getReleasedEntry());
@@ -207,7 +205,7 @@ abstract class AbstractQueueTestBase ext
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                   ConsumerImpl.Option.SEES_REQUEUES), 0);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
         assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull("There should be no releasedEntry after an enqueue",
                    _consumer.getQueueContext().getReleasedEntry());
@@ -225,7 +223,7 @@ abstract class AbstractQueueTestBase ext
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                   ConsumerImpl.Option.SEES_REQUEUES), 0);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
         assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull("There should be no releasedEntry after enqueues",
                    _consumer.getQueueContext().getReleasedEntry());
@@ -248,12 +246,12 @@ abstract class AbstractQueueTestBase ext
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                                      ConsumerImpl.Option.SEES_REQUEUES), 0);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
 
         assertEquals("Message which was not yet valid was received", 0, _consumerTarget.getMessages().size());
         when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
         _queue.checkMessageStatus();
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
         assertEquals("Message which was valid was not received", 1, _consumerTarget.getMessages().size());
     }
 
@@ -274,7 +272,7 @@ abstract class AbstractQueueTestBase ext
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                                      ConsumerImpl.Option.SEES_REQUEUES), 0);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
 
         assertEquals("Message was held despite queue not having holding enabled", 1, _consumerTarget.getMessages().size());
 
@@ -300,14 +298,14 @@ abstract class AbstractQueueTestBase ext
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                                      ConsumerImpl.Option.SEES_REQUEUES), 0);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
 
         assertEquals("Expect one message (message B)", 1, _consumerTarget.getMessages().size());
         assertEquals("Wrong message received", messageB.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(0).getMessage().getMessageHeader().getMessageId());
 
         when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
         _queue.checkMessageStatus();
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
         assertEquals("Message which was valid was not received", 2, _consumerTarget.getMessages().size());
         assertEquals("Wrong message received", messageA.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(1).getMessage().getMessageHeader().getMessageId());
 
@@ -338,7 +336,7 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageB, postEnqueueAction, null);
         _queue.enqueue(messageC, postEnqueueAction, null);
 
-        Thread.sleep(_queueRunnerWaitTime);  // Work done by QueueRunner Thread
+        while(_consumerTarget.processPending());
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      3,
@@ -351,7 +349,7 @@ abstract class AbstractQueueTestBase ext
 
         queueEntries.get(0).release();
 
-        Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+        while(_consumerTarget.processPending());
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      4,
@@ -372,8 +370,15 @@ abstract class AbstractQueueTestBase ext
     {
         ServerMessage messageA = createMessage(new Long(24));
         final CountDownLatch sendIndicator = new CountDownLatch(1);
-        _consumerTarget = new MockConsumer()
+        _consumerTarget = new TestConsumerTarget()
         {
+
+            @Override
+            public void notifyWork()
+            {
+                while(processPending());
+            }
+
             @Override
             public long send(ConsumerImpl consumer, MessageInstance entry, boolean batch)
             {
@@ -397,7 +402,7 @@ abstract class AbstractQueueTestBase ext
 
         /* Enqueue one message with expiration set for a short time in the future */
 
-        final long expiration = System.currentTimeMillis() + _queueRunnerWaitTime;
+        final long expiration = System.currentTimeMillis() + 100L;
         when(messageA.getExpiration()).thenReturn(expiration);
 
         _queue.enqueue(messageA, postEnqueueAction, null);
@@ -472,7 +477,7 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageB, postEnqueueAction, null);
         _queue.enqueue(messageC, postEnqueueAction, null);
 
-        Thread.sleep(_queueRunnerWaitTime);  // Work done by QueueRunner Thread
+        while(_consumerTarget.processPending());
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      3,
@@ -486,7 +491,7 @@ abstract class AbstractQueueTestBase ext
         queueEntries.get(2).release();
         queueEntries.get(0).release();
 
-        Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+        while(_consumerTarget.processPending());
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      5,
@@ -508,8 +513,8 @@ abstract class AbstractQueueTestBase ext
         ServerMessage messageA = createMessage(new Long(24));
         ServerMessage messageB = createMessage(new Long(25));
 
-        MockConsumer target1 = new MockConsumer();
-        MockConsumer target2 = new MockConsumer();
+        TestConsumerTarget target1 = new TestConsumerTarget();
+        TestConsumerTarget target2 = new TestConsumerTarget();
 
 
         QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test",
@@ -529,7 +534,8 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageA, postEnqueueAction, null);
         _queue.enqueue(messageB, postEnqueueAction, null);
 
-        Thread.sleep(_queueRunnerWaitTime);  // Work done by QueueRunner Thread
+        while(target1.processPending());
+        while(target2.processPending());
 
         assertEquals("Unexpected total number of messages sent to both after enqueue",
                      2,
@@ -538,7 +544,8 @@ abstract class AbstractQueueTestBase ext
         /* Now release the first message only, causing it to be requeued */
         queueEntries.get(0).release();
 
-        Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+        while(target1.processPending());
+        while(target2.processPending());
 
         assertEquals("Unexpected total number of messages sent to both consumers after release",
                      3,
@@ -566,20 +573,13 @@ abstract class AbstractQueueTestBase ext
         // Check sending a message ends up with the subscriber
         _queue.enqueue(messageA, null, null);
 
-        final long timeout = System.currentTimeMillis() + _queueRunnerWaitTime;
-
-        QueueEntry lastSeen = null;
-        while (timeout > System.currentTimeMillis() &&
-               ((lastSeen = _consumer.getQueueContext().getLastSeenEntry()) == null || lastSeen.getMessage() == null))
-        {
-            Thread.sleep(10);
-        }
+        while(_consumerTarget.processPending());
 
-        assertEquals("Queue context did not see expected message within timeout",
+        assertEquals("Queue context did not see expected message",
                      messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
 
         // Check we cannot add a second subscriber to the queue
-        MockConsumer subB = new MockConsumer();
+        TestConsumerTarget subB = new TestConsumerTarget();
         Exception ex = null;
         try
         {
@@ -616,32 +616,6 @@ abstract class AbstractQueueTestBase ext
         assertNotNull(ex);
     }
 
-
-    public void testResend() throws Exception
-    {
-        Long id = new Long(26);
-        ServerMessage message = createMessage(id);
-
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES),
-                                                          0);
-
-        _queue.enqueue(message, new Action<MessageInstance>()
-        {
-            @Override
-            public void performAction(final MessageInstance object)
-            {
-                QueueEntryImpl entry = (QueueEntryImpl) object;
-                entry.setRedelivered();
-                _consumer.resend(entry);
-
-            }
-        }, null);
-
-
-
-    }
-
     public void testGetFirstMessageId() throws Exception
     {
         // Create message
@@ -1062,14 +1036,7 @@ abstract class AbstractQueueTestBase ext
             queue.enqueue(message,null, null);
 
         }
-        try
-        {
-            Thread.sleep(2000L);
-        }
-        catch (InterruptedException e)
-        {
-            _logger.error("Thread interrupted", e);
-        }
+
     }
 
     /**
@@ -1113,7 +1080,7 @@ abstract class AbstractQueueTestBase ext
         _queue = queue;
     }
 
-    public MockConsumer getConsumer()
+    public TestConsumerTarget getConsumer()
     {
         return _consumerTarget;
     }
@@ -1230,14 +1197,9 @@ abstract class AbstractQueueTestBase ext
         return _exchange;
     }
 
-    public MockConsumer getConsumerTarget()
+    public TestConsumerTarget getConsumerTarget()
     {
         return _consumerTarget;
     }
 
-    public long getQueueRunnerWaitTime()
-    {
-        return _queueRunnerWaitTime;
-    }
-
 }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Tue Nov 15 14:16:10 2016
@@ -162,12 +162,6 @@ public class MockMessageInstance impleme
     {
     }
 
-    @Override
-    public boolean resend()
-    {
-        return false;
-    }
-
 
     public void setRedelivered()
     {

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java Tue Nov 15 14:16:10 2016
@@ -64,7 +64,8 @@ public class PriorityQueueTest extends A
 
         // Register subscriber
         queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class), 0);
-        Thread.sleep(getQueueRunnerWaitTime());
+
+        while(getConsumer().processPending());
 
         ArrayList<MessageInstance> msgs = getConsumer().getMessages();
         try

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Tue Nov 15 14:16:10 2016
@@ -20,6 +20,7 @@
 */
 package org.apache.qpid.server.queue;
 
+import static org.apache.qpid.server.model.Queue.QUEUE_SCAVANGE_COUNT;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -48,15 +49,11 @@ public class StandardQueueEntryListTest
     private StandardQueueImpl _testQueue;
     private StandardQueueEntryList _sqel;
 
-    private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";
-    private String oldScavengeValue = null;
     private ConfiguredObjectFactoryImpl _factory;
 
     @Override
     protected void setUp()
     {
-        oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
-
         Map<String,Object> queueAttributes = new HashMap<String, Object>();
         queueAttributes.put(Queue.ID, UUID.randomUUID());
         queueAttributes.put(Queue.NAME, getName());
@@ -87,19 +84,6 @@ public class StandardQueueEntryListTest
     }
 
     @Override
-    protected void tearDown()
-    {
-        if(oldScavengeValue != null)
-        {
-            System.setProperty(SCAVENGE_PROP, oldScavengeValue);
-        }
-        else
-        {
-            System.clearProperty(SCAVENGE_PROP);
-        }
-    }
-
-    @Override
     public StandardQueueEntryList getTestList()
     {
         return getTestList(false);
@@ -165,7 +149,9 @@ public class StandardQueueEntryListTest
 
     public void testScavenge() throws Exception
     {
-        OrderedQueueEntryList sqel = new StandardQueueEntryList(mock(StandardQueue.class), new QueueStatistics());
+        StandardQueueImpl mockQueue = mock(StandardQueueImpl.class);
+        when(mockQueue.getContextValue(Integer.class, QUEUE_SCAVANGE_COUNT)).thenReturn(9);
+        OrderedQueueEntryList sqel = new StandardQueueEntryList(mockQueue, new QueueStatistics());
         ConcurrentMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();
 
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Tue Nov 15 14:16:10 2016
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.security.AccessController;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -31,7 +30,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.TestConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -66,74 +65,6 @@ public class StandardQueueTest extends A
                    getQueue().isDeleted());
     }
 
-    public void testActiveConsumerCount() throws Exception
-    {
-
-        Map<String,Object> queueAttributes = new HashMap<>();
-        queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
-        queueAttributes.put(Queue.OWNER, "testOwner");
-        final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, getVirtualHost());
-        queue.open();
-        //verify adding an active consumer increases the count
-        final MockConsumer consumer1 = new MockConsumer();
-        consumer1.setActive(true);
-        consumer1.setState(ConsumerTarget.State.ACTIVE);
-        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-        queue.addConsumer(consumer1,
-                          null,
-                          createMessage(-1l).getClass(),
-                          "test",
-                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                     ConsumerImpl.Option.SEES_REQUEUES), 0);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        //verify adding an inactive consumer doesn't increase the count
-        final MockConsumer consumer2 = new MockConsumer();
-        consumer2.setActive(false);
-        consumer2.setState(ConsumerTarget.State.SUSPENDED);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-        queue.addConsumer(consumer2,
-                          null,
-                          createMessage(-1l).getClass(),
-                          "test",
-                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                     ConsumerImpl.Option.SEES_REQUEUES), 0);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        //verify behaviour in face of expected state changes:
-
-        //verify a consumer going suspended->active increases the count
-        consumer2.setState(ConsumerTarget.State.ACTIVE);
-        assertEquals("Unexpected active consumer count", 2, queue.getConsumerCountWithCredit());
-
-        //verify a consumer going active->suspended decreases the count
-        consumer2.setState(ConsumerTarget.State.SUSPENDED);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        //verify a consumer going suspended->closed doesn't change the count
-        consumer2.setState(ConsumerTarget.State.CLOSED);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        //verify a consumer going active->active doesn't change the count
-        consumer1.setState(ConsumerTarget.State.ACTIVE);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        consumer1.setState(ConsumerTarget.State.SUSPENDED);
-        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
-        //verify a consumer going suspended->suspended doesn't change the count
-        consumer1.setState(ConsumerTarget.State.SUSPENDED);
-        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
-        consumer1.setState(ConsumerTarget.State.ACTIVE);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        //verify a consumer going active->closed  decreases the count
-        consumer1.setState(ConsumerTarget.State.CLOSED);
-        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
-    }
-
 
     /**
      * Tests that entry in dequeued state are not enqueued and not delivered to consumer
@@ -144,7 +75,7 @@ public class StandardQueueTest extends A
         AbstractQueue queue = new DequeuedQueue(getVirtualHost());
         queue.create();
         // create a consumer
-        MockConsumer consumer = new MockConsumer();
+        TestConsumerTarget consumer = new TestConsumerTarget();
 
         // register consumer
         queue.addConsumer(consumer,
@@ -156,6 +87,7 @@ public class StandardQueueTest extends A
 
         // put test messages into a queue
         putGivenNumberOfMessages(queue, 4);
+        while(consumer.processPending());
 
         // assert received messages
         List<MessageInstance> messages = consumer.getMessages();
@@ -167,8 +99,7 @@ public class StandardQueueTest extends A
     }
 
     /**
-     * Tests whether dequeued entry is sent to subscriber in result of
-     * invocation of {@link AbstractQueue#processQueue(QueueRunner)}
+     * Tests whether dequeued entry is sent to subscriber
      */
     public void testProcessQueueWithDequeuedEntry() throws Exception
     {
@@ -193,8 +124,15 @@ public class StandardQueueTest extends A
         final CountDownLatch latch = new CountDownLatch(messageNumber -1);
 
         // create a consumer
-        MockConsumer consumer = new MockConsumer()
+        TestConsumerTarget consumer = new TestConsumerTarget()
         {
+
+            @Override
+            public void notifyWork()
+            {
+                while(processPending());
+            }
+
             /**
              * Send a message and decrement latch
              * @param consumer
@@ -217,14 +155,6 @@ public class StandardQueueTest extends A
                               EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                          ConsumerImpl.Option.SEES_REQUEUES), 0);
 
-        // process queue
-        testQueue.processQueue(new QueueRunner(testQueue, AccessController.getContext())
-        {
-            public void run()
-            {
-                // do nothing
-            }
-        });
 
         // wait up to 1 minute for message receipt
         try

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java Tue Nov 15 14:16:10 2016
@@ -19,6 +19,7 @@
 package org.apache.qpid.server.security;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
@@ -42,11 +43,14 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.TrustStore;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.AbstractSystemMessageSource;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 
@@ -78,11 +82,10 @@ public class TrustStoreMessageSourceTest
         final ConsumerTarget target = mock(ConsumerTarget.class);
         when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
 
-        _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
-
-        ArgumentCaptor<MessageInstance> argumentCaptor = ArgumentCaptor.forClass(MessageInstance.class);
-        verify(target).send(any(ConsumerImpl.class), argumentCaptor.capture(), anyBoolean());
-        final ServerMessage message = argumentCaptor.getValue().getMessage();
+        ConsumerImpl consumer = _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+        final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+        assertNotNull("Could not pull message of TrustStore", messageContainer);
+        final ServerMessage message = messageContainer._messageInstance.getMessage();
         assertCertificates(getCertificatesFromMessage(message));
     }
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Tue Nov 15 14:16:10 2016
@@ -149,7 +149,6 @@ public class TCPandSSLTransportTest exte
         when(port.getNumberOfSelectors()).thenReturn(1);
         when(port.getSSLContext()).thenReturn(sslContext);
         when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
-        when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
         when(port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
         when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
         ObjectMapper mapper = new ObjectMapper();

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java Tue Nov 15 14:16:10 2016
@@ -31,6 +31,7 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -56,7 +57,8 @@ public class VirtualHostPropertiesNodeTe
         final ConsumerTarget target = mock(ConsumerTarget.class);
         when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
 
-        _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
-        verify(target).send(any(ConsumerImpl.class), any(MessageInstance.class), anyBoolean());
+        ConsumerImpl consumer = _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+        final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+        assertNotNull("Could not pull message from VirtualHostPropertyNode", messageContainer);
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Tue Nov 15 14:16:10 2016
@@ -27,6 +27,8 @@ import java.security.PrivilegedAction;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -55,7 +57,7 @@ import org.apache.qpid.transport.Constan
 import org.apache.qpid.server.transport.AggregateTicker;
 
 
-public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10>
+public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
 {
     private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
     private final ServerInputHandler _inputHandler;
@@ -68,6 +70,9 @@ public class AMQPConnection_0_10 extends
     private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
     private ServerDisassembler _disassembler;
 
+    private final Set<AMQSessionModel<?>> _sessionsWithWork =
+            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+
 
     public AMQPConnection_0_10(final Broker<?> broker,
                                ServerNetworkConnection network,
@@ -251,7 +256,7 @@ public class AMQPConnection_0_10 extends
     {
         if (isIOThread())
         {
-            return _connection.processPendingIterator();
+            return _connection.processPendingIterator(_sessionsWithWork);
         }
         else
         {
@@ -277,6 +282,13 @@ public class AMQPConnection_0_10 extends
         }
     }
 
+    @Override
+    public void notifyWork(final AMQSessionModel<?> sessionModel)
+    {
+        _sessionsWithWork.add(sessionModel);
+        notifyWork();
+    }
+
     public void clearWork()
     {
         _stateChanged.set(false);
@@ -294,6 +306,7 @@ public class AMQPConnection_0_10 extends
 
     public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
     {
+        stopConnection();
         _connection.sendConnectionCloseAsync(cause, message);
     }
 
@@ -303,6 +316,12 @@ public class AMQPConnection_0_10 extends
         _connection.closeSessionAsync((ServerSession) session, cause, message);
     }
 
+    @Override
+    protected void addAsyncTask(final Action<? super ServerConnection> action)
+    {
+        _connection.addAsyncTask(action);
+    }
+
     public void block()
     {
         _connection.block();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Nov 15 14:16:10 2016
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.MessageInstance;
@@ -62,13 +61,12 @@ import org.apache.qpid.transport.Option;
 import org.apache.qpid.util.ByteBufferUtils;
 import org.apache.qpid.util.GZIPUtils;
 
-public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
+public class ConsumerTarget_0_10 extends AbstractConsumerTarget
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_0_10.class);
 
     private static final Option[] BATCHED = new Option[] { Option.BATCH };
 
-    private final AtomicBoolean _deleted = new AtomicBoolean(false);
     private final String _name;
     private final String _targetAddress;
 
@@ -116,14 +114,13 @@ public class ConsumerTarget_0_10 extends
                                Map<String, Object> arguments,
                                boolean multiQueue)
     {
-        super(State.SUSPENDED, isPullOnly(arguments), multiQueue, session.getAMQPConnection());
+        super(multiQueue, session.getAMQPConnection());
         _session = session;
         _postIdSettingAction = new AddMessageDispositionListenerAction(session);
         _acceptMode = acceptMode;
         _acquireMode = acquireMode;
         _creditManager = creditManager;
         _flowMode = flowMode;
-        _creditManager.addStateListener(this);
         _name = name;
         if(arguments != null && arguments.containsKey("local-address"))
         {
@@ -135,41 +132,15 @@ public class ConsumerTarget_0_10 extends
         }
     }
 
-    private static boolean isPullOnly(Map<String, Object> arguments)
-    {
-        return arguments != null
-               && arguments.containsKey(PULL_ONLY_CONSUMER)
-               && Boolean.valueOf(String.valueOf(arguments.get(PULL_ONLY_CONSUMER)));
-    }
-
-    @Override
-    public boolean isFlowSuspended()
-    {
-        return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getAMQPConnection().isConnectionStopped();
-        // TODO check for Session suspension
-    }
-
     @Override
-    protected void doCloseInternal()
+    public void updateNotifyWorkDesired()
     {
-        _creditManager.removeListener(this);
-    }
+        final AMQPConnection_0_10 amqpConnection = _session.getAMQPConnection();
 
-    public void creditStateChanged(boolean hasCredit)
-    {
+        boolean state = !amqpConnection.isTransportBlockedForWriting()
+                        && getCreditManager().hasCredit();
 
-        if(hasCredit)
-        {
-            if(!updateState(State.SUSPENDED, State.ACTIVE))
-            {
-                // this is a hack to get round the issue of increasing bytes credit
-                notifyCurrentState();
-            }
-        }
-        else
-        {
-            updateState(State.ACTIVE, State.SUSPENDED);
-        }
+        setNotifyWorkDesired(state);
     }
 
     public String getName()
@@ -180,6 +151,7 @@ public class ConsumerTarget_0_10 extends
     public void transportStateChanged()
     {
         _creditManager.restoreCredit(0, 0);
+        updateNotifyWorkDesired();
     }
 
     public static class AddMessageDispositionListenerAction implements Runnable
@@ -404,11 +376,12 @@ public class ConsumerTarget_0_10 extends
 
     public void flushCreditState(boolean strict)
     {
-        if(strict || !isFlowSuspended() || _deferredMessageCredit >= 200
-          || !(_creditManager instanceof WindowCreditManager)
-          || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+        if(strict || !isSuspended() || _deferredMessageCredit >= 200
+           || !(_creditManager instanceof WindowCreditManager)
+           || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
         {
-            _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+            restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+
             _deferredMessageCredit = 0;
             _deferredSizeCredit = 0l;
         }
@@ -521,21 +494,26 @@ public class ConsumerTarget_0_10 extends
         return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
     }
 
-    public void queueDeleted()
+    public boolean allocateCredit(ServerMessage message)
     {
-        _deleted.set(true);
+        boolean creditAllocated = _creditManager.useCreditForMessage(message.getSize());
+        updateNotifyWorkDesired();
+        return creditAllocated;
     }
 
-    public boolean allocateCredit(ServerMessage message)
+    public void restoreCredit(ServerMessage message)
     {
-        return _creditManager.useCreditForMessage(message.getSize());
+        restoreCredit(1, message.getSize());
     }
 
-    public void restoreCredit(ServerMessage message)
+    void restoreCredit(int count, long size)
     {
-        _creditManager.restoreCredit(1, message.getSize());
+        _creditManager.restoreCredit(count, size);
+        updateNotifyWorkDesired();
     }
 
+
+
     public FlowCreditManager_0_10 getCreditManager()
     {
         return _creditManager;
@@ -543,25 +521,13 @@ public class ConsumerTarget_0_10 extends
 
     public void stop()
     {
-        try
-        {
-            getSendLock();
-
-            updateState(State.ACTIVE, State.SUSPENDED);
-            _stopped.set(true);
-            FlowCreditManager_0_10 creditManager = getCreditManager();
-            creditManager.clearCredit();
-        }
-        finally
-        {
-            releaseSendLock();
-        }
+        getCreditManager().clearCredit();
+        updateNotifyWorkDesired();
     }
 
     public void addCredit(MessageCreditUnit unit, long value)
     {
         FlowCreditManager_0_10 creditManager = getCreditManager();
-
         switch (unit)
         {
             case MESSAGE:
@@ -572,22 +538,11 @@ public class ConsumerTarget_0_10 extends
                 creditManager.addCredit(0l, value);
                 break;
         }
-
-        _stopped.set(false);
-
-        if(creditManager.hasCredit())
-        {
-            updateState(State.SUSPENDED, State.ACTIVE);
-        }
-
+        updateNotifyWorkDesired();
     }
 
     public void setFlowMode(MessageFlowMode flowMode)
     {
-
-
-        _creditManager.removeListener(this);
-
         switch(flowMode)
         {
             case CREDIT:
@@ -601,24 +556,18 @@ public class ConsumerTarget_0_10 extends
                 throw new ConnectionScopedRuntimeException("Unknown message flow mode: " + flowMode);
         }
         _flowMode = flowMode;
-        updateState(State.ACTIVE, State.SUSPENDED);
-
-        _creditManager.addStateListener(this);
-
+        updateNotifyWorkDesired();
     }
 
-    public boolean isStopped()
+    public boolean isFlowModeChangeAllowed()
     {
-        return _stopped.get();
+        return _creditManager.hasNeitherCredit();
     }
 
     public void flush()
     {
         flushCreditState(true);
-        for(ConsumerImpl consumer : getConsumers())
-        {
-            consumer.flush();
-        }
+        while(sendNextMessage());
         stop();
     }
 
@@ -657,30 +606,6 @@ public class ConsumerTarget_0_10 extends
     }
 
     @Override
-    protected void processClosed()
-    {
-
-    }
-
-    @Override
-    protected void processStateChanged()
-    {
-
-    }
-
-    @Override
-    protected boolean hasStateChanged()
-    {
-        return false;
-    }
-
-    @Override
-    protected boolean hasClosed()
-    {
-        return false;
-    }
-
-    @Override
     public String toString()
     {
         return "ConsumerTarget_0_10[name=" + _name + ", session=" + _session.toLogString() + "]";

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java Tue Nov 15 14:16:10 2016
@@ -22,9 +22,8 @@ package org.apache.qpid.server.protocol.
 
 
 import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.flow.AbstractFlowCreditManager;
 
-public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+public class CreditCreditManager implements FlowCreditManager_0_10
 {
     private final ProtocolEngine _protocolEngine;
     private volatile long _bytesCredit;
@@ -35,13 +34,10 @@ public class CreditCreditManager extends
         _protocolEngine = protocolEngine;
         _bytesCredit = bytesCredit;
         _messageCredit = messageCredit;
-        setSuspended(!hasCredit());
-
     }
 
     public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
     {
-        setSuspended(!hasCredit());
     }
 
 
@@ -52,26 +48,16 @@ public class CreditCreditManager extends
             _messageCredit += messageCredit;
         }
 
-        boolean notifyIncrease = false;
-
         if(_bytesCredit >= 0L && bytesCredit > 0L)
         {
-            notifyIncrease = _messageCredit != 0L && bytesCredit > 0L;
             _bytesCredit += bytesCredit;
         }
-
-        if(!setSuspended(!hasCredit()) && notifyIncrease)
-        {
-            notifyIncreaseBytesCredit();
-        }
-
     }
 
     public synchronized void clearCredit()
     {
         _bytesCredit = 0l;
         _messageCredit = 0l;
-        setSuspended(true);
     }
 
 
@@ -81,11 +67,16 @@ public class CreditCreditManager extends
         return (_bytesCredit != 0L  && _messageCredit != 0L && !_protocolEngine.isTransportBlockedForWriting());
     }
 
+    @Override
+    public boolean hasNeitherCredit()
+    {
+        return _bytesCredit == 0L && _messageCredit == 0L;
+    }
+
     public synchronized boolean useCreditForMessage(long msgSize)
     {
         if (_protocolEngine.isTransportBlockedForWriting())
         {
-            setSuspended(true);
             return false;
         }
         else if(_messageCredit >= 0L)
@@ -112,7 +103,6 @@ public class CreditCreditManager extends
             }
             else
             {
-                setSuspended(true);
                 return false;
             }
         }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java Tue Nov 15 14:16:10 2016
@@ -24,7 +24,9 @@ import org.apache.qpid.server.flow.FlowC
 
 public interface FlowCreditManager_0_10 extends FlowCreditManager
 {
-    public void addCredit(long count, long bytes);
+    void addCredit(long count, long bytes);
 
     void clearCredit();
+
+    boolean hasNeitherCredit();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Tue Nov 15 14:16:10 2016
@@ -56,7 +56,7 @@ public class MessageAcceptCompletionList
     {
         if(_restoreCredit)
         {
-            _sub.getCreditManager().restoreCredit(1l, _messageSize);
+            _sub.restoreCredit(1, _messageSize);
         }
         _session.acknowledge(_consumer, _sub, _entry);
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Nov 15 14:16:10 2016
@@ -27,13 +27,12 @@ import java.security.AccessControlContex
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -43,13 +42,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -133,7 +132,7 @@ public class ServerConnection extends Co
             getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, (ServerNetworkConnection) getNetworkConnection()));
 
             // trigger a wakeup to ensure the ticker will be taken into account
-            notifyWork();
+            getAmqpConnection().notifyWork();
         }
     }
 
@@ -468,10 +467,10 @@ public class ServerConnection extends Co
         super.doHeartBeat();
     }
 
-    private void addAsyncTask(final Action<ServerConnection> action)
+    void addAsyncTask(final Action<? super ServerConnection> action)
     {
         _asyncTaskList.add(action);
-        notifyWork();
+        getAmqpConnection().notifyWork();
     }
 
     public int getMessageCompressionThreshold()
@@ -492,30 +491,26 @@ public class ServerConnection extends Co
         }
     }
 
-    public void notifyWork()
-    {
-        _amqpConnection.notifyWork();
-    }
-
-    public Iterator<Runnable> processPendingIterator()
+    public Iterator<Runnable> processPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
     {
-        return new ProcessPendingIterator();
+        return new ProcessPendingIterator(sessionsWithWork);
     }
 
     private class ProcessPendingIterator implements Iterator<Runnable>
     {
-        private final Collection<? extends ServerSession> _sessionsWithPending;
+        private final Collection<AMQSessionModel<?>> _sessionsWithPending;
         private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
-        private ProcessPendingIterator()
+        private ProcessPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
         {
-            _sessionsWithPending = new ArrayList<>(getSessionModels());
+            _sessionsWithPending = sessionsWithWork;
             _sessionIterator = _sessionsWithPending.iterator();
         }
 
         @Override
         public boolean hasNext()
         {
-            return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+            return (!_sessionsWithPending.isEmpty() && !isClosing() && !_amqpConnection.isConnectionStopped())
+                   || !_asyncTaskList.isEmpty();
         }
 
         @Override
@@ -523,22 +518,39 @@ public class ServerConnection extends Co
         {
             if(!_sessionsWithPending.isEmpty())
             {
-                if(!_sessionIterator.hasNext())
+                if(isClosing() || _amqpConnection.isConnectionStopped())
                 {
-                    _sessionIterator = _sessionsWithPending.iterator();
+                    // in case the connection was marked as closing between a call to hasNext() and
+                    // a subsequent call to next()
+                    return new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+
+                        }
+                    };
                 }
-                final AMQSessionModel<?> session = _sessionIterator.next();
-                return new Runnable()
+                else
                 {
-                    @Override
-                    public void run()
+                    if (!_sessionIterator.hasNext())
+                    {
+                        _sessionIterator = _sessionsWithPending.iterator();
+                    }
+                    final AMQSessionModel<?> session = _sessionIterator.next();
+                    return new Runnable()
                     {
-                        if(!session.processPending())
+                        @Override
+                        public void run()
                         {
                             _sessionIterator.remove();
+                            if (session.processPending())
+                            {
+                                _sessionsWithPending.add(session);
+                            }
                         }
-                    }
-                };
+                    };
+                }
             }
             else if(!_asyncTaskList.isEmpty())
             {
@@ -556,6 +568,7 @@ public class ServerConnection extends Co
             {
                 throw new NoSuchElementException();
             }
+
         }
 
         @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Nov 15 14:16:10 2016
@@ -143,8 +143,11 @@ public class ServerSession extends Sessi
     private long _blockTime;
     private long _blockingTimeout;
     private boolean _wireBlockingState;
-    private final List<ConsumerTarget> _consumersWithPendingWork = new ArrayList<>();
-    private final PublishAuthorisationCache _publishAuthCahe;
+    private final Set<ConsumerTarget> _consumersWithPendingWork =
+            Collections.newSetFromMap(new ConcurrentHashMap<ConsumerTarget, Boolean>());
+    private Iterator<ConsumerTarget> _processPendingIterator;
+
+    private final PublishAuthorisationCache _publishAuthCache;
 
 
     public static interface MessageDispositionChangeListener
@@ -206,10 +209,10 @@ public class ServerSession extends Sessi
         }
 
         _blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
-        _maxUncommittedInMemorySize = getConnection().getAmqpConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
-        _publishAuthCahe = new PublishAuthorisationCache(_token,
-                                                         amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT),
-                                                         amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE));
+        _maxUncommittedInMemorySize = getAMQPConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+        _publishAuthCache = new PublishAuthorisationCache(_token,
+                                                          amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT),
+                                                          amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE));
 
     }
 
@@ -226,7 +229,7 @@ public class ServerSession extends Sessi
 
             if (state == State.OPEN)
             {
-                getConnection().getAmqpConnection().getEventLogger().message(ChannelMessages.CREATE());
+                getAMQPConnection().getEventLogger().message(ChannelMessages.CREATE());
             }
         }
         else
@@ -275,7 +278,7 @@ public class ServerSession extends Sessi
                           final boolean immediate,
                           final long currentTime)
     {
-        _publishAuthCahe.authorisePublish(destination, routingKey, immediate, currentTime);
+        _publishAuthCache.authorisePublish(destination, routingKey, immediate, currentTime);
     }
 
     @Override
@@ -320,8 +323,8 @@ public class ServerSession extends Sessi
                 handle.flowToDisk();
                 if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
                 {
-                    getConnection().getAmqpConnection().getEventLogger()
-                            .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+                    getAMQPConnection().getEventLogger()
+                                       .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
                 }
 
                 if(!_uncommittedMessages.isEmpty())
@@ -528,7 +531,7 @@ public class ServerSession extends Sessi
         {
             operationalLoggingMessage = ChannelMessages.CLOSE();
         }
-        getConnection().getAmqpConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
+        getAMQPConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
     }
 
     @Override
@@ -853,10 +856,10 @@ public class ServerSession extends Sessi
 
                 if(_blocking.compareAndSet(false,true))
                 {
-                    getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+                    getAMQPConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
                     if(getState() == State.OPEN)
                     {
-                        getConnection().notifyWork();
+                        getAMQPConnection().notifyWork(this);
                     }
                 }
 
@@ -881,8 +884,8 @@ public class ServerSession extends Sessi
         {
             if(_blocking.compareAndSet(true,false) && !isClosing())
             {
-                getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
-                getConnection().notifyWork();
+                getAMQPConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
+                getAMQPConnection().notifyWork(this);
             }
         }
     }
@@ -1185,7 +1188,7 @@ public class ServerSession extends Sessi
     @Override
     public boolean processPending()
     {
-        if (!getAMQPConnection().isIOThread())
+        if (!getAMQPConnection().isIOThread() || isClosing())
         {
             return false;
         }
@@ -1206,40 +1209,31 @@ public class ServerSession extends Sessi
             _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
         }
 
-        boolean consumerListNeedsRefreshing;
-        if(_consumersWithPendingWork.isEmpty())
-        {
-            _consumersWithPendingWork.addAll(getSubscriptions());
-            consumerListNeedsRefreshing = false;
-        }
-        else
+        if (!_consumersWithPendingWork.isEmpty())
         {
-            consumerListNeedsRefreshing = true;
-        }
-
-        // QPID-7447: prevent unnecessary allocation of empty iterator
-        Iterator<ConsumerTarget> iter = _consumersWithPendingWork.isEmpty() ? Collections.<ConsumerTarget>emptyIterator() : _consumersWithPendingWork.iterator();
+            if (_processPendingIterator == null || !_processPendingIterator.hasNext())
+            {
+                _processPendingIterator = _consumersWithPendingWork.iterator();
+            }
 
-        boolean consumerHasMoreWork = false;
-        while(iter.hasNext())
-        {
-            final ConsumerTarget target = iter.next();
-            iter.remove();
-            if(target.hasPendingWork())
+            if (_processPendingIterator.hasNext())
             {
-                consumerHasMoreWork = true;
-                target.processPending();
-                break;
+                ConsumerTarget target = _processPendingIterator.next();
+                _processPendingIterator.remove();
+                if (target.processPending())
+                {
+                    _consumersWithPendingWork.add(target);
+                }
             }
         }
 
-        return consumerHasMoreWork || consumerListNeedsRefreshing;
+        return !_consumersWithPendingWork.isEmpty();
     }
 
     @Override
     public void addTicker(final Ticker ticker)
     {
-        getConnection().getAmqpConnection().getAggregateTicker().addTicker(ticker);
+        getAMQPConnection().getAggregateTicker().addTicker(ticker);
         // trigger a wakeup to ensure the ticker will be taken into account
         getAMQPConnection().notifyWork();
     }
@@ -1247,36 +1241,15 @@ public class ServerSession extends Sessi
     @Override
     public void removeTicker(final Ticker ticker)
     {
-        getConnection().getAmqpConnection().getAggregateTicker().removeTicker(ticker);
-    }
-
-    @Override
-    public void notifyConsumerTargetCurrentStates()
-    {
-        Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
-        for(ConsumerTarget_0_10 consumerTarget: consumerTargets)
-        {
-            if(!consumerTarget.isPullOnly())
-            {
-                consumerTarget.notifyCurrentState();
-            }
-        }
+        getAMQPConnection().getAggregateTicker().removeTicker(ticker);
     }
 
     @Override
-    public void ensureConsumersNoticedStateChange()
+    public void notifyWork(final ConsumerTarget target)
     {
-        Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
-        for(ConsumerTarget_0_10 consumerTarget: consumerTargets)
+        if(_consumersWithPendingWork.add(target))
         {
-            try
-            {
-                consumerTarget.getSendLock();
-            }
-            finally
-            {
-                consumerTarget.releaseSendLock();
-            }
+            getAMQPConnection().notifyWork(this);
         }
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Tue Nov 15 14:16:10 2016
@@ -361,6 +361,7 @@ public class ServerSessionDelegate exten
                                                        options,
                                                        priority));
                         }
+                        target.updateNotifyWorkDesired();
                     }
                     catch (Queue.ExistingExclusiveConsumer existing)
                     {
@@ -378,6 +379,10 @@ public class ServerSessionDelegate exten
                     {
                         exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy");
                     }
+                    catch (MessageSource.QueueDeleted queueDeleted)
+                    {
+                        exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue was deleted");
+                    }
                 }
             }
         }
@@ -1676,10 +1681,14 @@ public class ServerSessionDelegate exten
         {
             exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '" + destination + "'");
         }
-        else if(sub.isStopped())
+        else if(sub.isFlowModeChangeAllowed())
         {
             sub.setFlowMode(sfm.getFlowMode());
         }
+        else
+        {
+            exception(session, sfm, ExecutionErrorCode.PRECONDITION_FAILED, "destination '" + destination + "' has credit");
+        }
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java Tue Nov 15 14:16:10 2016
@@ -25,9 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.flow.AbstractFlowCreditManager;
 
-public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+public class WindowCreditManager implements FlowCreditManager_0_10
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(WindowCreditManager.class);
     private final ProtocolEngine _protocolEngine;
@@ -45,8 +44,6 @@ public class WindowCreditManager extends
         _protocolEngine = protocolEngine;
         _bytesCreditLimit = bytesCreditLimit;
         _messageCreditLimit = messageCreditLimit;
-        setSuspended(!hasCredit());
-
     }
 
     public synchronized long getMessageCreditLimit()
@@ -77,30 +74,14 @@ public class WindowCreditManager extends
             _messageUsed = 0;
         }
 
-        boolean notifyIncrease = true;
-
-        if (_messageCreditLimit > 0L)
-        {
-            notifyIncrease = (_messageUsed != _messageCreditLimit);
-        }
-
         _bytesUsed -= bytesCredit;
         if (_bytesUsed < 0L)
         {
             LOGGER.error("Bytes credit used value was negative: " + _bytesUsed);
             _bytesUsed = 0;
         }
-
-        notifyIncrease = notifyIncrease && bytesCredit > 0 && _bytesCreditLimit > 0L ;
-
-        if (!setSuspended(!hasCredit()) && notifyIncrease)
-        {
-            notifyIncreaseBytesCredit();
-        }
     }
 
-
-
     public synchronized boolean hasCredit()
     {
         return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
@@ -108,11 +89,16 @@ public class WindowCreditManager extends
                 && !_protocolEngine.isTransportBlockedForWriting();
     }
 
+    @Override
+    public boolean hasNeitherCredit()
+    {
+        return _bytesCreditLimit == 0L && _messageCreditLimit == 0L;
+    }
+
     public synchronized boolean useCreditForMessage(final long msgSize)
     {
         if (_protocolEngine.isTransportBlockedForWriting())
         {
-            setSuspended(true);
             return false;
         }
         else if(_messageCreditLimit >= 0L)
@@ -139,7 +125,6 @@ public class WindowCreditManager extends
             }
             else
             {
-                setSuspended(true);
                 return false;
             }
         }
@@ -191,6 +176,5 @@ public class WindowCreditManager extends
     {
         _bytesCreditLimit = 0l;
         _messageCreditLimit = 0l;
-        setSuspended(true);
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org