You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/10/02 16:55:03 UTC
svn commit: r1392945 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
Author: gtully
Date: Tue Oct 2 14:55:03 2012
New Revision: 1392945
URL: http://svn.apache.org/viewvc?rev=1392945&view=rev
Log:
Add verification that the producer was blocked, using the destination-full advisory. Oddly, the advisory arrives late; this needs further investigation.
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java?rev=1392945&r1=1392944&r2=1392945&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java Tue Oct 2 14:55:03 2012
@@ -17,6 +17,7 @@
package org.apache.activemq.usecases;
import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
@@ -29,6 +30,7 @@ import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -41,7 +43,7 @@ public class TopicProducerFlowControlTes
private static final Logger LOG = LoggerFactory.getLogger(TopicProducerFlowControlTest.class);
private static final String brokerName = "testBroker";
private static final String brokerUrl = "vm://" + brokerName;
- private static final int destinationMemLimit = 2097152; // 2MB
+ protected static final int destinationMemLimit = 2097152; // 2MB
private static final AtomicLong produced = new AtomicLong();
private static final AtomicLong consumed = new AtomicLong();
private static final int numMessagesToSend = 50000;
@@ -66,6 +68,7 @@ public class TopicProducerFlowControlTes
tpe.setTopic(">");
tpe.setMemoryLimit(destinationMemLimit);
tpe.setProducerFlowControl(true);
+ tpe.setAdvisoryWhenFull(true);
// Setup the topic destination policy
PolicyEntry qpe = new PolicyEntry();
@@ -73,23 +76,27 @@ public class TopicProducerFlowControlTes
qpe.setMemoryLimit(destinationMemLimit);
qpe.setProducerFlowControl(true);
qpe.setQueuePrefetch(1);
+ qpe.setAdvisoryWhenFull(true);
pm.setPolicyEntries(Arrays.asList(new PolicyEntry[]{tpe, qpe}));
- broker.setDestinationPolicy(pm);
+ setDestinationPolicy(broker, pm);
// Start the broker
broker.start();
broker.waitUntilStarted();
}
+ protected void setDestinationPolicy(BrokerService broker, PolicyMap pm) {
+ broker.setDestinationPolicy(pm);
+ }
+
protected void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
public void testTopicProducerFlowControl() throws Exception {
- Destination destination = new ActiveMQTopic("test");
// Create the connection factory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
@@ -99,7 +106,18 @@ public class TopicProducerFlowControlTes
// Start the test destination listener
Connection c = connectionFactory.createConnection();
c.start();
- c.createSession(false, 1).createConsumer(destination).setMessageListener(new TopicProducerFlowControlTest());
+ Session listenerSession = c.createSession(false, 1);
+ Destination destination = createDestination(listenerSession);
+
+ listenerSession.createConsumer(destination).setMessageListener(new TopicProducerFlowControlTest());
+ final AtomicInteger blockedCounter = new AtomicInteger(0);
+ listenerSession.createConsumer(new ActiveMQTopic(AdvisorySupport.FULL_TOPIC_PREFIX + ">")).setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ LOG.info("Got full advisory, blockedCounter: " + blockedCounter.get());
+ blockedCounter.incrementAndGet();
+ }
+ });
// Start producing the test messages
final Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -112,7 +130,7 @@ public class TopicProducerFlowControlTes
producer.send(session.createTextMessage("test"));
long count = produced.incrementAndGet();
- if (count % 100 == 0) {
+ if (count % 10000 == 0) {
LOG.info("Produced " + count + " messages");
}
}
@@ -138,17 +156,30 @@ public class TopicProducerFlowControlTes
assertEquals("Didn't produce all messages", numMessagesToSend, produced.get());
assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get());
+
+ assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return blockedCounter.get() > 0;
+ }
+ }, 5 * 1000));
+ }
+
+ protected Destination createDestination(Session listenerSession) throws Exception {
+ return new ActiveMQTopic("test");
}
@Override
public void onMessage(Message message) {
long count = consumed.incrementAndGet();
if (count % 100 == 0) {
- LOG.info("\tConsumed " + count + " messages");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
+ if (count % 10000 == 0) {
+ LOG.info("\tConsumed " + count + " messages");
+ }
+
}
}