You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/18 14:21:38 UTC
svn commit: r900391 - in
/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases:
ConsumeQueuePrefetchTest.java ConsumeTopicPrefetchTest.java
Author: gtully
Date: Mon Jan 18 13:21:38 2010
New Revision: 900391
URL: http://svn.apache.org/viewvc?rev=900391&view=rev
Log:
merge -c 899145 https://svn.apache.org/repos/asf/activemq/trunk - add a few more test variants for inflight: https://issues.apache.org/activemq/browse/AMQ-2566
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java?rev=900391&r1=900390&r2=900391&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java Mon Jan 18 13:21:38 2010
@@ -17,26 +17,41 @@
package org.apache.activemq.usecases;
import javax.jms.JMSException;
+import javax.jms.Message;
-/**
- * @version $Revision: 1.1.1.1 $
- */
-public class ConsumeQueuePrefetchTest extends ConsumeTopicPrefetchTest {
-
- /**
- * TODO disabled failing test cases until we fix queue dispatching
- */
- public void testSendDoublePrefetchSize() throws JMSException {
- }
-
- /**
- * TODO disabled failing test cases until we fix queue dispatching
- */
- public void testSendPrefetchSizePlusOne() throws JMSException {
- }
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Re-runs the inherited ConsumeTopicPrefetchTest cases with the topic flag
+ * switched off (see setUp) and adds an inflight-count check that recreates
+ * the consumer after every batch of messages.
+ */
+public class ConsumeQueuePrefetchTest extends ConsumeTopicPrefetchTest {
+ private static final Log LOG = LogFactory.getLog(ConsumeQueuePrefetchTest.class);
+
protected void setUp() throws Exception {
topic = false;
super.setUp();
}
+
+ /**
+ * Sends prefetchSize messages, then consumes and acknowledges them in
+ * batches, closing and recreating the consumer before each batch. Before
+ * every batch the inflight count of the destination is validated against
+ * the number of messages that have not been consumed yet.
+ * Despite the method name, a new consumer is created per batch of 20
+ * messages, not per message.
+ *
+ * @throws JMSException if sending, receiving or acknowledging fails
+ */
+ public void testInflightWithConsumerPerMessage() throws JMSException {
+ makeMessages(prefetchSize);
+
+ LOG.info("About to send and receive: " + prefetchSize + " on destination: " + destination
+ + " of type: " + destination.getClass().getName());
+
+ for (int i = 0; i < prefetchSize; i++) {
+ Message message = session.createTextMessage(messageTexts[i]);
+ producer.send(message);
+ }
+
+ validateConsumerPrefetch(this.getSubject(), prefetchSize);
+
+ // new consumer per 20 messages
+ final int batchSize = 20;
+ for (int i = 0; i < prefetchSize; i += batchSize) {
+ consumer.close();
+ consumer = session.createConsumer(destination);
+ validateConsumerPrefetch(this.getSubject(), prefetchSize - i);
+ // Only prefetchSize messages were sent above, so the last batch must stop
+ // there when prefetchSize is not a multiple of batchSize.
+ for (int j = 0; j < batchSize && i + j < prefetchSize; j++) {
+ Message message = consumeMessge(i + j);
+ message.acknowledge();
+ }
+ }
+ }
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java?rev=900391&r1=900390&r2=900391&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java Mon Jan 18 13:21:38 2010
@@ -22,7 +22,12 @@
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.policy.IndividualDeadLetterViaXmlTest;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,6 +64,8 @@
producer.send(message);
}
+ validateConsumerPrefetch(this.getSubject(), prefetchSize);
+
// lets consume them in two fetch batches
for (int i = 0; i < messageCount; i++) {
consumeMessge(i);
@@ -72,12 +79,13 @@
return connection;
}
- protected void consumeMessge(int i) throws JMSException {
+ protected TextMessage consumeMessge(int i) throws JMSException { // receives one message and asserts it is a TextMessage whose text equals messageTexts[i]
Message message = consumer.receive(consumerTimeout);
assertTrue("Should have received a message by now for message: " + i, message != null);
assertTrue("Should be a TextMessage: " + message, message instanceof TextMessage);
TextMessage textMessage = (TextMessage) message;
assertEquals("Message content", messageTexts[i], textMessage.getText());
+ return textMessage; // returned so that callers can use the checked message, e.g. to acknowledge it
}
@@ -88,4 +96,27 @@
}
}
+ /**
+ * Waits for the inflight count of the named destination to reach
+ * expectedCount and then asserts that it does.
+ * The broker is looked up in the BrokerRegistry under the name "localhost"
+ * and only its queue region is searched; when no destination with the
+ * given name is found there, nothing is checked.
+ *
+ * @param destination name of the destination whose statistics are inspected
+ * @param expectedCount inflight count the destination is expected to report
+ * @throws JMSException if waiting for the condition fails; the original
+ * exception is attached as linked exception and cause
+ */
+ protected void validateConsumerPrefetch(String destination, final long expectedCount) throws JMSException {
+ RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().lookup("localhost").getRegionBroker();
+ for (final org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
+ if (dest.getName().equals(destination)) {
+ try {
+ // The result of waitFor is deliberately not checked here: the
+ // assertEquals below reports a mismatch with a descriptive message.
+ Wait.waitFor(new Condition() {
+ public boolean isSatisified() throws Exception {
+ DestinationStatistics stats = dest.getDestinationStatistics();
+ LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount());
+ return stats.getInflight().getCount() == expectedCount;
+ }
+ });
+ } catch (Exception e) {
+ // Keep the original exception reachable instead of flattening it to a string.
+ JMSException jmsException = new JMSException(e.toString());
+ jmsException.setLinkedException(e);
+ jmsException.initCause(e);
+ throw jmsException;
+ }
+ DestinationStatistics stats = dest.getDestinationStatistics();
+ LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount());
+ assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches",
+ expectedCount, stats.getInflight().getCount());
+ }
+ }
+ }
}