You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/16 23:01:30 UTC
svn commit: r1667142 -
/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Author: rgodfrey
Date: Mon Mar 16 22:01:30 2015
New Revision: 1667142
URL: http://svn.apache.org/r1667142
Log:
QPID-6429 : ensure that when message suspension is set, all targets have finished any in flight deliveries
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1667142&r1=1667141&r2=1667142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Mar 16 22:01:30 2015
@@ -219,13 +219,21 @@ public class AMQProtocolEngine implement
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
_messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
- if(!messageAssignmentSuspended)
+ for(AMQSessionModel<?,?> session : getSessionModels())
{
- for(AMQSessionModel<?,?> session : getSessionModels())
+ for (Consumer<?> consumer : session.getConsumers())
{
- for(Consumer<?> consumer : session.getConsumers())
+ ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+ if (!messageAssignmentSuspended)
{
- ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ consumerImpl.getTarget().notifyCurrentState();
+ }
+ else
+ {
+ // ensure that by the time the method returns, no consumer can be in the process of
+ // delivering a message.
+ consumerImpl.getSendLock();
+ consumerImpl.releaseSendLock();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org