You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/05/17 14:12:34 UTC
svn commit: r538882 -
/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Author: ritchiem
Date: Thu May 17 05:12:34 2007
New Revision: 538882
URL: http://svn.apache.org/viewvc?view=rev&rev=538882
Log:
Fix for broken CSDM message purging routine that was causing python test_get to fail.
Replaced long while control with a method call that is easier to understand and has more comments.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=538882&r1=538881&r2=538882
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu May 17 05:12:34 2007
@@ -451,13 +451,7 @@
AMQMessage message = messages.peek();
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
- while (message != null
- && (
- ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
- || sub == null)
- && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
- || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
- )
+ while (purgeMessage(message, sub))
{
//remove the already taken message or expired
AMQMessage removed = messages.poll();
@@ -476,6 +470,54 @@
}
return message;
+ }
+
+ /**
+ * Decides whether the caller should remove (purge) the given message from the queue.
+ * @param message the message to examine; may be null, in which case false is returned
+ * @param sub the subscription to check against, or null (per the comments below: a get or a purge)
+ * @return true if sub is non-null and the message has expired, or if it is a purge candidate and taken(_queue, sub) is true
+ * @throws AMQException declared here; presumably propagated from expired()/taken() - TODO confirm
+ */
+ private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+ {
+ // Original while-loop condition (abridged: the expiry clause is omitted), kept for reference:
+// (message != null
+// && (
+// ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
+// || sub == null)
+// && message.taken(_queue, sub));
+
+ boolean purge = false;
+
+ // if the message is null then don't purge as we have no messages.
+ if (message != null)
+ {
+ // if we have a subscriber perform the expiry and browser checks
+ if (sub != null)
+ {
+ // An expired message is always purged; return before the taken() call at the end is reached.
+ if (message.expired(sub.getChannel().getStoreContext(), _queue))
+ {
+ return true;
+ }
+
+ // A non-browser subscriber is a purge candidate; a browser only when isTaken(_queue) is already true.
+ purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+ }
+ else
+ {
+ // if there is no subscription we are doing a get or purging.
+ // NOTE(review): the isTaken() result is discarded; marking appears to be done by taken() below - confirm.
+ message.isTaken(_queue);
+ // and then ensure that it gets purged
+ purge = true;
+ }
+ }
+
+ // if we are purging then ensure we mark this message taken for the current subscriber
+ // (sub may be null for a get or a purge); && short-circuits, so a null message is never dereferenced.
+ return purge && message.taken(_queue, sub);
+ }
public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)