You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/18 11:10:24 UTC

svn commit: r1770363 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/queue/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/s...

Author: lquack
Date: Fri Nov 18 11:10:23 2016
New Revision: 1770363

URL: http://svn.apache.org/viewvc?rev=1770363&view=rev
Log:
QPID-7514: [Java Broker] Performance optimisation and refactoring

* move isTransportBlockedForWriting check from CreditManagers to PendingWorkIterators
* batch ConsumerTarget notifications when highPrefetch is used

Added:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java
      - copied, changed from r1770362, qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
Removed:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Fri Nov 18 11:10:23 2016
@@ -142,7 +142,7 @@ public class QueueConsumerManagerImpl im
     public void clearStateAffirmationFlag(final QueueConsumer consumer)
     {
         QueueConsumerNode node = consumer.getQueueConsumerNode();
-        node.clearAffirmaion();
+        node.clearAffirmation();
     }
 
     // Set by the Queue any IO thread

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java Fri Nov 18 11:10:23 2016
@@ -47,7 +47,7 @@ final class QueueConsumerNode
         return _state;
     }
 
-    public synchronized void clearAffirmaion()
+    public synchronized void clearAffirmation()
     {
         _affirmed = false;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Nov 18 11:10:23 2016
@@ -546,10 +546,10 @@ public class ConsumerTarget_0_10 extends
         switch(flowMode)
         {
             case CREDIT:
-                _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getAmqpConnection());
+                _creditManager = new CreditCreditManager(0l, 0l);
                 break;
             case WINDOW:
-                _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getAmqpConnection());
+                _creditManager = new WindowCreditManager(0l, 0l);
                 break;
             default:
                 // this should never happen, as 0-10 is finalised and so the enum should never change

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java Fri Nov 18 11:10:23 2016
@@ -21,17 +21,13 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 
-import org.apache.qpid.server.transport.ProtocolEngine;
-
 public class CreditCreditManager implements FlowCreditManager_0_10
 {
-    private final ProtocolEngine _protocolEngine;
     private volatile long _bytesCredit;
     private volatile long _messageCredit;
 
-    public CreditCreditManager(long bytesCredit, long messageCredit, final ProtocolEngine protocolEngine)
+    public CreditCreditManager(long bytesCredit, long messageCredit)
     {
-        _protocolEngine = protocolEngine;
         _bytesCredit = bytesCredit;
         _messageCredit = messageCredit;
     }
@@ -64,7 +60,7 @@ public class CreditCreditManager impleme
     public synchronized boolean hasCredit()
     {
         // Note !=, if credit is < 0 that indicates infinite credit
-        return (_bytesCredit != 0L  && _messageCredit != 0L && !_protocolEngine.isTransportBlockedForWriting());
+        return (_bytesCredit != 0L  && _messageCredit != 0L);
     }
 
     @Override
@@ -75,11 +71,7 @@ public class CreditCreditManager impleme
 
     public synchronized boolean useCreditForMessage(long msgSize)
     {
-        if (_protocolEngine.isTransportBlockedForWriting())
-        {
-            return false;
-        }
-        else if(_messageCredit >= 0L)
+        if(_messageCredit >= 0L)
         {
             if(_messageCredit > 0)
             {

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Nov 18 11:10:23 2016
@@ -905,6 +905,10 @@ public class ServerSession extends Sessi
         {
             consumerTarget.transportStateChanged();
         }
+        if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+        {
+            getAMQPConnection().notifyWork(this);
+        }
     }
 
     @Override
@@ -1209,7 +1213,7 @@ public class ServerSession extends Sessi
             _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
         }
 
-        if (!_consumersWithPendingWork.isEmpty())
+        if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
         {
             if (_processPendingIterator == null || !_processPendingIterator.hasNext())
             {
@@ -1227,7 +1231,7 @@ public class ServerSession extends Sessi
             }
         }
 
-        return !_consumersWithPendingWork.isEmpty();
+        return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Nov 18 11:10:23 2016
@@ -256,7 +256,7 @@ public class ServerSessionDelegate exten
                 else
                 {
                     ProtocolEngine protocolEngine = getServerConnection(session).getAmqpConnection();
-                    FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, protocolEngine);
+                    FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
 
                     FilterManager filterManager = null;
                     try

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java Fri Nov 18 11:10:23 2016
@@ -24,12 +24,9 @@ package org.apache.qpid.server.protocol.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.transport.ProtocolEngine;
-
 public class WindowCreditManager implements FlowCreditManager_0_10
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(WindowCreditManager.class);
-    private final ProtocolEngine _protocolEngine;
 
     private volatile long _bytesCreditLimit;
     private volatile long _messageCreditLimit;
@@ -38,10 +35,8 @@ public class WindowCreditManager impleme
     private volatile long _messageUsed;
 
     public WindowCreditManager(long bytesCreditLimit,
-                               long messageCreditLimit,
-                               ProtocolEngine protocolEngine)
+                               long messageCreditLimit)
     {
-        _protocolEngine = protocolEngine;
         _bytesCreditLimit = bytesCreditLimit;
         _messageCreditLimit = messageCreditLimit;
     }
@@ -85,8 +80,7 @@ public class WindowCreditManager impleme
     public synchronized boolean hasCredit()
     {
         return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
-                && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed)
-                && !_protocolEngine.isTransportBlockedForWriting();
+                && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
     }
 
     @Override
