You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by bu...@apache.org on 2019/01/04 17:55:00 UTC

[cxf] branch master updated: cxf-rt-transports-jms: improve test stability

This is an automated email from the ASF dual-hosted git repository.

buhhunyx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 7225b3a  cxf-rt-transports-jms: improve test stability
7225b3a is described below

commit 7225b3a8c2a52218d2c2efdd0daa1556ffbf321b
Author: amarkevich <am...@talend.com>
AuthorDate: Thu Dec 27 16:37:58 2018 +0300

    cxf-rt-transports-jms: improve test stability
---
 .../cxf/transport/jms/AbstractJMSTester.java       | 164 +++++++++++++--------
 .../apache/cxf/transport/jms/JMSConduitTest.java   |  11 +-
 .../cxf/transport/jms/JMSConfigFactoryTest.java    |   4 +-
 .../cxf/transport/jms/JMSDestinationTest.java      | 131 ++++------------
 .../MessageIdAsCorrelationIdJMSConduitTest.java    |  31 ++--
 .../jms/PooledConnectionTempQueueTest.java         |  29 ++--
 .../cxf/transport/jms/RequestResponseTest.java     |  54 +------
 .../jms/uri/URIConfiguredConduitTest.java          |  18 +--
 .../transport/jms/util/MessageListenerTest.java    |  40 ++---
 9 files changed, 190 insertions(+), 292 deletions(-)

diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
index 9340ef9..37acc66 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
@@ -18,16 +18,16 @@
  */
 package org.apache.cxf.transport.jms;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Reader;
