You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2006/11/20 07:22:02 UTC

svn commit: r477068 - in /incubator/activemq/branches/activemq-4.1/activemq-core/src: main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java

Author: jlim
Date: Sun Nov 19 22:22:01 2006
New Revision: 477068

URL: http://svn.apache.org/viewvc?view=rev&rev=477068
Log:
applied patch for http://issues.apache.org/activemq/browse/AMQ-1006

Modified:
    incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
    incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java

Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?view=diff&rev=477068&r1=477067&r2=477068
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java (original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java Sun Nov 19 22:22:01 2006
@@ -44,6 +44,7 @@
         synchronized(consumers) {
             int count = 0;
             
+            Subscription firstMatchingConsumer = null;
             for (Iterator iter = consumers.iterator(); iter.hasNext();) {
                 Subscription sub = (Subscription) iter.next();
                 
@@ -51,15 +52,21 @@
                 if (!sub.matches(node, msgContext)) 
                     continue;
                 
+                if (firstMatchingConsumer == null) {
+                    firstMatchingConsumer = sub;
+                }              
                 sub.add(node);
                 count++;
             }
             
-            // Rotate the consumer list.
-            try {
-                consumers.add(consumers.remove(0));
-            } catch (Throwable bestEffort) {
-            }
+
+            if (firstMatchingConsumer != null) {
+                // Rotate the consumer list.
+                try {
+                    consumers.remove(firstMatchingConsumer);
+                    consumers.add(firstMatchingConsumer);
+                } catch (Throwable bestEffort) { }
+             }
             return count > 0;
         }        
     }

Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java?view=diff&rev=477068&r1=477067&r2=477068
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java (original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java Sun Nov 19 22:22:01 2006
@@ -23,6 +23,11 @@
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
 public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
 
     protected BrokerService createBroker() throws Exception {
@@ -81,6 +86,24 @@
         super.testManyProducersManyConsumers();
         assertMessagesDividedAmongConsumers();
     }
+    
+    public void testOneProducerTwoMatchingConsumersOneNotMatchingConsumer() throws Exception {
+    // Create consumer that won't consume any message
+        createMessageConsumer(createConnectionFactory().createConnection(), createDestination(), "JMSPriority<1");
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+        assertMessagesDividedAmongConsumers();
+    }
+    	
+    protected MessageConsumer createMessageConsumer(Connection conn, Destination dest, String selector) throws Exception {
+        connections.add(conn);
+    
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    	final MessageConsumer consumer = sess.createConsumer(dest, selector);
+    	conn.start();
+    	
+    	return consumer;
+    }    
+    
 
     public void assertMessagesDividedAmongConsumers() {
         assertEachConsumerReceivedAtLeastXMessages((messageCount * producerCount) / consumerCount);