You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/16 23:01:30 UTC

svn commit: r1667142 - /qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java

Author: rgodfrey
Date: Mon Mar 16 22:01:30 2015
New Revision: 1667142

URL: http://svn.apache.org/r1667142
Log:
QPID-6429: ensure that when message assignment suspension is set, all targets have finished any in-flight deliveries

Modified:
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1667142&r1=1667141&r2=1667142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Mar 16 22:01:30 2015
@@ -219,13 +219,21 @@ public class AMQProtocolEngine implement
     public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
     {
         _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
-        if(!messageAssignmentSuspended)
+        for(AMQSessionModel<?,?> session : getSessionModels())
         {
-            for(AMQSessionModel<?,?> session : getSessionModels())
+            for (Consumer<?> consumer : session.getConsumers())
             {
-                for(Consumer<?> consumer : session.getConsumers())
+                ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+                if (!messageAssignmentSuspended)
                 {
-                    ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+                    consumerImpl.getTarget().notifyCurrentState();
+                }
+                else
+                {
+                    // ensure that by the time the method returns, no consumer can be in the process of
+                    // delivering a message.
+                    consumerImpl.getSendLock();
+                    consumerImpl.releaseSendLock();
                 }
             }
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org