You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/09/30 17:54:47 UTC

svn commit: r820316 - /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java

Author: ritchiem
Date: Wed Sep 30 15:54:47 2009
New Revision: 820316

URL: http://svn.apache.org/viewvc?rev=820316&view=rev
Log:
QPID-2116 : Ensure that AMQChannel correctly notifies all running Subscription deliveries that the Channel has been suspended. This is done by taking out the subscription sendLock on each subscription on this Channel.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=820316&r1=820315&r2=820316&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Sep 30 15:54:47 2009
@@ -783,16 +783,31 @@
         return _unacknowledgedMessageMap;
     }
 
-
+    /**
+     * Called from the ChannelFlowHandler to suspend or resume this Channel
+     * @param suspended true to suspend this Channel, false to resume it
+     */
     public void setSuspended(boolean suspended)
     {
-
-
         boolean wasSuspended = _suspended.getAndSet(suspended);
         if (wasSuspended != suspended)
         {
-            _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started"));
+            // Log Flow Started before we start the subscriptions
+            if (!suspended)
+            {
+                _actor.message(_logSubject, ChannelMessages.CHN_1002("Started"));
+            }
 
+            // This section takes two different approaches to perform the
+            // same function: ensuring that each Subscription has taken note
+            // of the change in Channel state.
+
+            // Here we have become unsuspended, so we ask each queue to
+            // perform an async delivery for each of the subscriptions in this
+            // Channel. The alternative would be to ensure that the subscription
+            // had received the change in suspension state. That way the logic
+            // behind deciding to start an async delivery would be located with
+            // the Subscription.
             if (wasSuspended)
             {
                 // may need to deliver queued messages
@@ -801,6 +816,38 @@
                     s.getQueue().deliverAsync(s);
                 }
             }
+
+
+            // Here we have become suspended so we need to ensure that each of
+            // the Subscriptions has noticed this change so that we can be sure
+            // they are not still sending messages. Again the code here is a
+            // very simplistic approach to ensure that the change of suspension
+            // has been noticed by each of the Subscriptions. Unlike the above
+            // case we don't actually need to do anything else.
+            if (!wasSuspended)
+            {
+                // take and release each subscription's sendLock in turn
+                for (Subscription s : _tag2SubscriptionMap.values())
+                {
+                    try
+                    {
+                        s.getSendLock();
+                    }
+                    finally
+                    {
+                        s.releaseSendLock();
+                    }
+                }
+            }
+
+
+            // Log Suspension only after we have confirmed all subscriptions
+            // are stopped.
+            if (suspended)
+            {
+                _actor.message(_logSubject, ChannelMessages.CHN_1002("Stopped"));
+            }
+
         }
     }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org