You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by fo...@apache.org on 2006/09/30 19:00:57 UTC
svn commit: r451612 -
/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpiration.java
Author: foconer
Date: Sat Sep 30 10:00:57 2006
New Revision: 451612
URL: http://svn.apache.org/viewvc?view=rev&rev=451612
Log:
Added unit test for consuming expired topic and queue. - AMQ-936
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpiration.java
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpiration.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpiration.java?view=auto&rev=451612
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpiration.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpiration.java Sat Sep 30 10:00:57 2006
@@ -0,0 +1,216 @@
+package org.apache.activemq;
+
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+public class JmsSendReceiveWithMessageExpiration extends TestSupport {
+
+ private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+ .getLog(JmsQueueSendReceiveWithMessageExpiration.class);
+
+ protected int messageCount = 100;
+ protected String[] data;
+ protected Session session;
+ protected Destination consumerDestination;
+ protected Destination producerDestination;
+ protected boolean durable = false;
+ protected int deliveryMode = DeliveryMode.PERSISTENT;
+ protected long timeToLive = 5000;
+ protected boolean verbose = false;
+
+ protected Connection connection;
+
+ protected void setUp() throws Exception {
+
+ super.setUp();
+
+ data = new String[messageCount];
+
+ for (int i = 0; i < messageCount; i++) {
+ data[i] = "Text for message: " + i + " at " + new Date();
+ }
+
+ connectionFactory = createConnectionFactory();
+ connection = createConnection();
+
+ if (durable) {
+ connection.setClientID(getClass().getName());
+ }
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ /**
+ * Sends and consumes the messages to a queue destination.
+ *
+ * @throws Exception
+ */
+ public void testConsumeExpiredQueue() throws Exception {
+
+ MessageProducer producer = createProducer(timeToLive);
+
+ consumerDestination = session.createQueue(getConsumerSubject());
+ producerDestination = session.createQueue(getProducerSubject());
+
+ MessageConsumer consumer = createConsumer();
+ connection.start();
+
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(data[i]);
+ message.setStringProperty("stringProperty",data[i]);
+ message.setIntProperty("intProperty",i);
+
+ if (verbose) {
+ if (log.isDebugEnabled()) {
+ log.debug("About to send a queue message: " + message + " with text: " + data[i]);
+ }
+ }
+
+ producer.send(producerDestination, message);
+ }
+
+ Thread.sleep(timeToLive + 1000);
+
+ // message should have expired.
+ assertNull(consumer.receive(1000));
+ }
+
+ /**
+ * Sends and consumes the messages to a queue destination.
+ *
+ * @throws Exception
+ */
+ public void testConsumeQueue() throws Exception {
+
+ MessageProducer producer = createProducer(0);
+
+ consumerDestination = session.createQueue(getConsumerSubject());
+ producerDestination = session.createQueue(getProducerSubject());
+
+ MessageConsumer consumer = createConsumer();
+ connection.start();
+
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(data[i]);
+ message.setStringProperty("stringProperty",data[i]);
+ message.setIntProperty("intProperty",i);
+
+ if (verbose) {
+ if (log.isDebugEnabled()) {
+ log.debug("About to send a queue message: " + message + " with text: " + data[i]);
+ }
+ }
+
+ producer.send(producerDestination, message);
+ }
+
+ // message should have expired.
+ assertNotNull(consumer.receive(1000));
+ }
+
+ /**
+ * Sends and consumes the messages to a topic destination.
+ *
+ * @throws Exception
+ */
+ public void testConsumeExpiredTopic() throws Exception {
+
+ MessageProducer producer = createProducer(timeToLive);
+
+ consumerDestination = session.createTopic(getConsumerSubject());
+ producerDestination = session.createTopic(getProducerSubject());
+
+ MessageConsumer consumer = createConsumer();
+ connection.start();
+
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(data[i]);
+ message.setStringProperty("stringProperty",data[i]);
+ message.setIntProperty("intProperty",i);
+
+ if (verbose) {
+ if (log.isDebugEnabled()) {
+ log.debug("About to send a topic message: " + message + " with text: " + data[i]);
+ }
+ }
+
+ producer.send(producerDestination, message);
+ }
+
+ Thread.sleep(timeToLive + 1000);
+
+ // message should have expired.
+ assertNull(consumer.receive(1000));
+ }
+
+ /**
+ * Sends and consumes the messages to a topic destination.
+ *
+ * @throws Exception
+ */
+ public void testConsumeTopic() throws Exception {
+
+ MessageProducer producer = createProducer(0);
+
+ consumerDestination = session.createTopic(getConsumerSubject());
+ producerDestination = session.createTopic(getProducerSubject());
+
+ MessageConsumer consumer = createConsumer();
+ connection.start();
+
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(data[i]);
+ message.setStringProperty("stringProperty",data[i]);
+ message.setIntProperty("intProperty",i);
+
+ if (verbose) {
+ if (log.isDebugEnabled()) {
+ log.debug("About to send a topic message: " + message + " with text: " + data[i]);
+ }
+ }
+
+ producer.send(producerDestination, message);
+ }
+
+ // message should have expired.
+ assertNotNull(consumer.receive(1000));
+ }
+
+
+
+ protected MessageProducer createProducer(long timeToLive) throws JMSException {
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(deliveryMode);
+ producer.setTimeToLive(timeToLive);
+
+ return producer;
+ }
+
+ protected MessageConsumer createConsumer() throws JMSException {
+ if (durable) {
+ log.info("Creating durable consumer");
+ return session.createDurableSubscriber((Topic) consumerDestination, getName());
+ }
+ return session.createConsumer(consumerDestination);
+ }
+
+ protected void tearDown() throws Exception {
+ log.info("Dumping stats...");
+
+ log.info("Closing down connection");
+
+ /** TODO we should be able to shut down properly */
+ session.close();
+ connection.close();
+ }
+
+}