You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/08/14 10:24:12 UTC
svn commit: r685806 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
BaseDestination.java PrefetchSubscription.java Queue.java
Author: rajdavies
Date: Thu Aug 14 01:24:11 2008
New Revision: 685806
URL: http://svn.apache.org/viewvc?rev=685806&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1866
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Aug 14 01:24:11 2008
@@ -37,7 +37,7 @@
* The default number of messages to page in to the destination
* from persistent storage
*/
- public static final int DEFAULT_PAGE_SIZE=100;
+ public static final int DEFAULT_PAGE_SIZE=200;
protected final ActiveMQDestination destination;
protected final Broker broker;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Aug 14 01:24:11 2008
@@ -378,9 +378,9 @@
}
}
if (callDispatchMatched && destination != null) {
- if (destination.isLazyDispatch()) {
+// if (destination.isLazyDispatch()) {
destination.wakeup();
- }
+// }
dispatchPending();
} else {
if (isSlave()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Aug 14 01:24:11 2008
@@ -81,6 +81,8 @@
protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
protected PendingMessageCursor messages;
private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
+ // Messages that are paged in but have not yet been targeted at a subscription
+ private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
@@ -317,6 +319,7 @@
}
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
+// System.out.println(getName()+" send "+message.getMessageId());
final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
@@ -946,6 +949,18 @@
result = !messages.isEmpty();
}
+ // Kinda ugly.. but I think dispatchLock is the only mutex protecting the
+ // pagedInPendingDispatch variable.
+ dispatchLock.lock();
+ try {
+ result |= !pagedInPendingDispatch.isEmpty();
+ } finally {
+ dispatchLock.unlock();
+ }
+
+ // Perhaps we should always page into the pagedInPendingDispatch list if
+ // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
+ // then we do a dispatch.
if (result) {
try {
pageInMessages(false);
@@ -1134,58 +1149,76 @@
}
private void doDispatch(List<QueueMessageReference> list) throws Exception {
- if (list != null) {
- List<Subscription> consumers;
- dispatchLock.lock();
- try {
- synchronized (this.consumers) {
- consumers = new ArrayList<Subscription>(this.consumers);
+ dispatchLock.lock();
+ try {
+ if(!pagedInPendingDispatch.isEmpty()) {
+// System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size());
+ // Try to first dispatch anything that had not been dispatched before.
+ pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
+// System.out.println(getName()+": new pending list1: "+pagedInPendingDispatch.size());
+ }
+ // and now see if we can dispatch the new stuff.. and append to the pending
+ // list anything that does not actually get dispatched.
+ if (list != null && !list.isEmpty()) {
+// System.out.println(getName()+": dispatching from paged in: "+list.size());
+ pagedInPendingDispatch.addAll(doActualDispatch(list));
+// System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size());
+ }
+ } finally {
+ dispatchLock.unlock();
+ }
+ }
+
+ /**
+ * @return list of messages that could not be dispatched because every interested consumer was full.
+ */
+ private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
+ List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
+ List<Subscription> consumers;
+
+ synchronized (this.consumers) {
+ consumers = new ArrayList<Subscription>(this.consumers);
+ }
+
+ for (MessageReference node : list) {
+ Subscription target = null;
+ int interestCount=0;
+ for (Subscription s : consumers) {
+ if (dispatchSelector.canSelect(s, node)) {
+ if (!s.isFull()) {
+ // Dispatch it.
+ s.add(node);
+// System.out.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
+ target = s;
+ break;
+ }
+ interestCount++;
}
+ }
-
- for (MessageReference node : list) {
- Subscription target = null;
- List<Subscription> targets = null;
- for (Subscription s : consumers) {
- if (dispatchSelector.canSelect(s, node)) {
- if (!s.isFull()) {
- s.add(node);
- target = s;
- break;
- } else {
- if (targets == null) {
- targets = new ArrayList<Subscription>();
- }
- targets.add(s);
- }
- }
- }
- if (target == null && targets != null) {
- // pick the least loaded to add the message too
- for (Subscription s : targets) {
- if (target == null
- || target.getPendingQueueSize() > s.getPendingQueueSize()) {
- target = s;
- }
- }
- if (target != null) {
- target.add(node);
- }
- }
- if (target != null && !strictOrderDispatch && consumers.size() > 1 &&
- !dispatchSelector.isExclusiveConsumer(target)) {
- synchronized (this.consumers) {
- if( removeFromConsumerList(target) ) {
- addToConsumerList(target);
- consumers = new ArrayList<Subscription>(this.consumers);
- }
- }
+ if (target == null && interestCount>0) {
+ // This means all subs were full...
+ rc.add((QueueMessageReference)node);
+ }
+
+ // If it got dispatched, rotate the consumer list to get round robin distribution.
+ if (target != null && !strictOrderDispatch && consumers.size() > 1 &&
+ !dispatchSelector.isExclusiveConsumer(target)) {
+ synchronized (this.consumers) {
+ if( removeFromConsumerList(target) ) {
+ addToConsumerList(target);
+ consumers = new ArrayList<Subscription>(this.consumers);
}
}
- } finally {
- dispatchLock.unlock();
}
}
+
+ //LOG.info(getName()+" Pending messages:");
+ //for (MessageReference n : rc) {
+ // LOG.info(getName()+" - " + n.getMessageId());
+ // }
+
+ return rc;
}
private void pageInMessages() throws Exception {