You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/08/21 12:01:12 UTC
svn commit: r687677 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/broker/region/cursors/
test/java/org/apache/activemq/bugs/
Author: gtully
Date: Thu Aug 21 03:01:10 2008
New Revision: 687677
URL: http://svn.apache.org/viewvc?rev=687677&view=rev
Log:
fix for AMQ-1902 introduced by fix for AMQ-1866
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=687677&r1=687676&r2=687677&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Aug 21 03:01:10 2008
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -1166,19 +1167,24 @@
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
// System.out.println(getName()+": dispatching from paged in: "+list.size());
- pagedInPendingDispatch.addAll(doActualDispatch(list));
+ if (pagedInPendingDispatch.isEmpty()) {
+ pagedInPendingDispatch.addAll(doActualDispatch(list));
+ } else {
+ pagedInPendingDispatch.addAll(list);
+ }
// System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size());
}
} finally {
dispatchLock.unlock();
}
}
-
+
/**
* @return list of messages that could get dispatched to consumers if they were not full.
*/
private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
+ Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
List<Subscription> consumers;
synchronized (this.consumers) {
@@ -1190,13 +1196,18 @@
int interestCount=0;
for (Subscription s : consumers) {
if (dispatchSelector.canSelect(s, node)) {
- if (!s.isFull()) {
- // Dispatch it.
- s.add(node);
-// System.out.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
- target = s;
- break;
- }
+ if (!fullConsumers.contains(s)) {
+ if (!s.isFull()) {
+ // Dispatch it.
+ s.add(node);
+ //System.err.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
+ target = s;
+ break;
+ } else {
+ // no further dispatch of list to a full consumer to avoid out of order message receipt
+ fullConsumers.add(s);
+ }
+ }
interestCount++;
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java?rev=687677&r1=687676&r2=687677&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java Thu Aug 21 03:01:10 2008
@@ -23,6 +23,8 @@
import javax.jms.MessageConsumer;
import javax.jms.Session;
+import junit.framework.Test;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -62,4 +64,12 @@
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
+
+ public static Test suite() {
+ return suite(CursorQueueStoreTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java?rev=687677&r1=687676&r2=687677&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java Thu Aug 21 03:01:10 2008
@@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -31,19 +32,24 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+
+import junit.framework.Test;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTest;
+import org.apache.activemq.broker.region.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.3 $
*/
-public abstract class CursorSupport extends TestCase {
+public abstract class CursorSupport extends CombinationTestSupport {
- protected static final int MESSAGE_COUNT = 500;
- protected static final int PREFETCH_SIZE = 50;
+ public int MESSAGE_COUNT = 500;
+ public int PREFETCH_SIZE = 50;
private static final Log LOG = LogFactory.getLog(CursorSupport.class);
protected BrokerService broker;
@@ -55,7 +61,7 @@
protected abstract void configureBroker(BrokerService answer) throws Exception;
- public void XtestSendFirstThenConsume() throws Exception {
+ public void testSendFirstThenConsume() throws Exception {
ConnectionFactory factory = createConnectionFactory();
Connection consumerConnection = getConsumerConnection(factory);
MessageConsumer consumer = getConsumer(consumerConnection);
@@ -85,7 +91,15 @@
consumerConnection.close();
}
- public void testSendWhilstConaume() throws Exception {
+
+ public void initCombosForTestSendWhilstConsume() {
+ addCombinationValues("MESSAGE_COUNT", new Object[] {Integer.valueOf(400),
+ Integer.valueOf(500)});
+ addCombinationValues("PREFETCH_SIZE", new Object[] {Integer.valueOf(100),
+ Integer.valueOf(50)});
+ }
+
+ public void testSendWhilstConsume() throws Exception {
ConnectionFactory factory = createConnectionFactory();
Connection consumerConnection = getConsumerConnection(factory);
// create durable subs
@@ -150,7 +164,7 @@
assertEquals("This should be the same at pos " + i + " in the list", sent.getJMSMessageID(), consumed.getJMSMessageID());
}
}
-
+
protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
Connection connection = fac.createConnection();
connection.setClientID("testConsumer");
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java?rev=687677&r1=687676&r2=687677&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java Thu Aug 21 03:01:10 2008
@@ -21,13 +21,13 @@
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
-import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -89,19 +89,16 @@
brokerService.stop();
}
- // Failing
public void testConsumerSlowDownPrefetch0() throws Exception {
ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0";
doTestConsumerSlowDown();
}
- // Failing
public void testConsumerSlowDownPrefetch10() throws Exception {
ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10";
doTestConsumerSlowDown();
}
- // Passing
public void testConsumerSlowDownDefaultPrefetch() throws Exception {
ACTIVEMQ_BROKER_URI = "tcp://localhost:61616";
doTestConsumerSlowDown();
@@ -137,15 +134,18 @@
threads.add(c2);
c2.start();
+ int totalReceived = 0;
for ( int i=0; i < 30; i++) {
Thread.sleep(1000);
long c1Counter = c1.counter.getAndSet(0);
long c2Counter = c2.counter.getAndSet(0);
System.out.println("c1: "+c1Counter+", c2: "+c2Counter);
+ totalReceived += c1Counter;
+ totalReceived += c2Counter;
// Once message have been flowing for a few seconds, start asserting that c2 always gets messages. It should be receiving about 100 / sec
- if( i > 3 ) {
- assertTrue("Consumer 2 should be receiving new messages every second.", c2Counter > 0);
+ if( i > 10 ) {
+ assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0);
}
}
}
@@ -204,8 +204,8 @@
} else {
sleepingTime = 1;
}
- Thread.sleep(sleepingTime);
counter.incrementAndGet();
+ Thread.sleep(sleepingTime);
}
}