You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2006/11/20 07:22:02 UTC
svn commit: r477068 - in /incubator/activemq/branches/activemq-4.1/activemq-core/src: main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
Author: jlim
Date: Sun Nov 19 22:22:01 2006
New Revision: 477068
URL: http://svn.apache.org/viewvc?view=rev&rev=477068
Log:
Applied patch for AMQ-1006: http://issues.apache.org/activemq/browse/AMQ-1006
Modified:
incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?view=diff&rev=477068&r1=477067&r2=477068
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java (original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java Sun Nov 19 22:22:01 2006
@@ -44,6 +44,7 @@
synchronized(consumers) {
int count = 0;
+ Subscription firstMatchingConsumer = null;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
@@ -51,15 +52,21 @@
if (!sub.matches(node, msgContext))
continue;
+ if (firstMatchingConsumer == null) {
+ firstMatchingConsumer = sub;
+ }
sub.add(node);
count++;
}
- // Rotate the consumer list.
- try {
- consumers.add(consumers.remove(0));
- } catch (Throwable bestEffort) {
- }
+
+ if (firstMatchingConsumer != null) {
+ // Rotate the consumer list.
+ try {
+ consumers.remove(firstMatchingConsumer);
+ consumers.add(firstMatchingConsumer);
+ } catch (Throwable bestEffort) { }
+ }
return count > 0;
}
}
Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java?view=diff&rev=477068&r1=477067&r2=477068
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java (original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java Sun Nov 19 22:22:01 2006
@@ -23,6 +23,11 @@
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
protected BrokerService createBroker() throws Exception {
@@ -81,6 +86,24 @@
super.testManyProducersManyConsumers();
assertMessagesDividedAmongConsumers();
}
+
+ public void testOneProducerTwoMatchingConsumersOneNotMatchingConsumer() throws Exception {
+ // Create consumer that won't consume any message
+ createMessageConsumer(createConnectionFactory().createConnection(), createDestination(), "JMSPriority<1");
+ super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+ assertMessagesDividedAmongConsumers();
+ }
+
+ protected MessageConsumer createMessageConsumer(Connection conn, Destination dest, String selector) throws Exception {
+ connections.add(conn);
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = sess.createConsumer(dest, selector);
+ conn.start();
+
+ return consumer;
+ }
+
public void assertMessagesDividedAmongConsumers() {
assertEachConsumerReceivedAtLeastXMessages((messageCount * producerCount) / consumerCount);