@@ -97,11 +91,7 @@ public class WindowCreditManager impleme
 
     public synchronized boolean useCreditForMessage(final long msgSize)
     {
-        if (_protocolEngine.isTransportBlockedForWriting())
-        {
-            return false;
-        }
-        else if(_messageCreditLimit >= 0L)
+        if(_messageCreditLimit >= 0L)
         {
             if(_messageUsed < _messageCreditLimit)
             {

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java Fri Nov 18 11:10:23 2016
@@ -38,7 +38,7 @@ public class WindowCreditManagerTest ext
         _protocolEngine = mock(ProtocolEngine.class);
         when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false);
 
-        _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine);
+        _creditManager = new WindowCreditManager(0l, 0l);
     }
 
     /**

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Nov 18 11:10:23 2016
@@ -66,7 +66,6 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.logging.LogMessage;
@@ -127,6 +126,7 @@ public class AMQChannel
     public static final int DEFAULT_PREFETCH = 4096;
 
     private static final Logger _logger = LoggerFactory.getLogger(AMQChannel.class);
+    public static final InfiniteCreditCreditManager INFINITE_CREDIT_CREDIT_MANAGER = new InfiniteCreditCreditManager();
     private final DefaultQueueAssociationClearingTask
             _defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
 
@@ -134,7 +134,6 @@ public class AMQChannel
 
 
     private final Pre0_10CreditManager _creditManager;
-    private final FlowCreditManager _noAckCreditManager;
     private final AccessControlContext _accessControllerContext;
     private final SecurityToken _token;
 
@@ -237,8 +236,7 @@ public class AMQChannel
 
     public AMQChannel(AMQPConnection_0_8 connection, int channelId, final MessageStore messageStore)
     {
-        _creditManager = new Pre0_10CreditManager(0l,0l, connection);
-        _noAckCreditManager = new NoAckCreditManager(connection);
+        _creditManager = new Pre0_10CreditManager(0l,0l);
 
         _connection = connection;
         _channelId = channelId;
@@ -305,9 +303,6 @@ public class AMQChannel
                    MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused,
                    MessageSource.QueueDeleted
     {
-
-        final FlowCreditManager singleMessageCredit = new InfiniteCreditCreditManager();
-
         final GetDeliveryMethod getDeliveryMethod =
                 new GetDeliveryMethod(queue);
 
@@ -319,13 +314,13 @@ public class AMQChannel
 
             target = ConsumerTarget_0_8.createAckTarget(this,
                                                         AMQShortString.EMPTY_STRING, null,
-                                                        singleMessageCredit, getDeliveryMethod);
+                                                        INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
         }
         else
         {
             target = ConsumerTarget_0_8.createGetNoAckTarget(this,
                                                              AMQShortString.EMPTY_STRING, null,
-                                                             singleMessageCredit, getDeliveryMethod);
+                                                             INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
         }
 
         ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
@@ -725,7 +720,8 @@ public class AMQChannel
         final boolean multiQueue = sources.size()>1;
         if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
         {
-            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager, multiQueue);
+            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments,
+                                                            INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
         }
         else if(acks)
         {
@@ -735,7 +731,8 @@ public class AMQChannel
         }
         else
         {
-            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager, multiQueue);
+            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments,
+                                                          INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
             options.add(ConsumerImpl.Option.ACQUIRES);
             options.add(ConsumerImpl.Option.SEES_REQUEUES);
         }
@@ -1716,7 +1713,11 @@ public class AMQChannel
     {
         updateAllConsumerNotifyWorkDesired();
         _creditManager.restoreCredit(0, 0);
-        _noAckCreditManager.restoreCredit(0, 0);
+        INFINITE_CREDIT_CREDIT_MANAGER.restoreCredit(0, 0);
+        if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+        {
+            getAMQPConnection().notifyWork(this);
+        }
     }
 
     void updateAllConsumerNotifyWorkDesired()
@@ -3733,7 +3734,7 @@ public class AMQChannel
             _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
         }
 
-        if(!_consumersWithPendingWork.isEmpty())
+        if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
         {
             if (_processPendingIterator == null || !_processPendingIterator.hasNext())
             {
@@ -3751,7 +3752,7 @@ public class AMQChannel
             }
         }
 
-        return !_consumersWithPendingWork.isEmpty();
+        return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Nov 18 11:10:23 2016
@@ -56,7 +56,7 @@ public abstract class ConsumerTarget_0_8
 
     public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
                                                          AMQShortString consumerTag, FieldTable filters,
-                                                         FlowCreditManager creditManager, final boolean multiQueue)
+                                                         FlowCreditManager_0_8 creditManager, final boolean multiQueue)
     {
         return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(),
                                    multiQueue);
@@ -65,7 +65,7 @@ public abstract class ConsumerTarget_0_8
     public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel,
                                                           final AMQShortString consumerTag,
                                                           final FieldTable filters,
-                                                          final FlowCreditManager creditManager,
+                                                          final FlowCreditManager_0_8 creditManager,
                                                           final ClientDeliveryMethod deliveryMethod)
     {
         return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod);
@@ -76,7 +76,7 @@ public abstract class ConsumerTarget_0_8
         public BrowserConsumer(AMQChannel channel,
                                AMQShortString consumerTag,
                                FieldTable filters,
-                               FlowCreditManager creditManager,
+                               FlowCreditManager_0_8 creditManager,
                                ClientDeliveryMethod deliveryMethod,
                                boolean multiQueue)
         {
@@ -110,7 +110,7 @@ public abstract class ConsumerTarget_0_8
 
     public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
                                                        AMQShortString consumerTag, FieldTable filters,
-                                                       FlowCreditManager creditManager,
+                                                       FlowCreditManager_0_8 creditManager,
                                                        boolean multiQueue)
     {
         return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(),
@@ -124,7 +124,7 @@ public abstract class ConsumerTarget_0_8
         public NoAckConsumer(AMQChannel channel,
                              AMQShortString consumerTag,
                              FieldTable filters,
-                             FlowCreditManager creditManager,
+                             FlowCreditManager_0_8 creditManager,
                              ClientDeliveryMethod deliveryMethod,
                              boolean multiQueue)
         {
@@ -190,7 +190,7 @@ public abstract class ConsumerTarget_0_8
     {
         public GetNoAckConsumer(AMQChannel channel,
                                 AMQShortString consumerTag, FieldTable filters,
-                                FlowCreditManager creditManager,
+                                FlowCreditManager_0_8 creditManager,
                                 ClientDeliveryMethod deliveryMethod)
         {
             super(channel, consumerTag, filters, creditManager, deliveryMethod, false);
@@ -202,7 +202,7 @@ public abstract class ConsumerTarget_0_8
     public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
                                                      AMQShortString consumerTag,
                                                      FieldTable filters,
-                                                     FlowCreditManager creditManager,
+                                                     FlowCreditManager_0_8 creditManager,
                                                      boolean multiQueue)
     {
         return new AckConsumer(channel,
@@ -215,7 +215,7 @@ public abstract class ConsumerTarget_0_8
 
     public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
                                                      AMQShortString consumerTag, FieldTable filters,
-                                                     FlowCreditManager creditManager,
+                                                     FlowCreditManager_0_8 creditManager,
                                                      ClientDeliveryMethod deliveryMethod)
     {
         return new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, false);
@@ -225,7 +225,7 @@ public abstract class ConsumerTarget_0_8
     {
         public AckConsumer(AMQChannel channel,
                            AMQShortString consumerTag, FieldTable filters,
-                           FlowCreditManager creditManager,
+                           FlowCreditManager_0_8 creditManager,
                            ClientDeliveryMethod deliveryMethod,
                            boolean multiQueue)
         {
@@ -270,14 +270,14 @@ public abstract class ConsumerTarget_0_8
 
     private final AMQShortString _consumerTag;
 
-    private final FlowCreditManager _creditManager;
+    private final FlowCreditManager_0_8 _creditManager;
 
     private final Boolean _autoClose;
 
     public ConsumerTarget_0_8(AMQChannel channel,
                               AMQShortString consumerTag,
                               FieldTable arguments,
-                              FlowCreditManager creditManager,
+                              FlowCreditManager_0_8 creditManager,
                               ClientDeliveryMethod deliveryMethod,
                               boolean multiQueue)
     {
@@ -392,11 +392,24 @@ public abstract class ConsumerTarget_0_8
         _creditManager.restoreCredit(1, message.getSize());
         if(_creditManager.hasCredit() != hasCredit)
         {
-            _channel.updateAllConsumerNotifyWorkDesired();
+            if (hasCredit || !_creditManager.isNotBytesLimitedAndHighPrefetch())
+            {
+                _channel.updateAllConsumerNotifyWorkDesired();
+            }
         }
         else if (hasCredit)
         {
-            notifyWork();
+            if (_creditManager.isNotBytesLimitedAndHighPrefetch())
+            {
+                if (_creditManager.isCreditOverBatchLimit())
+                {
+                    _channel.updateAllConsumerNotifyWorkDesired();
+                }
+            }
+            else
+            {
+                notifyWork();
+            }
         }
     }
 

Copied: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java (from r1770362, qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java?p2=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java&p1=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java&r1=1770362&r2=1770363&rev=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java Fri Nov 18 11:10:23 2016
@@ -20,31 +20,11 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.transport.ProtocolEngine;
 
-public class NoAckCreditManager implements FlowCreditManager
+public interface FlowCreditManager_0_8 extends FlowCreditManager
 {
-    private final ProtocolEngine _protocolEngine;
 
-    public NoAckCreditManager(ProtocolEngine protocolEngine)
-    {
-        _protocolEngine = protocolEngine;
-    }
+    boolean isNotBytesLimitedAndHighPrefetch();
 
-    @Override
-    public void restoreCredit(final long messageCredit, final long bytesCredit)
-    {
-    }
-
-    @Override
-    public boolean hasCredit()
-    {
-        return !_protocolEngine.isTransportBlockedForWriting();
-    }
-
-    @Override
-    public boolean useCreditForMessage(final long msgSize)
-    {
-        return hasCredit();
-    }
+    boolean isCreditOverBatchLimit();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java Fri Nov 18 11:10:23 2016
@@ -20,9 +20,8 @@
 */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.server.flow.FlowCreditManager;
 
