You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/03/25 14:01:11 UTC

svn commit: r1085354 - /qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Author: robbie
Date: Fri Mar 25 13:01:10 2011
New Revision: 1085354

URL: http://svn.apache.org/viewvc?rev=1085354&view=rev
Log:
QPID-3165: ensure all subscriptions are checked before making the decision on whether to stop delivering. Use a boolean instead of doing a 0/1 toggle and update variables to generally clarify logic. Use an int instead of a Long for the iteration decrementing

Modified:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1085354&r1=1085353&r2=1085354&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Mar 25 13:01:10 2011
@@ -1503,14 +1503,40 @@ public class SimpleAMQQueue implements A
         }
     }
 
+    /**
+     * Used by queue Runners to asynchronously deliver messages to consumers.
+     *
+     * A queue Runner is started whenever a state change occurs, e.g when a new
+     * message arrives on the queue and cannot be immediately delivered to a
+     * subscription (i.e. asynchronous delivery is required). Unless there are
+     * SubFlushRunners operating (due to subscriptions unsuspending) which are
+     * capable of accepting/delivering all messages then these messages would
+     * otherwise remain on the queue.
+     *
+     * processQueue should be running while there are messages on the queue AND
+     * there are subscriptions that can deliver them. If there are no
+     * subscriptions capable of delivering the remaining messages on the queue
+     * then processQueue should stop to prevent spinning.
+     *
+     * Since processQueue is runs in a fixed size Executor, it should not run
+     * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
+     * incoming messages may not be able to be scheduled in the thread pool
+     * because all threads are working on clearing down large queues). To solve
+     * this problem, after an arbitrary number of message deliveries the
+     * processQueue job stops iterating, resubmits itself to the executor, and
+     * ends the current instance
+     *
+     * @param runner the Runner to schedule
+     * @throws AMQException
+     */
     private void processQueue(Runnable runner) throws AMQException
     {
         long stateChangeCount;
         long previousStateChangeCount = Long.MIN_VALUE;
         boolean deliveryIncomplete = true;
 
-        int extraLoops = 1;
-        Long iterations = new Long(MAX_ASYNC_DELIVERIES);
+        boolean lastLoop = false;
+        int iterations = MAX_ASYNC_DELIVERIES;
 
         _asynchronousRunner.compareAndSet(runner, null);
 
@@ -1527,12 +1553,14 @@ public class SimpleAMQQueue implements A
 
             if (previousStateChangeCount != stateChangeCount)
             {
-                extraLoops = 1;
+                //further asynchronous delivery is required since the
+                //previous loop. keep going if iteration slicing allows.
+                lastLoop = false;
             }
 
             previousStateChangeCount = stateChangeCount;
-            deliveryIncomplete = _subscriptionList.size() != 0;
-            boolean done;
+            boolean allSubscriptionsDone = true;
+            boolean subscriptionDone;
 
             SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
             //iterate over the subscribers and try to advance their pointer
@@ -1542,29 +1570,26 @@ public class SimpleAMQQueue implements A
                 sub.getSendLock();
                 try
                 {
-                    done = attemptDelivery(sub);
-                    if (done)
+                    //attempt delivery. returns true if no further delivery currently possible to this sub
+                    subscriptionDone = attemptDelivery(sub);
+                    if (subscriptionDone)
                     {
-                        if (extraLoops == 0)
+                        //close autoClose subscriptions if we are not currently intent on continuing
+                        if (lastLoop && sub.isAutoClose())
                         {
-                            deliveryIncomplete = false;
-                            if (sub.isAutoClose())
-                            {
-                                unregisterSubscription(sub);
-
-                                ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-                                converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
-                            }
-                        }
-                        else
-                        {
-                            extraLoops--;
+                            unregisterSubscription(sub);
+
+                            ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+                            converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
                         }
                     }
                     else
                     {
+                        //this subscription can accept additional deliveries, so we must 
+                        //keep going after this (if iteration slicing allows it)
+                        allSubscriptionsDone = false;
+                        lastLoop = false;
                         iterations--;
-                        extraLoops = 1;
                     }
                 }
                 finally
@@ -1572,10 +1597,34 @@ public class SimpleAMQQueue implements A
                     sub.releaseSendLock();
                 }
             }
+
+            if(allSubscriptionsDone && lastLoop)
+            {
+                //We have done an extra loop already and there are again
+                //again no further delivery attempts possible, only
+                //keep going if state change demands it.
+                deliveryIncomplete = false;
+            }
+            else if(allSubscriptionsDone)
+            {
+                //All subscriptions reported being done, but we have to do
+                //an extra loop if the iterations are not exhausted and
+                //there is still any work to be done
+                deliveryIncomplete = _subscriptionList.size() != 0;
+                lastLoop = true;
+            }
+            else
+            {
+                //some subscriptions can still accept more messages,
+                //keep going if iteration count allows.
+                lastLoop = false;
+                deliveryIncomplete = true;
+            }
+
             _asynchronousRunner.set(null);
         }
 
-        // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
+        // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
         // therefore we should schedule this runner again (unless someone beats us to it :-) ).
         if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
         {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org