You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/18 11:10:24 UTC
svn commit: r1770363 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/s...
Author: lquack
Date: Fri Nov 18 11:10:23 2016
New Revision: 1770363
URL: http://svn.apache.org/viewvc?rev=1770363&view=rev
Log:
QPID-7514: [Java Broker] Performance optimisation and refactoring
* move isTransportBlockedForWriting check from CreditManagers to PendingWorkIterators
* batch ConsumerTarget notifications when highPrefetch is used
Added:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java
- copied, changed from r1770362, qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
Removed:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Fri Nov 18 11:10:23 2016
@@ -142,7 +142,7 @@ public class QueueConsumerManagerImpl im
public void clearStateAffirmationFlag(final QueueConsumer consumer)
{
QueueConsumerNode node = consumer.getQueueConsumerNode();
- node.clearAffirmaion();
+ node.clearAffirmation();
}
// Set by the Queue any IO thread
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java Fri Nov 18 11:10:23 2016
@@ -47,7 +47,7 @@ final class QueueConsumerNode
return _state;
}
- public synchronized void clearAffirmaion()
+ public synchronized void clearAffirmation()
{
_affirmed = false;
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Nov 18 11:10:23 2016
@@ -546,10 +546,10 @@ public class ConsumerTarget_0_10 extends
switch(flowMode)
{
case CREDIT:
- _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getAmqpConnection());
+ _creditManager = new CreditCreditManager(0l, 0l);
break;
case WINDOW:
- _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getAmqpConnection());
+ _creditManager = new WindowCreditManager(0l, 0l);
break;
default:
// this should never happen, as 0-10 is finalised and so the enum should never change
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java Fri Nov 18 11:10:23 2016
@@ -21,17 +21,13 @@
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.server.transport.ProtocolEngine;
-
public class CreditCreditManager implements FlowCreditManager_0_10
{
- private final ProtocolEngine _protocolEngine;
private volatile long _bytesCredit;
private volatile long _messageCredit;
- public CreditCreditManager(long bytesCredit, long messageCredit, final ProtocolEngine protocolEngine)
+ public CreditCreditManager(long bytesCredit, long messageCredit)
{
- _protocolEngine = protocolEngine;
_bytesCredit = bytesCredit;
_messageCredit = messageCredit;
}
@@ -64,7 +60,7 @@ public class CreditCreditManager impleme
public synchronized boolean hasCredit()
{
// Note !=, if credit is < 0 that indicates infinite credit
- return (_bytesCredit != 0L && _messageCredit != 0L && !_protocolEngine.isTransportBlockedForWriting());
+ return (_bytesCredit != 0L && _messageCredit != 0L);
}
@Override
@@ -75,11 +71,7 @@ public class CreditCreditManager impleme
public synchronized boolean useCreditForMessage(long msgSize)
{
- if (_protocolEngine.isTransportBlockedForWriting())
- {
- return false;
- }
- else if(_messageCredit >= 0L)
+ if(_messageCredit >= 0L)
{
if(_messageCredit > 0)
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Nov 18 11:10:23 2016
@@ -905,6 +905,10 @@ public class ServerSession extends Sessi
{
consumerTarget.transportStateChanged();
}
+ if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+ {
+ getAMQPConnection().notifyWork(this);
+ }
}
@Override
@@ -1209,7 +1213,7 @@ public class ServerSession extends Sessi
_blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
}
- if (!_consumersWithPendingWork.isEmpty())
+ if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
{
if (_processPendingIterator == null || !_processPendingIterator.hasNext())
{
@@ -1227,7 +1231,7 @@ public class ServerSession extends Sessi
}
}
- return !_consumersWithPendingWork.isEmpty();
+ return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Nov 18 11:10:23 2016
@@ -256,7 +256,7 @@ public class ServerSessionDelegate exten
else
{
ProtocolEngine protocolEngine = getServerConnection(session).getAmqpConnection();
- FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, protocolEngine);
+ FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
FilterManager filterManager = null;
try
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java Fri Nov 18 11:10:23 2016
@@ -24,12 +24,9 @@ package org.apache.qpid.server.protocol.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.transport.ProtocolEngine;
-
public class WindowCreditManager implements FlowCreditManager_0_10
{
private static final Logger LOGGER = LoggerFactory.getLogger(WindowCreditManager.class);
- private final ProtocolEngine _protocolEngine;
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -38,10 +35,8 @@ public class WindowCreditManager impleme
private volatile long _messageUsed;
public WindowCreditManager(long bytesCreditLimit,
- long messageCreditLimit,
- ProtocolEngine protocolEngine)
+ long messageCreditLimit)
{
- _protocolEngine = protocolEngine;
_bytesCreditLimit = bytesCreditLimit;
_messageCreditLimit = messageCreditLimit;
}
@@ -85,8 +80,7 @@ public class WindowCreditManager impleme
public synchronized boolean hasCredit()
{
return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
- && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed)
- && !_protocolEngine.isTransportBlockedForWriting();
+ && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
}
@Override
@@ -97,11 +91,7 @@ public class WindowCreditManager impleme
public synchronized boolean useCreditForMessage(final long msgSize)
{
- if (_protocolEngine.isTransportBlockedForWriting())
- {
- return false;
- }
- else if(_messageCreditLimit >= 0L)
+ if(_messageCreditLimit >= 0L)
{
if(_messageUsed < _messageCreditLimit)
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java Fri Nov 18 11:10:23 2016
@@ -38,7 +38,7 @@ public class WindowCreditManagerTest ext
_protocolEngine = mock(ProtocolEngine.class);
when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false);
- _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine);
+ _creditManager = new WindowCreditManager(0l, 0l);
}
/**
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Nov 18 11:10:23 2016
@@ -66,7 +66,6 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogMessage;
@@ -127,6 +126,7 @@ public class AMQChannel
public static final int DEFAULT_PREFETCH = 4096;
private static final Logger _logger = LoggerFactory.getLogger(AMQChannel.class);
+ public static final InfiniteCreditCreditManager INFINITE_CREDIT_CREDIT_MANAGER = new InfiniteCreditCreditManager();
private final DefaultQueueAssociationClearingTask
_defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
@@ -134,7 +134,6 @@ public class AMQChannel
private final Pre0_10CreditManager _creditManager;
- private final FlowCreditManager _noAckCreditManager;
private final AccessControlContext _accessControllerContext;
private final SecurityToken _token;
@@ -237,8 +236,7 @@ public class AMQChannel
public AMQChannel(AMQPConnection_0_8 connection, int channelId, final MessageStore messageStore)
{
- _creditManager = new Pre0_10CreditManager(0l,0l, connection);
- _noAckCreditManager = new NoAckCreditManager(connection);
+ _creditManager = new Pre0_10CreditManager(0l,0l);
_connection = connection;
_channelId = channelId;
@@ -305,9 +303,6 @@ public class AMQChannel
MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused,
MessageSource.QueueDeleted
{
-
- final FlowCreditManager singleMessageCredit = new InfiniteCreditCreditManager();
-
final GetDeliveryMethod getDeliveryMethod =
new GetDeliveryMethod(queue);
@@ -319,13 +314,13 @@ public class AMQChannel
target = ConsumerTarget_0_8.createAckTarget(this,
AMQShortString.EMPTY_STRING, null,
- singleMessageCredit, getDeliveryMethod);
+ INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
}
else
{
target = ConsumerTarget_0_8.createGetNoAckTarget(this,
AMQShortString.EMPTY_STRING, null,
- singleMessageCredit, getDeliveryMethod);
+ INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
}
ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
@@ -725,7 +720,8 @@ public class AMQChannel
final boolean multiQueue = sources.size()>1;
if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
- target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager, multiQueue);
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments,
+ INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
}
else if(acks)
{
@@ -735,7 +731,8 @@ public class AMQChannel
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager, multiQueue);
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments,
+ INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
@@ -1716,7 +1713,11 @@ public class AMQChannel
{
updateAllConsumerNotifyWorkDesired();
_creditManager.restoreCredit(0, 0);
- _noAckCreditManager.restoreCredit(0, 0);
+ INFINITE_CREDIT_CREDIT_MANAGER.restoreCredit(0, 0);
+ if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+ {
+ getAMQPConnection().notifyWork(this);
+ }
}
void updateAllConsumerNotifyWorkDesired()
@@ -3733,7 +3734,7 @@ public class AMQChannel
_blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
}
- if(!_consumersWithPendingWork.isEmpty())
+ if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
{
if (_processPendingIterator == null || !_processPendingIterator.hasNext())
{
@@ -3751,7 +3752,7 @@ public class AMQChannel
}
}
- return !_consumersWithPendingWork.isEmpty();
+ return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Nov 18 11:10:23 2016
@@ -56,7 +56,7 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager, final boolean multiQueue)
+ FlowCreditManager_0_8 creditManager, final boolean multiQueue)
{
return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(),
multiQueue);
@@ -65,7 +65,7 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel,
final AMQShortString consumerTag,
final FieldTable filters,
- final FlowCreditManager creditManager,
+ final FlowCreditManager_0_8 creditManager,
final ClientDeliveryMethod deliveryMethod)
{
return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod);
@@ -76,7 +76,7 @@ public abstract class ConsumerTarget_0_8
public BrowserConsumer(AMQChannel channel,
AMQShortString consumerTag,
FieldTable filters,
- FlowCreditManager creditManager,
+ FlowCreditManager_0_8 creditManager,
ClientDeliveryMethod deliveryMethod,
boolean multiQueue)
{
@@ -110,7 +110,7 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
+ FlowCreditManager_0_8 creditManager,
boolean multiQueue)
{
return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(),
@@ -124,7 +124,7 @@ public abstract class ConsumerTarget_0_8
public NoAckConsumer(AMQChannel channel,
AMQShortString consumerTag,
FieldTable filters,
- FlowCreditManager creditManager,
+ FlowCreditManager_0_8 creditManager,
ClientDeliveryMethod deliveryMethod,
boolean multiQueue)
{
@@ -190,7 +190,7 @@ public abstract class ConsumerTarget_0_8
{
public GetNoAckConsumer(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
+ FlowCreditManager_0_8 creditManager,
ClientDeliveryMethod deliveryMethod)
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, false);
@@ -202,7 +202,7 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
AMQShortString consumerTag,
FieldTable filters,
- FlowCreditManager creditManager,
+ FlowCreditManager_0_8 creditManager,
boolean multiQueue)
{
return new AckConsumer(channel,
@@ -215,7 +215,7 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
+ FlowCreditManager_0_8 creditManager,
ClientDeliveryMethod deliveryMethod)
{
return new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, false);
@@ -225,7 +225,7 @@ public abstract class ConsumerTarget_0_8
{
public AckConsumer(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
+ FlowCreditManager_0_8 creditManager,
ClientDeliveryMethod deliveryMethod,
boolean multiQueue)
{
@@ -270,14 +270,14 @@ public abstract class ConsumerTarget_0_8
private final AMQShortString _consumerTag;
- private final FlowCreditManager _creditManager;
+ private final FlowCreditManager_0_8 _creditManager;
private final Boolean _autoClose;
public ConsumerTarget_0_8(AMQChannel channel,
AMQShortString consumerTag,
FieldTable arguments,
- FlowCreditManager creditManager,
+ FlowCreditManager_0_8 creditManager,
ClientDeliveryMethod deliveryMethod,
boolean multiQueue)
{
@@ -392,11 +392,24 @@ public abstract class ConsumerTarget_0_8
_creditManager.restoreCredit(1, message.getSize());
if(_creditManager.hasCredit() != hasCredit)
{
- _channel.updateAllConsumerNotifyWorkDesired();
+ if (hasCredit || !_creditManager.isNotBytesLimitedAndHighPrefetch())
+ {
+ _channel.updateAllConsumerNotifyWorkDesired();
+ }
}
else if (hasCredit)
{
- notifyWork();
+ if (_creditManager.isNotBytesLimitedAndHighPrefetch())
+ {
+ if (_creditManager.isCreditOverBatchLimit())
+ {
+ _channel.updateAllConsumerNotifyWorkDesired();
+ }
+ }
+ else
+ {
+ notifyWork();
+ }
}
}
Copied: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java (from r1770362, qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java?p2=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java&p1=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java&r1=1770362&r2=1770363&rev=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java Fri Nov 18 11:10:23 2016
@@ -20,31 +20,11 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.transport.ProtocolEngine;
-public class NoAckCreditManager implements FlowCreditManager
+public interface FlowCreditManager_0_8 extends FlowCreditManager
{
- private final ProtocolEngine _protocolEngine;
- public NoAckCreditManager(ProtocolEngine protocolEngine)
- {
- _protocolEngine = protocolEngine;
- }
+ boolean isNotBytesLimitedAndHighPrefetch();
- @Override
- public void restoreCredit(final long messageCredit, final long bytesCredit)
- {
- }
-
- @Override
- public boolean hasCredit()
- {
- return !_protocolEngine.isTransportBlockedForWriting();
- }
-
- @Override
- public boolean useCreditForMessage(final long msgSize)
- {
- return hasCredit();
- }
+ boolean isCreditOverBatchLimit();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java Fri Nov 18 11:10:23 2016
@@ -20,9 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.server.flow.FlowCreditManager;
-public class InfiniteCreditCreditManager implements FlowCreditManager
+public class InfiniteCreditCreditManager implements FlowCreditManager_0_8
{
public InfiniteCreditCreditManager()
@@ -43,4 +42,15 @@ public class InfiniteCreditCreditManager
return true;
}
+ @Override
+ public boolean isNotBytesLimitedAndHighPrefetch()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isCreditOverBatchLimit()
+ {
+ return false;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java Fri Nov 18 11:10:23 2016
@@ -21,13 +21,11 @@
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.transport.ProtocolEngine;
-
-public class Pre0_10CreditManager implements FlowCreditManager
+public class Pre0_10CreditManager implements FlowCreditManager_0_8
{
- private final ProtocolEngine _protocolEngine;
+ private static final long HIGH_PREFETCH_LIMIT = 100L;
+ private static final long BATCH_LIMIT = 10L;
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -35,10 +33,8 @@ public class Pre0_10CreditManager implem
private volatile long _messageCredit;
public Pre0_10CreditManager(long bytesCreditLimit,
- long messageCreditLimit,
- ProtocolEngine protocolEngine)
+ long messageCreditLimit)
{
- _protocolEngine = protocolEngine;
_bytesCreditLimit = bytesCreditLimit;
_messageCreditLimit = messageCreditLimit;
_bytesCredit = bytesCreditLimit;
@@ -89,17 +85,11 @@ public class Pre0_10CreditManager implem
public synchronized boolean hasCredit()
{
return (_bytesCreditLimit == 0L || _bytesCredit > 0)
- && (_messageCreditLimit == 0L || _messageCredit > 0)
- && !_protocolEngine.isTransportBlockedForWriting();
+ && (_messageCreditLimit == 0L || _messageCredit > 0);
}
public synchronized boolean useCreditForMessage(final long msgSize)
{
- if (_protocolEngine.isTransportBlockedForWriting())
- {
- return false;
- }
-
if (_messageCreditLimit != 0)
{
if (_messageCredit <= 0)
@@ -119,4 +109,16 @@ public class Pre0_10CreditManager implem
_bytesCredit -= msgSize;
return true;
}
+
+ @Override
+ public boolean isNotBytesLimitedAndHighPrefetch()
+ {
+ return _bytesCreditLimit == 0L && _messageCreditLimit > HIGH_PREFETCH_LIMIT;
+ }
+
+ @Override
+ public boolean isCreditOverBatchLimit()
+ {
+ return _messageCredit > BATCH_LIMIT;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java Fri Nov 18 11:10:23 2016
@@ -19,12 +19,9 @@
package org.apache.qpid.server.protocol.v0_8;
-import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -42,7 +39,7 @@ public class Pre0_10CreditManagerTest ex
public void testBasicMessageCredit() throws Exception
{
- _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+ _creditManager = new Pre0_10CreditManager(0, 0);
_creditManager.setCreditLimits(0, 2);
assertTrue("Creditmanager should have credit", _creditManager.hasCredit());
assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
@@ -57,7 +54,7 @@ public class Pre0_10CreditManagerTest ex
public void testBytesLimitDoesNotPreventLargeMessage() throws Exception
{
- _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+ _creditManager = new Pre0_10CreditManager(0, 0);
_creditManager.setCreditLimits(10, 0);
assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(3));
assertFalse("Creditmanager should not be able to useCredit", _creditManager.useCreditForMessage(30));
@@ -67,7 +64,7 @@ public class Pre0_10CreditManagerTest ex
public void testUseCreditWithNegativeMessageCredit() throws Exception
{
- _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+ _creditManager = new Pre0_10CreditManager(0, 0);
_creditManager.setCreditLimits(0, 3);
assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
@@ -85,7 +82,7 @@ public class Pre0_10CreditManagerTest ex
public void testUseCreditWithNegativeBytesCredit() throws Exception
{
- _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+ _creditManager = new Pre0_10CreditManager(0, 0);
_creditManager.setCreditLimits(3, 0);
assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(1));
assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(1));
@@ -103,7 +100,7 @@ public class Pre0_10CreditManagerTest ex
public void testCreditAccountingWhileMessageLimitNotSet() throws Exception
{
- _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+ _creditManager = new Pre0_10CreditManager(0, 0);
assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Nov 18 11:10:23 2016
@@ -276,8 +276,7 @@ class ConsumerTarget_1_0 extends Abstrac
public boolean allocateCredit(final ServerMessage msg)
{
ProtocolEngine protocolEngine = getSession().getConnection();
- final boolean hasCredit =
- _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
+ final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
updateNotifyWorkDesired();
@@ -310,8 +309,7 @@ class ConsumerTarget_1_0 extends Abstrac
{
updateNotifyWorkDesired();
- ProtocolEngine protocolEngine = getSession().getConnection();
- if (isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
+ if (isSuspended() && getEndpoint() != null)
{
_transactionId = _link.getTransactionId();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1770363&r1=1770362&r2=1770363&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Nov 18 11:10:23 2016
@@ -1257,11 +1257,11 @@ public class Session_1_0 implements AMQS
{
ConsumerTarget_1_0 target = link.getConsumerTarget();
target.flowStateChanged();
-
-
}
-
-
+ if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+ {
+ getAMQPConnection().notifyWork(this);
+ }
}
@Override
@@ -1541,7 +1541,7 @@ public class Session_1_0 implements AMQS
}
- if(!_consumersWithPendingWork.isEmpty())
+ if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
{
if (_processPendingIterator == null || !_processPendingIterator.hasNext())
{
@@ -1559,7 +1559,7 @@ public class Session_1_0 implements AMQS
}
}
- return !_consumersWithPendingWork.isEmpty();
+ return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org