You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/08/21 13:54:32 UTC
svn commit: r1375484 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Author: gtully
Date: Tue Aug 21 11:54:31 2012
New Revision: 1375484
URL: http://svn.apache.org/viewvc?rev=1375484&view=rev
Log:
Avoid repeated additions to the fullConsumers list on dispatch. We may need to revisit the pendingWakeups count on iterate: it seems we attempt to dispatch too often, which wastes CPU when there are pending messages or new sends and slow consumers. Relates to: https://issues.apache.org/jira/browse/AMQ-1902
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1375484&r1=1375483&r2=1375484&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Aug 21 11:54:31 2012
@@ -1693,7 +1693,7 @@ public class Queue extends BaseDestinati
pendingWakeups.incrementAndGet();
this.taskRunner.wakeup();
} catch (InterruptedException e) {
- LOG.warn("Async task tunner failed to wakeup ", e);
+ LOG.warn("Async task runner failed to wakeup ", e);
}
}
@@ -1879,19 +1879,19 @@ public class Queue extends BaseDestinati
interestCount++;
continue;
}
- if (!fullConsumers.contains(s) && !s.isFull()) {
- if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
- // Dispatch it.
- s.add(node);
- target = s;
- break;
- }
- } else {
- // no further dispatch of list to a full consumer to
- // avoid out of order message receipt
- fullConsumers.add(s);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Sub full " + s);
+ if (!fullConsumers.contains(s)) {
+ if (!s.isFull()) {
+ if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
+ // Dispatch it.
+ s.add(node);
+ target = s;
+ break;
+ }
+ } else {
+ // no further dispatch of list to a full consumer to
+ // avoid out of order message receipt
+ fullConsumers.add(s);
+ LOG.trace("Subscription full {}", s);
}
}
// make sure it gets dispatched again