You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/08/10 21:01:24 UTC
svn commit: r1371809 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
Author: tabish
Date: Fri Aug 10 19:01:24 2012
New Revision: 1371809
URL: http://svn.apache.org/viewvc?rev=1371809&view=rev
Log:
Test didn't need to depend on hard coded port so use :0 instead. Add some additional test cases.
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java?rev=1371809&r1=1371808&r2=1371809&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java Fri Aug 10 19:01:24 2012
@@ -1,10 +1,8 @@
package org.apache.activemq.bugs;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
@@ -18,77 +16,63 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.management.ObjectName;
-import junit.framework.Assert;
-
-import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageConsumer;
-import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.Wait;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test for AMQ-3965.
- * A consumer may be stalled in case it uses optimizeAcknowledge and receives
- * a number of messages that expire before being dispatched to application code.
- * See AMQ-3965 for more details.
+ * A consumer may be stalled in case it uses optimizeAcknowledge and receives
+ * a number of messages that expire before being dispatched to application code.
+ * See AMQ-3965 for more details.
+ *
*/
public class OptimizeAcknowledgeWithExpiredMsgsTest {
-
+
private final static Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class);
- private static BrokerService broker = null;
- protected static final String DATA_DIR = "target/activemq-data/";
- public final String brokerUrl = "tcp://localhost:61614";
-
-
- /**
- * Creates a broker instance and starts it.
- *
+ private BrokerService broker = null;
+
+ private String connectionUri;
+
+ /**
+ * Creates a broker instance but does not start it.
+ *
* @param brokerUri - transport uri of broker
* @param brokerName - name for the broker
* @return a BrokerService instance with transport uri and broker name set
* @throws Exception
*/
- protected BrokerService createBroker(URI brokerUri, String brokerName) throws Exception {
- BrokerService broker = BrokerFactory.createBroker(brokerUri);
- broker.setBrokerName(brokerName);
- broker.setBrokerId(brokerName);
- broker.setDataDirectory(DATA_DIR);
- broker.setEnableStatistics(true);
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setDeleteAllMessagesOnStartup(true);
broker.setUseJmx(false);
+ connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
return broker;
}
-
-
+
@Before
public void setUp() throws Exception {
- final String options = "?persistent=false&useJmx=false&deleteAllMessagesOnStartup=true";
-
- broker = createBroker(new URI("broker:(" + brokerUrl + ")" + options), "localhost");
+ broker = createBroker();
broker.start();
- broker.waitUntilStarted();
-
+ broker.waitUntilStarted();
}
-
-
+
@After
- public void tearDown() throws Exception {
- if (broker != null)
- broker.stop();
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ broker = null;
+ }
}
-
/**
* Tests for AMQ-3965
@@ -96,136 +80,217 @@ public class OptimizeAcknowledgeWithExpi
* Creates producer and consumer. Producer sends 45 msgs that will expire
* at consumer (but before being dispatched to app code).
* Producer then sends 60 msgs without expiry.
- *
+ *
* Consumer receives msgs using a MessageListener and increments a counter.
- * Main thread sleeps for 5 seconds and checks the counter value.
+ * Main thread sleeps for 5 seconds and checks the counter value.
* If counter != 60 msgs (the number of msgs that should get dispatched
- * to consumer) the test fails.
+ * to consumer) the test fails.
*/
@Test
public void testOptimizedAckWithExpiredMsgs() throws Exception
{
-
- ActiveMQConnectionFactory connectionFactory =
- new ActiveMQConnectionFactory(brokerUrl + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
+ ActiveMQConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
// Create JMS resources
Connection connection = connectionFactory.createConnection();
- connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
- // ***** Consumer code *****
- MessageConsumer consumer = session.createConsumer(destination);
-
- MyMessageListener listener = new MyMessageListener();
+ // ***** Consumer code *****
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ final MyMessageListener listener = new MyMessageListener();
connection.setExceptionListener((ExceptionListener) listener);
-
+
// ***** Producer Code *****
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
- TextMessage message;
-
+ TextMessage message;
+
+ // Produce msgs that will expire quickly
+ for (int i=0; i<45; i++) {
+ message = session.createTextMessage(text);
+ producer.send(message,1,1,100);
+ LOG.trace("Sent message: "+ message.getJMSMessageID() +
+ " with expiry 10 msec");
+ }
+ // Produce msgs that don't expire
+ for (int i=0; i<60; i++) {
+ message = session.createTextMessage(text);
+ producer.send(message,1,1,60000);
+ // producer.send(message);
+ LOG.trace("Sent message: "+ message.getJMSMessageID() +
+ " with expiry 30 sec");
+ }
consumer.setMessageListener(listener);
- listener.setDelay(100);
-
+
+ sleep(1000); // let the batch of 45 expire.
+
+ connection.start();
+
+ assertTrue("Should receive all expected messages, counter at " + listener.getCounter(), Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return listener.getCounter() == 60;
+ }
+ }));
+
+ LOG.info("Received all expected messages with counter at: " + listener.getCounter());
+
+ // Cleanup
+ producer.close();
+ consumer.close();
+ session.close();
+ connection.close();
+ }
+
+ @Test
+ public void testOptimizedAckWithExpiredMsgsSync() throws Exception
+ {
+ ActiveMQConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
+
+ // Create JMS resources
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue("TEST.FOO");
+
+ // ***** Consumer code *****
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // ***** Producer Code *****
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
+ TextMessage message;
+
// Produce msgs that will expire quickly
for (int i=0; i<45; i++) {
message = session.createTextMessage(text);
- producer.send(message,1,1,30);
- LOG.trace("Sent message: "+ message.getJMSMessageID() +
- " with expiry 30 msec");
+ producer.send(message,1,1,10);
+ LOG.trace("Sent message: "+ message.getJMSMessageID() +
+ " with expiry 10 msec");
}
// Produce msgs that don't expire
for (int i=0; i<60; i++) {
message = session.createTextMessage(text);
- producer.send(message);
- LOG.trace("Sent message: "+ message.getJMSMessageID() +
- " with no expiry.");
- }
- listener.setDelay(0);
-
- // set exit condition
- TestExitCondition cond = new TestExitCondition(listener);
- Wait.waitFor(cond, 5000);
-
- Assert.assertTrue("Error: Some non-expired messages were not received.", listener.getCounter() >= 60);
-
- LOG.info("Received all expected messages with counter at " + listener.getCounter());
-
+ producer.send(message,1,1,30000);
+ // producer.send(message);
+ LOG.trace("Sent message: "+ message.getJMSMessageID() +
+ " with expiry 30 sec");
+ }
+ sleep(200);
+
+ int counter = 1;
+ for (; counter <= 60; ++counter) {
+ assertNotNull(consumer.receive(2000));
+ LOG.info("counter at " + counter);
+ }
+ LOG.info("Received all expected messages with counter at: " + counter);
+
+ // Cleanup
+ producer.close();
+ consumer.close();
+ session.close();
+ connection.close();
+ }
+
+ @Test
+ public void testOptimizedAckWithExpiredMsgsSync2() throws Exception
+ {
+ ActiveMQConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
+
+ // Create JMS resources
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue("TEST.FOO");
+
+ // ***** Consumer code *****
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // ***** Producer Code *****
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
+ TextMessage message;
+
+ // Produce msgs that don't expire
+ for (int i=0; i<56; i++) {
+ message = session.createTextMessage(text);
+ producer.send(message,1,1,30000);
+ // producer.send(message);
+ LOG.trace("Sent message: "+ message.getJMSMessageID() +
+ " with expiry 30 sec");
+ }
+ // Produce msgs that will expire quickly
+ for (int i=0; i<44; i++) {
+ message = session.createTextMessage(text);
+ producer.send(message,1,1,10);
+ LOG.trace("Sent message: "+ message.getJMSMessageID() +
+ " with expiry 10 msec");
+ }
+ // Produce some more msgs that don't expire
+ for (int i=0; i<4; i++) {
+ message = session.createTextMessage(text);
+ producer.send(message,1,1,30000);
+ // producer.send(message);
+ LOG.trace("Sent message: "+ message.getJMSMessageID() +
+ " with expiry 30 sec");
+ }
+
+ sleep(200);
+
+ int counter = 1;
+ for (; counter <= 60; ++counter) {
+ assertNotNull(consumer.receive(2000));
+ LOG.info("counter at " + counter);
+ }
+ LOG.info("Received all expected messages with counter at: " + counter);
+
// Cleanup
- LOG.info("Cleaning up.");
producer.close();
consumer.close();
session.close();
connection.close();
- listener = null;
}
-
private void sleep(int milliSecondTime) {
try {
Thread.sleep(milliSecondTime);
} catch (InterruptedException igonred) {
- }
+ }
}
-
-
- /**
- * Defines the exit condition for the test.
- */
- private class TestExitCondition implements Wait.Condition {
- private MyMessageListener listener;
-
- public TestExitCondition(MyMessageListener l) {
- this.listener = l;
- }
-
- public boolean isSatisified() throws Exception {
- return listener.getCounter() == 36;
- }
-
- }
-
-
- /**
+ /**
* Standard JMS MessageListener
*/
private class MyMessageListener implements MessageListener, ExceptionListener {
-
- private AtomicInteger counter = new AtomicInteger(0);
- private int delay = 0;
-
- public void onMessage(final Message message) {
- try {
- LOG.trace("Got Message " + message.getJMSMessageID());
- LOG.debug("counter at " + counter.incrementAndGet());
- if (delay>0) {
- sleep(delay);
- }
- } catch (final Exception e) {
- e.printStackTrace();
+
+ private AtomicInteger counter = new AtomicInteger(0);
+
+ public void onMessage(final Message message) {
+ try {
+ LOG.trace("Got Message " + message.getJMSMessageID());
+ LOG.info("counter at " + counter.incrementAndGet());
+ } catch (final Exception e) {
}
- }
-
- public int getCounter() {
- return counter.get();
- }
-
- public int getDelay() {
- return delay;
- }
-
- public void setDelay(int newDelay) {
- this.delay = newDelay;
- }
-
- public synchronized void onException(JMSException ex) {
+ }
+
+ public int getCounter() {
+ return counter.get();
+ }
+
+ public synchronized void onException(JMSException ex) {
LOG.error("JMS Exception occured. Shutting down client.");
}
}
}
-
\ No newline at end of file