You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/19 17:07:57 UTC

svn commit: r530441 - /incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Author: ritchiem
Date: Thu Apr 19 08:07:54 2007
New Revision: 530441

URL: http://svn.apache.org/viewvc?view=rev&rev=530441
Log:
QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test.

ConcurrentSelectorDeliveryManager - method call changed from hasFilters() to filtersMessages().

This file was accidentally left out of the original commit.

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=530441&r1=530440&r2=530441
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Apr 19 08:07:54 2007
@@ -28,7 +28,6 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -372,7 +371,7 @@
         {
             for (Subscription sub : _subscriptions.getSubscriptions())
             {
-                if (!sub.isSuspended() && sub.hasFilters())
+                if (!sub.isSuspended() && sub.filtersMessages())
                 {
                     Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
                     for (AMQMessage msg : messageList)
@@ -613,6 +612,11 @@
             _processingThreadName = Thread.currentThread().getName();
         }
 
+        if (_log.isDebugEnabled())
+        {
+            _log.debug(debugIdentity() + "Running process Queue." + currentStatus());
+        }
+
         // Continue to process delivery while we haveSubscribers and messages
         boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
 
@@ -633,11 +637,17 @@
                 }
             }
         }
+
+        if (_log.isDebugEnabled())
+        {
+            _log.debug(debugIdentity() + "Done process Queue." + currentStatus());
+        }
+
     }
 
 //    private void sendNextMessage(Subscription sub)
 //    {
-//        if (sub.hasFilters())
+//        if (sub.filtersMessages())
 //        {
 //            sendNextMessage(sub, sub.getPreDeliveryQueue());
 //            if (sub.isAutoClose())
@@ -817,6 +827,10 @@
             //are we already running? if so, don't re-run
             if (_processing.compareAndSet(false, true))
             {
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug(debugIdentity() + "Executing Async process.");
+                }
                 executor.execute(asyncDelivery);
             }
         }