You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/05/17 14:12:34 UTC

svn commit: r538882 - /incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Author: ritchiem
Date: Thu May 17 05:12:34 2007
New Revision: 538882

URL: http://svn.apache.org/viewvc?view=rev&rev=538882
Log:
Fix for broken CSDM message purging routine that was causing python test_get to fail.

Replaced long while control with a method call that is easier to understand and has more comments.

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=538882&r1=538881&r2=538882
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu May 17 05:12:34 2007
@@ -451,13 +451,7 @@
         AMQMessage message = messages.peek();
 
         //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
-        while (message != null
-               && (
-                ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
-                || sub == null)
-               && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
-                   || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
-                )
+        while (purgeMessage(message, sub))
         {
             //remove the already taken message or expired
             AMQMessage removed = messages.poll();
@@ -476,6 +470,54 @@
         }
 
         return message;
+    }
+
+    /**
+     * Decides whether the caller's loop should remove the given message from the queue.
+     * @param message the message to examine; may be null, in which case nothing is purged
+     * @param sub the subscription being served; null when there is none (described below as a get or a purge)
+     * @return true if sub is non-null and the message has expired, or if the message qualifies for purging and message.taken(_queue, sub) returns true
+     * @throws AMQException presumably propagated from the message checks called here -- confirm against AMQMessage
+     */
+    private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+    {
+        // Earlier, more complicated while-loop condition, kept for reference only:
+//                (message != null
+//                            && (
+//                ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
+//                || sub == null)
+//                            && message.taken(_queue, sub));
+
+        boolean purge = false;
+
+        // A null message means there are no messages, so there is nothing to purge.
+        if (message != null)
+        {
+            // With a subscriber present, run the per-message checks.
+            if (sub != null)
+            {
+                // An expired message is reported for purging immediately, skipping the taken() call below.
+                if (message.expired(sub.getChannel().getStoreContext(), _queue))
+                {
+                    return true;
+                }
+
+                // A non-browser subscription always goes on to the taken() check; a browser only does so when isTaken() reports true.
+                purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+            }
+            else
+            {
+                // No subscription: we are doing a get or purging.
+                // NOTE(review): the result of isTaken() is discarded; confirm whether this call has any effect.
+                message.isTaken(_queue);
+                // Always continue to the taken() check in this case.
+                purge = true;
+            }
+        }
+
+        // When purge is set, the answer is whatever taken() returns for the current subscriber (presumably this also marks the message taken -- confirm).
+        // sub may be null here (get or purge); the original author notes this is acceptable.
+        return purge && message.taken(_queue, sub);
     }
 
     public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)