You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/08/21 13:54:32 UTC

svn commit: r1375484 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Author: gtully
Date: Tue Aug 21 11:54:31 2012
New Revision: 1375484

URL: http://svn.apache.org/viewvc?rev=1375484&view=rev
Log:
Avoid repeated additions to the full list on dispatch. May need to revisit the pendingWakeups count on iterate: it seems we attempt to dispatch too often, which is a waste of CPU when there are pending messages or new sends and slow consumers. Relates to: https://issues.apache.org/jira/browse/AMQ-1902

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1375484&r1=1375483&r2=1375484&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Aug 21 11:54:31 2012
@@ -1693,7 +1693,7 @@ public class Queue extends BaseDestinati
             pendingWakeups.incrementAndGet();
             this.taskRunner.wakeup();
         } catch (InterruptedException e) {
-            LOG.warn("Async task tunner failed to wakeup ", e);
+            LOG.warn("Async task runner failed to wakeup ", e);
         }
     }
 
@@ -1879,19 +1879,19 @@ public class Queue extends BaseDestinati
                     interestCount++;
                     continue;
                 }
-                if (!fullConsumers.contains(s) && !s.isFull()) {
-                    if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
-                        // Dispatch it.
-                        s.add(node);
-                        target = s;
-                        break;
-                    }
-                } else {
-                    // no further dispatch of list to a full consumer to
-                    // avoid out of order message receipt
-                    fullConsumers.add(s);
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Sub full " + s);
+                if (!fullConsumers.contains(s)) {
+                    if (!s.isFull()) {
+                        if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
+                            // Dispatch it.
+                            s.add(node);
+                            target = s;
+                            break;
+                        }
+                    } else {
+                        // no further dispatch of list to a full consumer to
+                        // avoid out of order message receipt
+                        fullConsumers.add(s);
+                        LOG.trace("Subscription full {}", s);
                     }
                 }
                 // make sure it gets dispatched again