You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/18 14:21:38 UTC

svn commit: r900391 - in /activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases: ConsumeQueuePrefetchTest.java ConsumeTopicPrefetchTest.java

Author: gtully
Date: Mon Jan 18 13:21:38 2010
New Revision: 900391

URL: http://svn.apache.org/viewvc?rev=900391&view=rev
Log:
merge -c 899145 https://svn.apache.org/repos/asf/activemq/trunk - add a few more test variants for inflight: https://issues.apache.org/activemq/browse/AMQ-2566

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java?rev=900391&r1=900390&r2=900391&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java Mon Jan 18 13:21:38 2010
@@ -17,26 +17,41 @@
 package org.apache.activemq.usecases;
 
 import javax.jms.JMSException;
+import javax.jms.Message;
 
-/**
- * @version $Revision: 1.1.1.1 $
- */
-public class ConsumeQueuePrefetchTest extends ConsumeTopicPrefetchTest {
-
-    /**
-     * TODO disabled failing test cases until we fix queue dispatching
-     */
-    public void testSendDoublePrefetchSize() throws JMSException {
-    }
-
-    /**
-     * TODO disabled failing test cases until we fix queue dispatching
-     */
-    public void testSendPrefetchSizePlusOne() throws JMSException {
-    }
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
+public class ConsumeQueuePrefetchTest extends ConsumeTopicPrefetchTest {
+    private static final Log LOG = LogFactory.getLog(ConsumeQueuePrefetchTest.class);
+    
     protected void setUp() throws Exception {
         topic = false;
         super.setUp();
     }
+    
+    public void testInflightWithConsumerPerMessage() throws JMSException {
+        makeMessages(prefetchSize);
+
+        LOG.info("About to send and receive: " + prefetchSize + " on destination: " + destination
+                + " of type: " + destination.getClass().getName());
+
+        for (int i = 0; i < prefetchSize; i++) {
+            Message message = session.createTextMessage(messageTexts[i]);
+            producer.send(message);
+        }
+
+        validateConsumerPrefetch(this.getSubject(), prefetchSize);
+        
+        // new consumer per 20 messages
+        for (int i = 0; i < prefetchSize; i+=20) {
+            consumer.close();
+            consumer = session.createConsumer(destination);
+            validateConsumerPrefetch(this.getSubject(), prefetchSize - i);
+            for (int j=0; j<20; j++) {
+                Message message = consumeMessge(i+j);
+                message.acknowledge();
+            }
+        }
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java?rev=900391&r1=900390&r2=900391&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java Mon Jan 18 13:21:38 2010
@@ -22,7 +22,12 @@
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.policy.IndividualDeadLetterViaXmlTest;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -59,6 +64,8 @@
             producer.send(message);
         }
 
+        validateConsumerPrefetch(this.getSubject(), prefetchSize);
+        
         // lets consume them in two fetch batches
         for (int i = 0; i < messageCount; i++) {
             consumeMessge(i);
@@ -72,12 +79,13 @@
         return connection;
     }
 
-    protected void consumeMessge(int i) throws JMSException {
+    protected TextMessage consumeMessge(int i) throws JMSException {
         Message message = consumer.receive(consumerTimeout);
         assertTrue("Should have received a message by now for message: " + i, message != null);
         assertTrue("Should be a TextMessage: " + message, message instanceof TextMessage);
         TextMessage textMessage = (TextMessage) message;
         assertEquals("Message content", messageTexts[i], textMessage.getText());
+        return textMessage;
     }
 
 
@@ -88,4 +96,27 @@
         }
     }
 
+    protected void validateConsumerPrefetch(String destination, final long expectedCount) throws JMSException {
+        RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().lookup("localhost").getRegionBroker();
+        for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
+            final org.apache.activemq.broker.region.Destination target = dest;
+            if (dest.getName().equals(destination)) {
+                try {
+                    Wait.waitFor(new Condition() {
+                        public boolean isSatisified() throws Exception {
+                            DestinationStatistics stats = target.getDestinationStatistics();
+                            LOG.info("inflight for : " + target.getName() + ": " +  stats.getInflight().getCount());
+                            return stats.getInflight().getCount() == expectedCount;
+                        }
+                    });
+                } catch (Exception e) {
+                    throw new JMSException(e.toString());
+                }
+                DestinationStatistics stats = dest.getDestinationStatistics();
+                LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount());
+                assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches", 
+                        expectedCount, stats.getInflight().getCount());      
+            }
+        }
+    }
 }