-public class InfiniteCreditCreditManager implements FlowCreditManager
+public class InfiniteCreditCreditManager implements FlowCreditManager_0_8
 {
 
     public InfiniteCreditCreditManager()
@@ -43,4 +42,15 @@ public class InfiniteCreditCreditManager
         return true;
     }
 
+    @Override
+    public boolean isNotBytesLimitedAndHighPrefetch()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean isCreditOverBatchLimit()
+    {
+        return false;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java Fri Nov 18 11:10:23 2016
@@ -21,13 +21,11 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.transport.ProtocolEngine;
-
-public class Pre0_10CreditManager implements FlowCreditManager
+public class Pre0_10CreditManager implements FlowCreditManager_0_8
 {
 
-    private final ProtocolEngine _protocolEngine;
+    private static final long HIGH_PREFETCH_LIMIT = 100L;
+    private static final long BATCH_LIMIT = 10L;
     private volatile long _bytesCreditLimit;
     private volatile long _messageCreditLimit;
 
@@ -35,10 +33,8 @@ public class Pre0_10CreditManager implem
     private volatile long _messageCredit;
 
     public Pre0_10CreditManager(long bytesCreditLimit,
-                                long messageCreditLimit,
-                                ProtocolEngine protocolEngine)
+                                long messageCreditLimit)
     {
-        _protocolEngine = protocolEngine;
         _bytesCreditLimit = bytesCreditLimit;
         _messageCreditLimit = messageCreditLimit;
         _bytesCredit = bytesCreditLimit;
@@ -89,17 +85,11 @@ public class Pre0_10CreditManager implem
     public synchronized boolean hasCredit()
     {
         return (_bytesCreditLimit == 0L || _bytesCredit > 0)
-               && (_messageCreditLimit == 0L || _messageCredit > 0)
-               && !_protocolEngine.isTransportBlockedForWriting();
+               && (_messageCreditLimit == 0L || _messageCredit > 0);
     }
 
     public synchronized boolean useCreditForMessage(final long msgSize)
     {
-        if (_protocolEngine.isTransportBlockedForWriting())
-        {
-            return false;
-        }
-
         if (_messageCreditLimit != 0)
         {
             if (_messageCredit <= 0)
@@ -119,4 +109,16 @@ public class Pre0_10CreditManager implem
         _bytesCredit -= msgSize;
         return true;
     }
+
+    @Override
+    public boolean isNotBytesLimitedAndHighPrefetch()
+    {
+        return _bytesCreditLimit == 0L && _messageCreditLimit > HIGH_PREFETCH_LIMIT;
+    }
+
+    @Override
+    public boolean isCreditOverBatchLimit()
+    {
+        return _messageCredit > BATCH_LIMIT;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java Fri Nov 18 11:10:23 2016
@@ -19,12 +19,9 @@
 
 package org.apache.qpid.server.protocol.v0_8;
 
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
-import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -42,7 +39,7 @@ public class Pre0_10CreditManagerTest ex
 
     public void testBasicMessageCredit() throws Exception
     {
-        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        _creditManager = new Pre0_10CreditManager(0, 0);
         _creditManager.setCreditLimits(0, 2);
         assertTrue("Creditmanager should have credit", _creditManager.hasCredit());
         assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
@@ -57,7 +54,7 @@ public class Pre0_10CreditManagerTest ex
 
     public void testBytesLimitDoesNotPreventLargeMessage() throws Exception
     {
-        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        _creditManager = new Pre0_10CreditManager(0, 0);
         _creditManager.setCreditLimits(10, 0);
         assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(3));
         assertFalse("Creditmanager should not be able to useCredit", _creditManager.useCreditForMessage(30));
@@ -67,7 +64,7 @@ public class Pre0_10CreditManagerTest ex
 
     public void testUseCreditWithNegativeMessageCredit() throws Exception
     {
-        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        _creditManager = new Pre0_10CreditManager(0, 0);
         _creditManager.setCreditLimits(0, 3);
         assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
         assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
@@ -85,7 +82,7 @@ public class Pre0_10CreditManagerTest ex
 
     public void testUseCreditWithNegativeBytesCredit() throws Exception
     {
-        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        _creditManager = new Pre0_10CreditManager(0, 0);
         _creditManager.setCreditLimits(3, 0);
         assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(1));
         assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(1));
@@ -103,7 +100,7 @@ public class Pre0_10CreditManagerTest ex
 
     public void testCreditAccountingWhileMessageLimitNotSet() throws Exception
     {
-        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        _creditManager = new Pre0_10CreditManager(0, 0);
         assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
         assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
         assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Nov 18 11:10:23 2016
@@ -276,8 +276,7 @@ class ConsumerTarget_1_0 extends Abstrac
     public boolean allocateCredit(final ServerMessage msg)
     {
         ProtocolEngine protocolEngine = getSession().getConnection();
-        final boolean hasCredit =
-                _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
+        final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
 
         updateNotifyWorkDesired();
 
@@ -310,8 +309,7 @@ class ConsumerTarget_1_0 extends Abstrac
     {
         updateNotifyWorkDesired();
 
-        ProtocolEngine protocolEngine = getSession().getConnection();
-        if (isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
+        if (isSuspended() && getEndpoint() != null)
         {
             _transactionId = _link.getTransactionId();
         }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Nov 18 11:10:23 2016
@@ -1257,11 +1257,11 @@ public class Session_1_0 implements AMQS
         {
             ConsumerTarget_1_0 target = link.getConsumerTarget();
             target.flowStateChanged();
-
-
         }
-
-
+        if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+        {
+            getAMQPConnection().notifyWork(this);
+        }
     }
 
     @Override
@@ -1541,7 +1541,7 @@ public class Session_1_0 implements AMQS
         }
 
 
-        if(!_consumersWithPendingWork.isEmpty())
+        if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
         {
             if (_processPendingIterator == null || !_processPendingIterator.hasNext())
             {
@@ -1559,7 +1559,7 @@ public class Session_1_0 implements AMQS
             }
         }
 
-        return !_consumersWithPendingWork.isEmpty();
+        return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
     }
 
     @Override




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org