You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/10/02 16:55:03 UTC

svn commit: r1392945 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java

Author: gtully
Date: Tue Oct  2 14:55:03 2012
New Revision: 1392945

URL: http://svn.apache.org/viewvc?rev=1392945&view=rev
Log:
Add verification of producer blocking via the full advisory; oddly, the advisory arrives late.

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java?rev=1392945&r1=1392944&r2=1392945&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java Tue Oct  2 14:55:03 2012
@@ -17,6 +17,7 @@
 package org.apache.activemq.usecases;
 
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
@@ -29,6 +30,7 @@ import javax.jms.Session;
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -41,7 +43,7 @@ public class TopicProducerFlowControlTes
     private static final Logger LOG = LoggerFactory.getLogger(TopicProducerFlowControlTest.class);
     private static final String brokerName = "testBroker";
     private static final String brokerUrl = "vm://" + brokerName;
-    private static final int destinationMemLimit = 2097152; // 2MB
+    protected static final int destinationMemLimit = 2097152; // 2MB
     private static final AtomicLong produced = new AtomicLong();
     private static final AtomicLong consumed = new AtomicLong();
     private static final int numMessagesToSend = 50000;
@@ -66,6 +68,7 @@ public class TopicProducerFlowControlTes
         tpe.setTopic(">");
         tpe.setMemoryLimit(destinationMemLimit);
         tpe.setProducerFlowControl(true);
+        tpe.setAdvisoryWhenFull(true);
 
         // Setup the topic destination policy
         PolicyEntry qpe = new PolicyEntry();
@@ -73,23 +76,27 @@ public class TopicProducerFlowControlTes
         qpe.setMemoryLimit(destinationMemLimit);
         qpe.setProducerFlowControl(true);
         qpe.setQueuePrefetch(1);
+        qpe.setAdvisoryWhenFull(true);
 
         pm.setPolicyEntries(Arrays.asList(new PolicyEntry[]{tpe, qpe}));
 
-        broker.setDestinationPolicy(pm);
+        setDestinationPolicy(broker, pm);
 
         // Start the broker
         broker.start();
         broker.waitUntilStarted();
     }
 
+    protected void setDestinationPolicy(BrokerService broker, PolicyMap pm) {
+        broker.setDestinationPolicy(pm);
+    }
+
     protected void tearDown() throws Exception {
         broker.stop();
         broker.waitUntilStopped();
     }
 
     public void testTopicProducerFlowControl() throws Exception {
-        Destination destination = new ActiveMQTopic("test");
 
         // Create the connection factory
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
@@ -99,7 +106,18 @@ public class TopicProducerFlowControlTes
         // Start the test destination listener
         Connection c = connectionFactory.createConnection();
         c.start();
-        c.createSession(false, 1).createConsumer(destination).setMessageListener(new TopicProducerFlowControlTest());
+        Session listenerSession = c.createSession(false, 1);
+        Destination destination = createDestination(listenerSession);
+
+        listenerSession.createConsumer(destination).setMessageListener(new TopicProducerFlowControlTest());
+        final AtomicInteger blockedCounter = new AtomicInteger(0);
+        listenerSession.createConsumer(new ActiveMQTopic(AdvisorySupport.FULL_TOPIC_PREFIX + ">")).setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                LOG.info("Got full advisory, blockedCounter: " + blockedCounter.get());
+                blockedCounter.incrementAndGet();
+            }
+        });
 
         // Start producing the test messages
         final Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -112,7 +130,7 @@ public class TopicProducerFlowControlTes
                         producer.send(session.createTextMessage("test"));
 
                         long count = produced.incrementAndGet();
-                        if (count % 100 == 0) {
+                        if (count % 10000 == 0) {
                             LOG.info("Produced " + count + " messages");
                         }
                     }
@@ -138,17 +156,30 @@ public class TopicProducerFlowControlTes
 
         assertEquals("Didn't produce all messages", numMessagesToSend, produced.get());
         assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get());
+
+         assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return blockedCounter.get() > 0;
+            }
+        }, 5 * 1000));
+    }
+
+    protected Destination createDestination(Session listenerSession) throws Exception {
+        return new ActiveMQTopic("test");
     }
 
     @Override
     public void onMessage(Message message) {
         long count = consumed.incrementAndGet();
         if (count % 100 == 0) {
-            LOG.info("\tConsumed " + count + " messages");
             try {
                 Thread.sleep(100);
             } catch (InterruptedException e) {
             }
         }
+        if (count % 10000 == 0) {
+            LOG.info("\tConsumed " + count + " messages");
+        }
+
     }
 }