You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/02/24 01:53:58 UTC

activemq git commit: Clean up some tests, reduce resources used and reduce runtime, convert to JUnit 4 when possible and add timeouts.

Repository: activemq
Updated Branches:
  refs/heads/master 05b401993 -> 02dc6ce98


Clean up some tests, reduce resources used and reduce runtime, convert
to JUnit 4 when possible and add timeouts.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/02dc6ce9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/02dc6ce9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/02dc6ce9

Branch: refs/heads/master
Commit: 02dc6ce982211ab56ac8f4c9fae401121302bb14
Parents: 05b4019
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Feb 23 19:53:14 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Feb 23 19:53:14 2015 -0500

----------------------------------------------------------------------
 .../activemq/EmbeddedBrokerTestSupport.java     |  10 +-
 .../apache/activemq/ExclusiveConsumerTest.java  |  91 +++++++--------
 .../JmsConsumerResetActiveListenerTest.java     |  55 +++++----
 .../JmsCreateConsumerInOnMessageTest.java       | 112 +++++++++----------
 4 files changed, 138 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/02dc6ce9/activemq-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
index b049e96..48d56c1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
@@ -16,14 +16,15 @@
  */
 package org.apache.activemq;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.springframework.jms.core.JmsTemplate;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
 
 /**
  * A useful base class which creates and closes an embedded broker
@@ -40,6 +41,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
     protected ActiveMQDestination destination;
     protected JmsTemplate template;
 
+    @Override
     protected void setUp() throws Exception {
         if (broker == null) {
             broker = createBroker();
@@ -56,6 +58,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
         template.afterPropertiesSet();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         if (broker != null) {
             try {
@@ -119,6 +122,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
     protected BrokerService createBroker() throws Exception {
         BrokerService answer = new BrokerService();
         answer.setPersistent(isPersistent());
+        answer.getManagementContext().setCreateConnector(false);
         answer.addConnector(bindAddress);
         return answer;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/02dc6ce9/activemq-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
index 53ef28f..efe4c45 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
@@ -24,26 +27,35 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
-import junit.framework.TestCase;
-
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
-public class ExclusiveConsumerTest extends TestCase {
+public class ExclusiveConsumerTest {
 
-    private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true";
+    private static final String VM_BROKER_URL = "vm://localhost";
 
-    public ExclusiveConsumerTest(String name) {
-        super(name);
-    }
+    private BrokerService brokerService;
 
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.setSchedulerSupport(false);
+        brokerService.setAdvisorySupport(false);
+
+        brokerService.start();
     }
 
-    @Override
-    protected void tearDown() throws Exception {
-        super.tearDown();
+    @After
+    public void tearDown() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService = null;
+        }
     }
 
     private Connection createConnection(final boolean start) throws JMSException {
@@ -55,6 +67,7 @@ public class ExclusiveConsumerTest extends TestCase {
         return conn;
     }
 
+    @Test(timeout = 60000)
     public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException {
         Connection conn = createConnection(true);
 
@@ -63,7 +76,6 @@ public class ExclusiveConsumerTest extends TestCase {
         Session senderSession = null;
 
         try {
-
             exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -87,15 +99,14 @@ public class ExclusiveConsumerTest extends TestCase {
             // Verify exclusive consumer receives the message.
             assertNotNull(exclusiveConsumer.receive(100));
             assertNull(fallbackConsumer.receive(100));
-
         } finally {
             fallbackSession.close();
             senderSession.close();
             conn.close();
         }
-
     }
 
+    @Test(timeout = 60000)
     public void testExclusiveConsumerSelectedCreatedAfter() throws JMSException, InterruptedException {
         Connection conn = createConnection(true);
 
@@ -104,7 +115,6 @@ public class ExclusiveConsumerTest extends TestCase {
         Session senderSession = null;
 
         try {
-
             exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -132,11 +142,10 @@ public class ExclusiveConsumerTest extends TestCase {
             senderSession.close();
             conn.close();
         }
-
     }
 
-    public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException,
-        InterruptedException {
+    @Test(timeout = 60000)
+    public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException, InterruptedException {
         Connection conn = createConnection(true);
 
         Session exclusiveSession1 = null;
@@ -145,14 +154,12 @@ public class ExclusiveConsumerTest extends TestCase {
         Session senderSession = null;
 
         try {
-
             exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-            // This creates the exclusive consumer first which avoids AMQ-1024
-            // bug.
+            // This creates the exclusive consumer first which avoids AMQ-1024 bug.
             ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2?consumer.exclusive=true");
             MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
             MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
@@ -173,8 +180,7 @@ public class ExclusiveConsumerTest extends TestCase {
             assertNull(exclusiveConsumer2.receive(100));
             assertNull(fallbackConsumer.receive(100));
 
-            // Close the exclusive consumer to verify the non-exclusive consumer
-            // takes over
+            // Close the exclusive consumer to verify the non-exclusive consumer takes over
             exclusiveConsumer1.close();
 
             producer.send(msg);
@@ -188,11 +194,10 @@ public class ExclusiveConsumerTest extends TestCase {
             senderSession.close();
             conn.close();
         }
-
     }
 
-    public void testFailoverToAnotherExclusiveConsumerCreatedAfter() throws JMSException,
-        InterruptedException {
+    @Test(timeout = 60000)
+    public void testFailoverToAnotherExclusiveConsumerCreatedAfter() throws JMSException, InterruptedException {
         Connection conn = createConnection(true);
 
         Session exclusiveSession1 = null;
@@ -201,14 +206,12 @@ public class ExclusiveConsumerTest extends TestCase {
         Session senderSession = null;
 
         try {
-
             exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-            // This creates the exclusive consumer first which avoids AMQ-1024
-            // bug.
+            // This creates the exclusive consumer first which avoids AMQ-1024 bug.
             ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE6?consumer.exclusive=true");
             MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
 
@@ -230,8 +233,7 @@ public class ExclusiveConsumerTest extends TestCase {
             assertNull(exclusiveConsumer2.receive(100));
             assertNull(fallbackConsumer.receive(100));
 
-            // Close the exclusive consumer to verify the non-exclusive consumer
-            // takes over
+            // Close the exclusive consumer to verify the non-exclusive consumer takes over
             exclusiveConsumer1.close();
 
             producer.send(msg);
@@ -239,15 +241,14 @@ public class ExclusiveConsumerTest extends TestCase {
 
             assertNotNull(exclusiveConsumer2.receive(1000));
             assertNull(fallbackConsumer.receive(100));
-
         } finally {
             fallbackSession.close();
             senderSession.close();
             conn.close();
         }
-
     }
 
+    @Test(timeout = 60000)
     public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException {
         Connection conn = createConnection(true);
 
@@ -261,8 +262,7 @@ public class ExclusiveConsumerTest extends TestCase {
             fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-            // This creates the exclusive consumer first which avoids AMQ-1024
-            // bug.
+            // This creates the exclusive consumer first which avoids AMQ-1024 bug.
             ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3?consumer.exclusive=true");
             MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
 
@@ -281,22 +281,20 @@ public class ExclusiveConsumerTest extends TestCase {
             assertNotNull(exclusiveConsumer.receive(100));
             assertNull(fallbackConsumer.receive(100));
 
-            // Close the exclusive consumer to verify the non-exclusive consumer
-            // takes over
+            // Close the exclusive consumer to verify the non-exclusive consumer takes over
             exclusiveConsumer.close();
 
             producer.send(msg);
 
             assertNotNull(fallbackConsumer.receive(100));
-
         } finally {
             fallbackSession.close();
             senderSession.close();
             conn.close();
         }
-
     }
 
+    @Test(timeout = 60000)
     public void testFallbackToExclusiveConsumer() throws JMSException, InterruptedException {
         Connection conn = createConnection(true);
 
@@ -310,8 +308,7 @@ public class ExclusiveConsumerTest extends TestCase {
             fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-            // This creates the exclusive consumer first which avoids AMQ-1024
-            // bug.
+            // This creates the exclusive consumer first which avoids AMQ-1024 bug.
             ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE4?consumer.exclusive=true");
             MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
 
@@ -330,8 +327,7 @@ public class ExclusiveConsumerTest extends TestCase {
             assertNotNull(exclusiveConsumer.receive(100));
             assertNull(fallbackConsumer.receive(100));
 
-            // Close the exclusive consumer to verify the non-exclusive consumer
-            // takes over
+            // Close the exclusive consumer to verify the non-exclusive consumer takes over
             exclusiveConsumer.close();
 
             producer.send(msg);
@@ -339,19 +335,16 @@ public class ExclusiveConsumerTest extends TestCase {
             // Verify other non-exclusive consumer receices the message.
             assertNotNull(fallbackConsumer.receive(100));
 
-            // Create exclusive consumer to determine if it will start receiving
-            // the messages.
+            // Create exclusive consumer to determine if it will start receiving the messages.
             exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
 
             producer.send(msg);
             assertNotNull(exclusiveConsumer.receive(100));
             assertNull(fallbackConsumer.receive(100));
-
         } finally {
             fallbackSession.close();
             senderSession.close();
             conn.close();
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/02dc6ce9/activemq-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
index 1a1a958..2114945 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -32,41 +35,51 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import junit.framework.TestCase;
-
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
 
-public class JmsConsumerResetActiveListenerTest extends TestCase {
+public class JmsConsumerResetActiveListenerTest {
 
     private Connection connection;
     private ActiveMQConnectionFactory factory;
-   
-    protected void setUp() throws Exception {
-        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+    @Rule
+    public final TestName name = new TestName();
+
+    @Before
+    public void setUp() throws Exception {
+        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
         connection = factory.createConnection();
     }
 
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         if (connection != null) {
             connection.close();
             connection = null;
         }
     }
-    
+
     /**
      * verify the (undefined by spec) behaviour of setting a listener while receiving a message.
-     * 
+     *
      * @throws Exception
      */