-import java.io.StringReader;
 import java.io.Writer;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.xml.namespace.QName;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -39,36 +39,35 @@ import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.Service;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.testutil.common.TestUtil;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl11.WSDLServiceFactory;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public abstract class AbstractJMSTester {
     protected static final String WSDL = "/jms_test.wsdl";
     protected static final String SERVICE_NS = "http://cxf.apache.org/hello_world_jms";
     protected static final int MAX_RECEIVE_TIME = 10;
-    protected static final String MESSAGE_CONTENT = "HelloWorld";
     protected static Bus bus;
     protected static ActiveMQConnectionFactory cf1;
     protected static ConnectionFactory cf;
     protected static BrokerService broker;
+    private static final String MESSAGE_CONTENT = "HelloWorld";
 
     protected enum ExchangePattern { oneway, requestReply };
 
-    protected EndpointReferenceType target;
-    protected Message inMessage;
-    protected Message destMessage;
+    private final AtomicReference<Message> inMessage = new AtomicReference<>();
+    private final AtomicReference<Message> destMessage = new AtomicReference<>();
 
     @BeforeClass
     public static void startSerices() throws Exception {
@@ -92,12 +91,12 @@ public abstract class AbstractJMSTester {
         broker.stop();
     }
 
-    protected EndpointInfo setupServiceInfo(String serviceName, String portName) {
+    protected static EndpointInfo setupServiceInfo(String serviceName, String portName) {
         return setupServiceInfo(SERVICE_NS, WSDL, serviceName, portName);
     }
 
-    protected EndpointInfo setupServiceInfo(String ns, String wsdl, String serviceName, String portName) {
-        URL wsdlUrl = getClass().getResource(wsdl);
+    protected static EndpointInfo setupServiceInfo(String ns, String wsdl, String serviceName, String portName) {
+        URL wsdlUrl = AbstractJMSTester.class.getResource(wsdl);
         if (wsdlUrl == null) {
             throw new IllegalArgumentException("Wsdl file not found on class path " + wsdl);
         }
@@ -113,64 +112,72 @@ public abstract class AbstractJMSTester {
     protected MessageObserver createMessageObserver() {
         return new MessageObserver() {
             public void onMessage(Message m) {
-                Exchange exchange = new ExchangeImpl();
-                exchange.setInMessage(m);
-                m.setExchange(exchange);
-                destMessage = m;
+//                Exchange exchange = new ExchangeImpl();
+//                exchange.setInMessage(m);
+//                m.setExchange(exchange);
+                destMessage.set(m);
+                synchronized (destMessage) {
+                    destMessage.notifyAll();
+                }
             }
         };
     }
 
-    protected void sendMessageAsync(Conduit conduit, Message message) throws IOException {
+    protected static void sendMessageAsync(Conduit conduit, Message message) throws IOException {
         sendoutMessage(conduit, message, false, false);
     }
 
-    protected void sendMessageSync(Conduit conduit, Message message) throws IOException {
+    protected static void sendMessageSync(Conduit conduit, Message message) throws IOException {
         sendoutMessage(conduit, message, false, true);
     }
 
-    protected void sendMessage(Conduit conduit, Message message, boolean synchronous) throws IOException {
+    protected static void sendMessage(Conduit conduit, Message message, boolean synchronous) throws IOException {
         sendoutMessage(conduit, message, false, synchronous);
     }
 
-    protected void sendOneWayMessage(Conduit conduit, Message message) throws IOException {
+    protected static void sendOneWayMessage(Conduit conduit, Message message) throws IOException {
         sendoutMessage(conduit, message, true, true);
     }
 
-    private void sendoutMessage(Conduit conduit,
+    private static void sendoutMessage(Conduit conduit,
                                   Message message,
                                   boolean isOneWay,
                                   boolean synchronous) throws IOException {
-
-        Exchange exchange = new ExchangeImpl();
+        final Exchange exchange = new ExchangeImpl();
         exchange.setOneWay(isOneWay);
         exchange.setSynchronous(synchronous);
         message.setExchange(exchange);
         exchange.setOutMessage(message);
         conduit.prepare(message);
-        OutputStream os = message.getContent(OutputStream.class);
-        Writer writer = message.getContent(Writer.class);
-        assertTrue("The OutputStream and Writer should not both be null ", os != null || writer != null);
-        if (os != null) {
-            os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
-            os.close();
-        } else {
-            writer.write(MESSAGE_CONTENT);
-            writer.close();
+        try (OutputStream os = message.getContent(OutputStream.class)) {
+            if (os != null) {
+                os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
+                return;
+            }
+        }
+        try (Writer writer = message.getContent(Writer.class)) {
+            if (writer != null) {
+                writer.write(MESSAGE_CONTENT);
+                return;
+            }
         }
+        fail("The OutputStream and Writer should not both be null");
     }
 
-    protected JMSConduit setupJMSConduit(EndpointInfo ei) throws IOException {
+    protected static JMSConduit setupJMSConduit(EndpointInfo ei) throws IOException {
         JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
         jmsConfig.setConnectionFactory(cf);
-        return new JMSConduit(target, jmsConfig, bus);
+        return new JMSConduit(null, jmsConfig, bus);
     }
 
     protected JMSConduit setupJMSConduitWithObserver(EndpointInfo ei) throws IOException {
         JMSConduit jmsConduit = setupJMSConduit(ei);
         MessageObserver observer = new MessageObserver() {
             public void onMessage(Message m) {
-                inMessage = m;
+                inMessage.set(m);
+                synchronized (inMessage) {
+                    inMessage.notifyAll();
+                }
             }
         };
         jmsConduit.setMessageObserver(observer);
@@ -183,58 +190,85 @@ public abstract class AbstractJMSTester {
         return new JMSDestination(bus, ei, jmsConfig);
     }
 
-    protected String getContent(Message message) {
-        ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class);
+    protected static Message createMessage() {
+        return createMessage(null);
+    }
+
+    protected static Message createMessage(String correlationId) {
+        Message outMessage = new MessageImpl();
+        JMSMessageHeadersType header = new JMSMessageHeadersType();
+        header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+        header.setJMSPriority(1);
+        header.setTimeToLive(1000L);
+        outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
+        outMessage.put(Message.ENCODING, "US-ASCII");
+        return outMessage;
+    }
+
+    protected static void verifyReceivedMessage(Message message) {
         String response = "<not found>";
+        InputStream bis = message.getContent(InputStream.class);
         if (bis != null) {
-            byte[] bytes = new byte[bis.available()];
             try {
+                byte[] bytes = new byte[bis.available()];
                 bis.read(bytes);
+                response = IOUtils.newStringFromBytes(bytes);
             } catch (IOException ex) {
-                assertFalse("Read the Destination recieved Message error ", false);
-                ex.printStackTrace();
+                fail("Read the Destination recieved Message error: " + ex.getMessage());
             }
-            response = IOUtils.newStringFromBytes(bytes);
         } else {
-            StringReader reader = (StringReader)message.getContent(Reader.class);
+            Reader reader = message.getContent(Reader.class);
             char[] buffer = new char[5000];
             try {
                 int i = reader.read(buffer);
                 response = new String(buffer, 0, i);
             } catch (IOException e) {
-                assertFalse("Read the Destination recieved Message error ", false);
-                e.printStackTrace();
+                fail("Read the Destination recieved Message error: " + e.getMessage());
             }
         }
-        return response;
+        assertEquals("The response content should be equal", MESSAGE_CONTENT, response);
     }
 
-    protected void waitForReceiveInMessage() {
-        int waitTime = 0;
-        while (inMessage == null && waitTime < MAX_RECEIVE_TIME * 10) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // do nothing here
+    protected static void verifyHeaders(Message msgIn, Message msgOut) {
+        JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
+            .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+
+        JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
+            .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+
+        verifyJmsHeaderEquality(outHeader, inHeader);
+
+    }
+
+    protected static void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) {
+        assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader
+            .getJMSPriority(), inHeader.getJMSPriority());
+        assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader
+                     .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
+        assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader
+            .getJMSType(), inHeader.getJMSType());
+    }
+
+
+    protected Message waitForReceiveInMessage() throws InterruptedException {
+        if (null == inMessage.get()) {
+            synchronized (inMessage) {
+                inMessage.wait(MAX_RECEIVE_TIME * 1000L);
             }
-            waitTime++;
+            assertNotNull("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME + " seconds", inMessage.get());
         }
-        assertTrue("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME + " seconds",
-                   inMessage != null);
+        return inMessage.getAndSet(null);
     }
 
-    protected void waitForReceiveDestMessage() {
-        int waitTime = 0;
-        while (destMessage == null && waitTime < MAX_RECEIVE_TIME * 10) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // do nothing here
+    protected Message waitForReceiveDestMessage() throws InterruptedException {
+        if (null == destMessage.get()) {
+            synchronized (destMessage) {
+                destMessage.wait(MAX_RECEIVE_TIME * 1000L);
             }
-            waitTime++;
+            assertNotNull("Can't receive the Destination message in " + MAX_RECEIVE_TIME + " seconds",
+                    destMessage.get());
         }
-        assertNotNull("Can't receive the Destination message in " + MAX_RECEIVE_TIME
-                   + " seconds", destMessage);
+        return destMessage.getAndSet(null);
     }
 
 }
\ No newline at end of file
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
index 6a6ee45..bf34bd3 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
@@ -36,8 +36,7 @@ public class JMSConduitTest extends AbstractJMSTester {
 
     @Test
     public void testGetConfiguration() throws Exception {
-        EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL,
-                         "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
+        EndpointInfo ei = setupServiceInfo("HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
         JMSConduit conduit = setupJMSConduit(ei);
         assertEquals("Can't get the right ClientReceiveTimeout", 500L, conduit.getJmsConfig()
             .getReceiveTimeout().longValue());
@@ -46,8 +45,7 @@ public class JMSConduitTest extends AbstractJMSTester {
 
     @Test
     public void testPrepareSend() throws Exception {
-        EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL,
-                         "HelloWorldService", "HelloWorldPort");
+        EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
 
         JMSConduit conduit = setupJMSConduit(ei);
         Message message = new MessageImpl();
@@ -66,12 +64,11 @@ public class JMSConduitTest extends AbstractJMSTester {
      */
     @Test
     public void testTimeoutOnReceive() throws Exception {
-        EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL,
-                         "HelloWorldServiceLoop", "HelloWorldPortLoop");
+        EndpointInfo ei = setupServiceInfo("HelloWorldServiceLoop", "HelloWorldPortLoop");
 
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
         // If the system is extremely fast. The message could still get through
-        conduit.getJmsConfig().setReceiveTimeout(Long.valueOf(1));
+        conduit.getJmsConfig().setReceiveTimeout(1L);
         Message message = new MessageImpl();
         try {
             sendMessageSync(conduit, message);
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java
index c15940f..f785a0f 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java
@@ -39,7 +39,7 @@ public class JMSConfigFactoryTest extends AbstractJMSTester {
     @Test
     public void testUsernameAndPassword() throws Exception {
         EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
-        JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, target);
+        JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
         Assert.assertEquals("User name does not match.", "testUser", config.getUserName());
         Assert.assertEquals("Password does not match.", "testPassword", config.getPassword());
     }
@@ -92,7 +92,7 @@ public class JMSConfigFactoryTest extends AbstractJMSTester {
     @Test
     public void testMessageSelectorIsSet() {
         EndpointInfo ei = setupServiceInfo("HelloWorldSelectorService", "HelloWorldPort");
-        JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, target);
+        JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
         Assert.assertEquals("customJMSAttribute=helloWorld", config.getMessageSelector());
     }
 }
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index f216f6b..22a770c 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -19,12 +19,6 @@
 
 package org.apache.cxf.transport.jms;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
-import java.io.StringReader;
-
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -33,7 +27,6 @@ import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.Topic;
 
-import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
@@ -49,7 +42,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -68,17 +60,15 @@ public class JMSDestinationTest extends AbstractJMSTester {
 
     @Test
     public void testDurableSubscriber() throws Exception {
-        destMessage = null;
         EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
-        Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage);
+        Message outMessage = createMessage();
         JMSDestination destination = setupJMSDestination(ei);
         destination.setMessageObserver(createMessageObserver());
         // The JMSBroker (ActiveMQ 5.x) need to take some time to setup the DurableSubscriber
-        Thread.sleep(500);
+        Thread.sleep(500L);
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        Message destMessage = waitForReceiveDestMessage();
 
         assertNotNull("The destiantion should have got the message ", destMessage);
         verifyReceivedMessage(destMessage);
@@ -86,7 +76,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
         conduit.close();
         destination.shutdown();
     }
-    
+
     @Test(expected = InvalidClientIDException.class)
     public void testDurableInvalidClientId() throws Throwable {
         Connection con = cf1.createConnection();
@@ -94,7 +84,6 @@ public class JMSDestinationTest extends AbstractJMSTester {
         try {
             con.setClientID("testClient");
             con.start();
-            destMessage = null;
             EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
             JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
             jmsConfig.setDurableSubscriptionClientId("testClient");
@@ -117,12 +106,11 @@ public class JMSDestinationTest extends AbstractJMSTester {
         destination.setMessageObserver(createMessageObserver());
 
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
-        Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage);
+        Message outMessage = createMessage();
 
         sendOneWayMessage(conduit, outMessage);
         // wait for the message to be get from the destination
-        waitForReceiveDestMessage();
+        Message destMessage = waitForReceiveDestMessage();
         // just verify the Destination inMessage
         assertNotNull("The destiantion should have got the message ", destMessage);
         verifyReceivedMessage(destMessage);
@@ -131,51 +119,17 @@ public class JMSDestinationTest extends AbstractJMSTester {
         destination.shutdown();
     }
 
-    private void setupMessageHeader(Message outMessage, String correlationId, String replyTo) {
+    private static void setupMessageHeader(Message outMessage, String correlationId, String replyTo) {
         JMSMessageHeadersType header = new JMSMessageHeadersType();
         header.setJMSCorrelationID(correlationId);
         header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
         header.setJMSPriority(1);
-        header.setTimeToLive(1000);
-        header.setJMSReplyTo(replyTo != null ? replyTo : null);
+        header.setTimeToLive(1000L);
+        header.setJMSReplyTo(replyTo);
         outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
         outMessage.put(Message.ENCODING, "US-ASCII");
     }
 
-    private void setupMessageHeader(Message outMessage) {
-        setupMessageHeader(outMessage, "Destination test", null);
-    }
-
-    private void setupMessageHeader(Message outMessage, String correlationId) {
-        setupMessageHeader(outMessage, correlationId, null);
-    }
-
-    private void verifyReceivedMessage(Message message) {
-        ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class);
-        String response = "<not found>";
-        if (bis != null) {
-            byte[] bytes = new byte[bis.available()];
-            try {
-                bis.read(bytes);
-            } catch (IOException ex) {
-                assertFalse("Read the Destination recieved Message error ", false);
-                ex.printStackTrace();
-            }
-            response = IOUtils.newStringFromBytes(bytes);
-        } else {
-            StringReader reader = (StringReader)message.getContent(Reader.class);
-            char[] buffer = new char[5000];
-            try {
-                int i = reader.read(buffer);
-                response = new String(buffer, 0, i);
-            } catch (IOException e) {
-                assertFalse("Read the Destination recieved Message error ", false);
-                e.printStackTrace();
-            }
-        }
-        assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT, response);
-    }
-
     private void verifyRequestResponseHeaders(Message msgIn, Message msgOut) {
         JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
             .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
@@ -188,27 +142,6 @@ public class JMSDestinationTest extends AbstractJMSTester {
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
 
         verifyJmsHeaderEquality(outHeader, inHeader);
-
-    }
-
-    private void verifyHeaders(Message msgIn, Message msgOut) {
-        JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
-            .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-
-        JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
-            .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
-
-        verifyJmsHeaderEquality(outHeader, inHeader);
-
-    }
-
-    private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) {
-        assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader
-            .getJMSPriority(), inHeader.getJMSPriority());
-        assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader
-                     .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
-        assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader
-            .getJMSType(), inHeader.getJMSType());
     }
 
     @Test
@@ -233,8 +166,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
         conduit.getJmsConfig().setCreateSecurityContext(createSecurityContext);
 
-        final Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage, null);
+        final Message outMessage = createMessage();
         final JMSDestination destination = setupJMSDestination(ei);
 
 
@@ -263,19 +195,17 @@ public class JMSDestinationTest extends AbstractJMSTester {
         // wait for the message to be got from the destination,
         // create the thread to handler the Destination incoming message
 
-        waitForReceiveInMessage();
-        verifyReceivedMessage(inMessage);
+        verifyReceivedMessage(waitForReceiveInMessage());
         // wait for a while for the jms session recycling
 
-        inMessage = null;
         // Send a second message to check for an issue
         // Where the session was closed the second time
         sendMessageSync(conduit, outMessage);
-        waitForReceiveInMessage();
+        Message inMessage = waitForReceiveInMessage();
         verifyReceivedMessage(inMessage);
 
         // wait for a while for the jms session recycling
-        Thread.sleep(1000);
+//        Thread.sleep(1000L);
         conduit.close();
         destination.shutdown();
 
@@ -289,8 +219,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
 
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
-        final Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage, null);
+        final Message outMessage = createMessage();
 
         JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
             .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
@@ -325,7 +254,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
         // wait for the message to be got from the destination,
         // create the thread to handler the Destination incoming message
 
-        waitForReceiveInMessage();
+        Message inMessage = waitForReceiveInMessage();
         verifyReceivedMessage(inMessage);
 
         verifyRequestResponseHeaders(inMessage, outMessage);
@@ -336,7 +265,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
         // TODO we need to check the SOAP JMS transport properties here
 
         // wait for a while for the jms session recycling
-        Thread.sleep(1000);
+//        Thread.sleep(1000L);
         conduit.close();
         destination.shutdown();
     }
@@ -371,10 +300,9 @@ public class JMSDestinationTest extends AbstractJMSTester {
         destination.setMessageObserver(createMessageObserver());
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
-        final Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage, null);
+        final Message outMessage = createMessage();
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        Message destMessage = waitForReceiveDestMessage();
         SecurityContext securityContext = destMessage.get(SecurityContext.class);
 
         conduit.close();
@@ -390,30 +318,26 @@ public class JMSDestinationTest extends AbstractJMSTester {
         /* 1. Test that replyTo destination set in WSDL is NOT used
          * in spec compliant mode */
 
-        destMessage = null;
         EndpointInfo ei = setupServiceInfo(
                          "HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort");
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
-        Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage);
+        Message outMessage = createMessage();
         JMSDestination destination = setupJMSDestination(ei);
         destination.setMessageObserver(createMessageObserver());
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        Message destMessage = waitForReceiveDestMessage();
         // just verify the Destination inMessage
         assertNotNull("The destination should have got the message ", destMessage);
         verifyReplyToNotSet(destMessage);
-        destMessage = null;
 
         /* 2. Test that replyTo destination set in WSDL IS used
          * in spec non-compliant mode */
 
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        destMessage = waitForReceiveDestMessage();
         assertNotNull("The destination should have got the message ", destMessage);
         String exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
         verifyReplyToSet(destMessage, Queue.class, exName);
-        destMessage = null;
 
         /* 3. Test that replyTo destination provided via invocation context
          * overrides the value set in WSDL and IS used in spec non-compliant mode */
@@ -422,34 +346,31 @@ public class JMSDestinationTest extends AbstractJMSTester {
         exName += ".context";
         setupMessageHeader(outMessage, "cidValue", contextReplyTo);
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        destMessage = waitForReceiveDestMessage();
         assertNotNull("The destiantion should have got the message ", destMessage);
         verifyReplyToSet(destMessage, Queue.class, exName);
-        destMessage = null;
 
         /* 4. Test that replyTo destination provided via invocation context
          * and the value set in WSDL are NOT used in spec non-compliant mode
          * when JMSConstants.JMS_SET_REPLY_TO == false */
 
-        setupMessageHeader(outMessage);
+        setupMessageHeader(outMessage, null, null);
         outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.FALSE);
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        destMessage = waitForReceiveDestMessage();
         assertNotNull("The destiantion should have got the message ", destMessage);
         verifyReplyToNotSet(destMessage);
-        destMessage = null;
 
         /* 5. Test that replyTo destination set in WSDL IS used in spec non-compliant
          * mode when JMSConstants.JMS_SET_REPLY_TO == true */
 
-        setupMessageHeader(outMessage);
+        setupMessageHeader(outMessage, null, null);
         outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.TRUE);
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        destMessage = waitForReceiveDestMessage();
         assertNotNull("The destiantion should have got the message ", destMessage);
         exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
         verifyReplyToSet(destMessage, Queue.class, exName);
-        destMessage = null;
 
         conduit.close();
         destination.shutdown();
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
index 9d3e37a..3df49fb 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
@@ -21,7 +21,6 @@ package org.apache.cxf.transport.jms;
 import javax.jms.ConnectionFactory;
 
 import org.apache.activemq.pool.PooledConnectionFactory;
-import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
@@ -30,9 +29,11 @@ import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.transport.jms.util.TestReceiver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
-import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 /**
  * Checks if a CXF client works correlates requests and responses correctly if the server sets the message id
  * as correlation id on the response message
@@ -40,7 +41,8 @@ import org.junit.Test;
 public class MessageIdAsCorrelationIdJMSConduitTest {
     private static final String SERVICE_QUEUE = "test";
     private static final String BROKER_URI = "vm://localhost?broker.persistent=false";
-    private ConnectionFactory connectionFactory;
+
+    private ConnectionFactory connectionFactory = new PooledConnectionFactory(BROKER_URI);
 
     @Test
     public void testSendReceiveWithTempReplyQueue() throws Exception {
@@ -52,13 +54,9 @@ public class MessageIdAsCorrelationIdJMSConduitTest {
         sendAndReceive(true, "testreply");
     }
 
-    public void sendAndReceive(boolean synchronous, String replyDestination) throws Exception {
-        BusFactory bf = BusFactory.newInstance();
-        Bus bus = bf.createBus();
-        BusFactory.setDefaultBus(bus);
+    private void sendAndReceive(boolean synchronous, String replyDestination) throws InterruptedException {
         EndpointReferenceType target = new EndpointReferenceType();
 
-        connectionFactory = new PooledConnectionFactory(BROKER_URI);
         TestReceiver receiver = new TestReceiver(connectionFactory, SERVICE_QUEUE, true);
         receiver.runAsync();
 
@@ -68,7 +66,7 @@ public class MessageIdAsCorrelationIdJMSConduitTest {
         jmsConfig.setUseConduitIdSelector(false);
         jmsConfig.setReplyDestination(replyDestination);
 
-        JMSConduit conduit = new JMSConduit(target, jmsConfig, bus);
+        JMSConduit conduit = new JMSConduit(target, jmsConfig, BusFactory.getDefaultBus());
         Exchange exchange = new ExchangeImpl();
         exchange.setSynchronous(synchronous);
         Message message = new MessageImpl();
@@ -76,21 +74,16 @@ public class MessageIdAsCorrelationIdJMSConduitTest {
         conduit.sendExchange(exchange, "Request");
         waitForAsyncReply(exchange);
         receiver.close();
-        if (exchange.getInMessage() == null) {
-            throw new RuntimeException("No reply received within 2 seconds");
-        }
+        assertNotNull("No reply received within 2 seconds", exchange.getInMessage());
         JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)exchange.getInMessage()
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
-        Assert.assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID());
+        assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID());
         conduit.close();
-        bus.shutdown(true);
     }
 
-    private void waitForAsyncReply(Exchange exchange) throws InterruptedException {
-        int count = 0;
-        while (exchange.getInMessage() == null && count <= 20) {
-            Thread.sleep(100);
-            count++;
+    private static void waitForAsyncReply(Exchange exchange) throws InterruptedException {
+        for (int count = 0; exchange.getInMessage() == null && count <= 20; count++) {
+            Thread.sleep(100L);
         }
     }
 
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
index a60ef7a..8cccc26 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.cxf.transport.jms;
 
-import java.util.concurrent.Executors;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
@@ -32,16 +30,17 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.pool.PooledConnectionFactory;
 
-import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertNotNull;
+
 public class PooledConnectionTempQueueTest {
 
     protected static final String SERVICE_QUEUE = "queue1";
 
     @Test
     public void testTempQueueIssue() throws JMSException, InterruptedException {
-        final PooledConnectionFactory cf = new PooledConnectionFactory("vm://localhost?broker.persistent=false");
+        final ConnectionFactory cf = new PooledConnectionFactory("vm://localhost?broker.persistent=false");
 
         Connection con1 = cf.createConnection();
         con1.start();
@@ -49,20 +48,18 @@ public class PooledConnectionTempQueueTest {
         // This order seems to matter to reproduce the issue
         con1.close();
 
-        Executors.newSingleThreadExecutor().execute(new Runnable() {
-            public void run() {
-                try {
-                    receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE);
-                } catch (JMSException e) {
-                    e.printStackTrace();
-                }
+        new Thread(() -> {
+            try {
+                receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE);
+            } catch (Exception e) {
+                e.printStackTrace();
             }
-        });
+        }).start();
 
         sendWithReplyToTemp(cf, SERVICE_QUEUE);
     }
 
-    private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException,
+    private static void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException,
         InterruptedException {
         Connection con = cf.createConnection();
         con.start();
@@ -74,11 +71,11 @@ public class PooledConnectionTempQueueTest {
         producer.send(msg);
 
         // This sleep also seems to matter
-        Thread.sleep(500);
+        Thread.sleep(500L);
 
         MessageConsumer consumer = session.createConsumer(tempQueue);
         Message replyMsg = consumer.receive();
-        Assert.assertNotNull(replyMsg);
+        assertNotNull(replyMsg);
         //System.out.println(replyMsg.getJMSCorrelationID());
 
         consumer.close();
@@ -88,7 +85,7 @@ public class PooledConnectionTempQueueTest {
         con.close();
     }
 
-    public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory,
+    public static void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory,
                                                               String queueName) throws JMSException {
         Connection con = connectionFactory.createConnection();
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
index ab77644..053e78e 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
@@ -21,8 +21,6 @@ package org.apache.cxf.transport.jms;
 
 import java.io.IOException;
 
-import javax.jms.DeliveryMode;
-
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
@@ -33,36 +31,8 @@ import org.apache.cxf.transport.MessageObserver;
 
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 public class RequestResponseTest extends AbstractJMSTester {
 
-    private void verifyReceivedMessage(Message message) {
-        String response = getContent(message);
-        assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT, response);
-    }
-
-    private void verifyHeaders(Message msgIn, Message msgOut) {
-        JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
-            .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-
-        JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
-            .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
-
-        verifyJmsHeaderEquality(outHeader, inHeader);
-
-    }
-
-    private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) {
-        assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader
-            .getJMSPriority(), inHeader.getJMSPriority());
-        assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader
-                     .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
-        assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader
-            .getJMSType(), inHeader.getJMSType());
-    }
-
-
     @Test
     public void testRequestQueueResponseTempQueue() throws Exception {
         EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
@@ -94,19 +64,8 @@ public class RequestResponseTest extends AbstractJMSTester {
         sendAndReceiveMessages(ei, false);
     }
 
-    private Message createMessage() {
-        Message outMessage = new MessageImpl();
-        JMSMessageHeadersType header = new JMSMessageHeadersType();
-        header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
-        header.setJMSPriority(1);
-        header.setTimeToLive(1000);
-        outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
-        outMessage.put(Message.ENCODING, "US-ASCII");
-        return outMessage;
-    }
-
-    protected void sendAndReceiveMessages(EndpointInfo ei, boolean synchronous) throws IOException {
-        inMessage = null;
+    private void sendAndReceiveMessages(EndpointInfo ei, boolean synchronous)
+            throws IOException, InterruptedException {
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
         final Message outMessage = createMessage();
@@ -120,9 +79,8 @@ public class RequestResponseTest extends AbstractJMSTester {
                 verifyReceivedMessage(m);
                 verifyHeaders(m, outMessage);
                 // setup the message for
-                Conduit backConduit;
                 try {
-                    backConduit = destination.getBackChannel(m);
+                    Conduit backConduit = destination.getBackChannel(m);
                     // wait for the message to be got from the conduit
                     Message replyMessage = new MessageImpl();
                     sendOneWayMessage(backConduit, replyMessage);
@@ -138,13 +96,11 @@ public class RequestResponseTest extends AbstractJMSTester {
             // wait for the message to be got from the destination,
             // create the thread to handler the Destination incoming message
 
-            waitForReceiveInMessage();
-            verifyReceivedMessage(inMessage);
+            verifyReceivedMessage(waitForReceiveInMessage());
         } finally {
             conduit.close();
             destination.shutdown();
         }
     }
 
-
-}
\ No newline at end of file
+}
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java
index b97a265..d9f9e9c 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java
@@ -36,10 +36,12 @@ import org.apache.cxf.transport.jms.JMSMessageHeadersType;
 import org.apache.cxf.transport.jms.util.TestReceiver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 /**
  * Checks if a CXF client works correlates requests and responses correctly if the server sets the message id
  * as correlation id on the response message
@@ -102,20 +104,16 @@ public class URIConfiguredConduitTest {
 
         waitForAsyncReply(exchange);
         receiver.close();
-        if (exchange.getInMessage() == null) {
-            throw new RuntimeException("No reply received within 2 seconds");
-        }
+        assertNotNull("No reply received within 2 seconds", exchange.getInMessage());
         JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)exchange.getInMessage()
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
-        Assert.assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID());
+        assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID());
         conduit.close();
     }
 
-    private void waitForAsyncReply(Exchange exchange) throws InterruptedException {
-        int count = 0;
-        while (exchange.getInMessage() == null && count <= 20) {
-            Thread.sleep(100);
-            count++;
+    private static void waitForAsyncReply(Exchange exchange) throws InterruptedException {
+        for (int count = 0; exchange.getInMessage() == null && count <= 20; count++) {
+            Thread.sleep(100L);
         }
     }
 
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index ccdb450..937f0b5 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -40,7 +40,6 @@ import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 import org.awaitility.Awaitility;
 
 import org.easymock.Capture;
-import org.junit.Assert;
 import org.junit.Test;
 
 
@@ -50,6 +49,7 @@ import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.newCapture;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
 
 public class MessageListenerTest {
 
@@ -77,7 +77,7 @@ public class MessageListenerTest {
         Awaitility.await().until(() -> !container.isRunning());
         verify(exListener);
         JMSException ex = captured.getValue();
-        Assert.assertEquals("The connection is already closed", ex.getMessage());
+        assertEquals("The connection is already closed", ex.getMessage());
     }
     
     @Test
@@ -106,7 +106,7 @@ public class MessageListenerTest {
         verify(exListener);
         JMSException ex = captured.getValue();
         // Closing the pooled connection will result in a NPE when using it
-        Assert.assertEquals("Wrapped exception. null", ex.getMessage());
+        assertEquals("Wrapped exception. null", ex.getMessage());
     }
 
     @Test
@@ -147,14 +147,14 @@ public class MessageListenerTest {
         container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
         container.start();
 
-        assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0);
+        assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0L);
 
         sendMessage(connection, dest, OK);
-        assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000);
+        assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000L);
 
         sendMessage(connection, dest, FAIL);
         assertNumMessagesInQueue("Even when an exception occurs the message should be committed", connection,
-                                 dest, 0, 1000);
+                                 dest, 0, 1000L);
 
         container.stop();
         connection.close();
@@ -178,17 +178,17 @@ public class MessageListenerTest {
     private void testTransactionalBehaviour(Connection connection, Queue dest) throws JMSException,
         InterruptedException {
         Queue dlq = JMSUtil.createQueue(connection, "ActiveMQ.DLQ");
-        assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0);
-        assertNumMessagesInQueue("At the start the DLQ should be empty", connection, dlq, 0, 0);
+        assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0L);
+        assertNumMessagesInQueue("At the start the DLQ should be empty", connection, dlq, 0, 0L);
 
         sendMessage(connection, dest, OK);
-        assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000);
+        assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000L);
 
         sendMessage(connection, dest, FAILFIRST);
-        assertNumMessagesInQueue("Should succeed on second try", connection, dest, 0, 2000);
+        assertNumMessagesInQueue("Should succeed on second try", connection, dest, 0, 2000L);
 
         sendMessage(connection, dest, FAIL);
-        assertNumMessagesInQueue("Should be rolled back", connection, dlq, 1, 2500);
+        assertNumMessagesInQueue("Should be rolled back", connection, dlq, 1, 2500L);
     }
 
     private Connection createConnection(String name) throws JMSException {
@@ -227,25 +227,27 @@ public class MessageListenerTest {
         }
         consumer.close();
         session.close();
-        assertNumMessagesInQueue("", connection, dest, 0, 0);
+        assertNumMessagesInQueue("", connection, dest, 0, 0L);
     }
 
-    private void assertNumMessagesInQueue(String message, Connection connection, Queue queue,
-                                          int expectedNum, int timeout) throws JMSException,
+    private static void assertNumMessagesInQueue(String message, Connection connection, Queue queue,
+                                          int expectedNum, long timeout) throws JMSException,
         InterruptedException {
         long startTime = System.currentTimeMillis();
         int actualNum;
         do {
             actualNum = JMSUtil.getNumMessages(connection, queue);
-
+            if (actualNum == expectedNum) {
+                break;
+            }
             //System.out.println("Messages in queue " + queue.getQueueName() + ": " + actualNum
             //                   + ", expecting: " + expectedNum);
-            Thread.sleep(100);
+            Thread.sleep(100L);
         } while ((System.currentTimeMillis() - startTime < timeout) && expectedNum != actualNum);
-        Assert.assertEquals(message + " -> number of messages on queue", expectedNum, actualNum);
+        assertEquals(message + " -> number of messages on queue", expectedNum, actualNum);
     }
 
-    private void sendMessage(Connection connection, Destination dest, String content) throws JMSException,
+    private static void sendMessage(Connection connection, Destination dest, String content) throws JMSException,
         InterruptedException {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer prod = session.createProducer(dest);
@@ -253,7 +255,7 @@ public class MessageListenerTest {
         prod.send(message);
         prod.close();
         session.close();
-        Thread.sleep(500); // Give receiver some time to process
+//        Thread.sleep(500L); // Give receiver some time to process
     }
 
     private static final class TestMessageListener implements MessageListener {