You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/16 23:03:45 UTC

svn commit: r1667144 - in /qpid/trunk/qpid/java/broker-plugins: amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/

Author: rgodfrey
Date: Mon Mar 16 22:03:45 2015
New Revision: 1667144

URL: http://svn.apache.org/r1667144
Log:
QPID-6429 : ensure that when message suspension is set, all targets have finished any in flight deliveries

Modified:
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1667144&r1=1667143&r2=1667144&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Mon Mar 16 22:03:45 2015
@@ -92,15 +92,23 @@ public class ProtocolEngine_0_10  extend
     {
         _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
 
-        if(!messageAssignmentSuspended)
+        for(AMQSessionModel<?,?> session : _connection.getSessionModels())
         {
-           for(AMQSessionModel<?,?> session : _connection.getSessionModels())
-           {
-               for(Consumer<?> consumer : session.getConsumers())
-               {
-                   ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
-               }
-           }
+            for (Consumer<?> consumer : session.getConsumers())
+            {
+                ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+                if (!messageAssignmentSuspended)
+                {
+                    consumerImpl.getTarget().notifyCurrentState();
+                }
+                else
+                {
+                    // ensure that by the time the method returns, no consumer can be in the process of
+                    // delivering a message.
+                    consumerImpl.getSendLock();
+                    consumerImpl.releaseSendLock();
+                }
+            }
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1667144&r1=1667143&r2=1667144&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Mon Mar 16 22:03:45 2015
@@ -175,13 +175,21 @@ public class ProtocolEngine_1_0_0_SASL i
     {
         _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
 
-        if(!messageAssignmentSuspended)
+        for(AMQSessionModel<?,?> session : _connection.getSessionModels())
         {
-            for(AMQSessionModel<?,?> session : _connection.getSessionModels())
+            for(Consumer<?> consumer : session.getConsumers())
             {
-                for(Consumer<?> consumer : session.getConsumers())
+                ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+                if (!messageAssignmentSuspended)
                 {
-                    ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+                    consumerImpl.getTarget().notifyCurrentState();
+                }
+                else
+                {
+                    // ensure that by the time the method returns, no consumer can be in the process of
+                    // delivering a message.
+                    consumerImpl.getSendLock();
+                    consumerImpl.releaseSendLock();
                 }
             }
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org