+    @Test(timeout = 60000)
     public void testSetListenerFromListener() throws Exception {
         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Destination dest = session.createQueue("Queue-" + getName());
+        Destination dest = session.createQueue("Queue-" + name.getMethodName());
         final MessageConsumer consumer = session.createConsumer(dest);
-       
+
         final CountDownLatch latch = new CountDownLatch(2);
         final AtomicBoolean first = new AtomicBoolean(true);
         final Vector<Object> results = new Vector<Object>();
         consumer.setMessageListener(new MessageListener() {
 
+            @Override
             public void onMessage(Message message) {
                 if (first.compareAndSet(true, false)) {
                     try {
@@ -83,14 +96,14 @@ public class JmsConsumerResetActiveListenerTest extends TestCase {
         });
 
         connection.start();
-        
+
         MessageProducer producer = session.createProducer(dest);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         producer.send(session.createTextMessage("First"));
         producer.send(session.createTextMessage("Second"));
-        
+
         assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
-        
+
         assertEquals("we have a result", 2, results.size());
         Object result = results.get(0);
         assertTrue(result instanceof TextMessage);
@@ -99,22 +112,24 @@ public class JmsConsumerResetActiveListenerTest extends TestCase {
         assertTrue(result instanceof TextMessage);
         assertEquals("result is first", "Second", ((TextMessage)result).getText());
     }
-    
+
     /**
      * and a listener on a new consumer, just in case.
       *
      * @throws Exception
      */
+    @Test(timeout = 60000)
     public void testNewConsumerSetListenerFromListener() throws Exception {
         final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        final Destination dest = session.createQueue("Queue-" + getName());
+        final Destination dest = session.createQueue("Queue-" + name.getMethodName());
         final MessageConsumer consumer = session.createConsumer(dest);
-       
+
         final CountDownLatch latch = new CountDownLatch(2);
         final AtomicBoolean first = new AtomicBoolean(true);
         final Vector<Object> results = new Vector<Object>();
         consumer.setMessageListener(new MessageListener() {
 
+            @Override
             public void onMessage(Message message) {
                 if (first.compareAndSet(true, false)) {
                     try {
@@ -132,14 +147,14 @@ public class JmsConsumerResetActiveListenerTest extends TestCase {
         });
 
         connection.start();
-        
+
         MessageProducer producer = session.createProducer(dest);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         producer.send(session.createTextMessage("First"));
         producer.send(session.createTextMessage("Second"));
-        
+
         assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
-        
+
         assertEquals("we have a result", 2, results.size());
         Object result = results.get(0);
         assertTrue(result instanceof TextMessage);

http://git-wip-us.apache.org/repos/asf/activemq/blob/02dc6ce9/activemq-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
index 7a219e2..ebe22e2 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
@@ -16,6 +16,11 @@
  */
 package org.apache.activemq;
 
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -24,76 +29,67 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.Topic;
 
-/**
- * 
- */
-public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener {
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class JmsCreateConsumerInOnMessageTest {
 
     private Connection connection;
-    private Session publisherSession;
-    private Session consumerSession;
-    private MessageConsumer consumer;
-    private MessageConsumer testConsumer;
-    private MessageProducer producer;
-    private Topic topic;
-    private Object lock = new Object();
-
-    /*
-     * @see junit.framework.TestCase#setUp()
-     */
-    protected void setUp() throws Exception {
-        super.setUp();
-        super.topic = true;
-        connection = createConnection();
-        connection.setClientID("connection:" + getSubject());
-        publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        topic = (Topic)super.createDestination("Test.Topic");
-        consumer = consumerSession.createConsumer(topic);
-        consumer.setMessageListener(this);
-        producer = publisherSession.createProducer(topic);
-        connection.start();
+    private ActiveMQConnectionFactory factory;
+
+    @Rule
+    public final TestName name = new TestName();
+
+    @Before
+    public void setUp() throws Exception {
+        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
+        connection = factory.createConnection();
     }
 
-    /*
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-        super.tearDown();
-        connection.close();
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
     }
 
     /**
      * Tests if a consumer can be created asynchronusly
-     * 
+     *
      * @throws Exception
      */
+    @Test(timeout = 60000)
     public void testCreateConsumer() throws Exception {
-        Message msg = super.createMessage();
-        producer.send(msg);
-        if (testConsumer == null) {
-            synchronized (lock) {
-                lock.wait(3000);
-            }
-        }
-        assertTrue(testConsumer != null);
-    }
+        final CountDownLatch done = new CountDownLatch(1);
 
-    /**
-     * Use the asynchronous subscription mechanism
-     * 
-     * @param message
-     */
-    public void onMessage(Message message) {
-        try {
-            testConsumer = consumerSession.createConsumer(topic);
-            consumerSession.createProducer(topic);
-            synchronized (lock) {
-                lock.notify();
+        final Session publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Topic topic = publisherSession.createTopic("Test.Topic");
+
+        MessageConsumer consumer = consumerSession.createConsumer(topic);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    consumerSession.createConsumer(topic);
+                    consumerSession.createProducer(topic);
+                    done.countDown();
+                } catch (Exception ex) {
+                    assertTrue(false);
+                }
             }
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            assertTrue(false);
-        }
+        });
+
+        MessageProducer producer = publisherSession.createProducer(topic);
+        connection.start();
+
+        producer.send(publisherSession.createTextMessage("test"));
+
+        assertTrue("Should have finished onMessage", done.await(5, TimeUnit.SECONDS));
     }
 }