You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/08/14 10:24:12 UTC

svn commit: r685806 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: BaseDestination.java PrefetchSubscription.java Queue.java

Author: rajdavies
Date: Thu Aug 14 01:24:11 2008
New Revision: 685806

URL: http://svn.apache.org/viewvc?rev=685806&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1866

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Aug 14 01:24:11 2008
@@ -37,7 +37,7 @@
      * The default number of messages to page in to the destination
      * from persistent storage
      */
-    public static final int DEFAULT_PAGE_SIZE=100;
+    public static final int DEFAULT_PAGE_SIZE=200;
    
     protected final ActiveMQDestination destination;
     protected final Broker broker;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Aug 14 01:24:11 2008
@@ -378,9 +378,9 @@
             }
         }
         if (callDispatchMatched && destination != null) {
-            if (destination.isLazyDispatch()) {
+//            if (destination.isLazyDispatch()) {
                 destination.wakeup();
-            }
+//            }
             dispatchPending();
         } else {
             if (isSlave()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Aug 14 01:24:11 2008
@@ -81,6 +81,8 @@
     protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
     protected PendingMessageCursor messages;
     private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
+    // Messages that are paged in but have not yet been targeted at a subscription
+    private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
@@ -317,6 +319,7 @@
     }
 
     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
+//        System.out.println(getName()+" send "+message.getMessageId());
         final ConnectionContext context = producerExchange.getConnectionContext();
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
@@ -946,6 +949,18 @@
 	            result = !messages.isEmpty();
 	        }               
 	        
+	        // Kinda ugly.. but I think dispatchLock is the only mutex protecting the 
+	        // pagedInPendingDispatch variable. 	        
+	        dispatchLock.lock();
+	        try {
+	            result |= !pagedInPendingDispatch.isEmpty();
+	        } finally {
+	            dispatchLock.unlock();
+	        }
+	        
+	        // Perhaps we should always page into the pagedInPendingDispatch list if 
+                // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
+                // then we do a dispatch.
 	        if (result) {
 	            try {
 	               pageInMessages(false);
@@ -1134,58 +1149,76 @@
     }
     
     private void doDispatch(List<QueueMessageReference> list) throws Exception {
-        if (list != null) {
-            List<Subscription> consumers;
-            dispatchLock.lock();
-            try {
-                synchronized (this.consumers) {
-                    consumers = new ArrayList<Subscription>(this.consumers);
+        dispatchLock.lock();
+        try {
+            if(!pagedInPendingDispatch.isEmpty()) {
+//                System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size());
+                // Try to first dispatch anything that had not been dispatched before.
+                pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
+//                System.out.println(getName()+": new pending list1: "+pagedInPendingDispatch.size());
+            }
+            // and now see if we can dispatch the new stuff.. and append to the pending 
+            // list anything that does not actually get dispatched.
+            if (list != null && !list.isEmpty()) {
+//                System.out.println(getName()+": dispatching from paged in: "+list.size());
+                pagedInPendingDispatch.addAll(doActualDispatch(list));
+//                System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size());
+            }
+        } finally {
+            dispatchLock.unlock();
+        }
+    }
+
+    /**
+     * @return list of messages that were not dispatched because every consumer that could select them was full.
+     */
+    private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
+        List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
+        List<Subscription> consumers;
+        
+        synchronized (this.consumers) {
+            consumers = new ArrayList<Subscription>(this.consumers);
+        }
+
+        for (MessageReference node : list) {
+            Subscription target = null;
+            int interestCount=0;
+            for (Subscription s : consumers) {
+                if (dispatchSelector.canSelect(s, node)) {
+                    if (!s.isFull()) {
+                        // Dispatch it.
+                        s.add(node);
+//                        System.out.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
+                        target = s;
+                        break;
+                    } 
+                    interestCount++;
                 }
+            }
             
-            
-                for (MessageReference node : list) {
-                    Subscription target = null;
-                    List<Subscription> targets = null;
-                    for (Subscription s : consumers) {
-                        if (dispatchSelector.canSelect(s, node)) {
-                            if (!s.isFull()) {
-                                s.add(node);
-                                target = s;
-                                break;
-                            } else {
-                                if (targets == null) {
-                                    targets = new ArrayList<Subscription>();
-                                }
-                                targets.add(s);
-                            }
-                        }
-                    }
-                    if (target == null && targets != null) {
-                        // pick the least loaded to add the message too
-                        for (Subscription s : targets) {
-                            if (target == null
-                                    || target.getPendingQueueSize() > s.getPendingQueueSize()) {
-                                target = s;
-                            }
-                        }
-                        if (target != null) {
-                            target.add(node);
-                        }
-                    }
-                    if (target != null && !strictOrderDispatch && consumers.size() > 1 &&
-                            !dispatchSelector.isExclusiveConsumer(target)) {
-                        synchronized (this.consumers) {
-                            if( removeFromConsumerList(target) ) {
-                                addToConsumerList(target);
-                                consumers = new ArrayList<Subscription>(this.consumers);
-                            }
-                        }
+            if (target == null && interestCount>0) {
+                // This means all subs were full...
+                rc.add((QueueMessageReference)node);
+            }
+
+            // If it got dispatched, rotate the consumer list to get round robin distribution. 
+            if (target != null && !strictOrderDispatch && consumers.size() > 1 &&
+                    !dispatchSelector.isExclusiveConsumer(target)) {
+                synchronized (this.consumers) {
+                    if( removeFromConsumerList(target) ) {
+                        addToConsumerList(target);
+                        consumers = new ArrayList<Subscription>(this.consumers);
                     }
                 }
-            } finally {
-                dispatchLock.unlock();
             }
         }
+
+        //LOG.info(getName()+" Pending messages:");
+        //for (MessageReference n : rc) {
+       //     LOG.info(getName()+"  - " + n.getMessageId());
+       // }
+        
+        return rc;
     }
 
     private void pageInMessages() throws Exception {