You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/05/12 21:24:07 UTC

[2/3] git commit: Pull out JMS client common test bits into a test support class.

Pull out JMS client common test bits into a test support class.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1dd34a13
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1dd34a13
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1dd34a13

Branch: refs/heads/trunk
Commit: 1dd34a13b258048bb54e27b1c01f14f66fa29ef0
Parents: 683fcda
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon May 12 15:16:03 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon May 12 15:16:03 2014 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpTestSupport.java         |   4 +
 .../activemq/transport/amqp/JMSClientTest.java  | 197 ++++++-------------
 .../transport/amqp/JMSClientTestSupport.java    |  78 ++++++++
 3 files changed, 143 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1dd34a13/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 2524300..6f1c31a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -47,11 +47,15 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.spring.SpringSslContext;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class AmqpTestSupport {
 
+    @Rule public TestName name = new TestName();
+
     protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
     protected BrokerService brokerService;
     protected Vector<Throwable> exceptions = new Vector<Throwable>();

http://git-wip-us.apache.org/repos/asf/activemq/blob/1dd34a13/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index bd69444..be6e023 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -46,21 +46,18 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
 import org.apache.activemq.util.Wait;
-import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 import org.objectweb.jtests.jms.framework.TestConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JMSClientTest extends AmqpTestSupport {
+public class JMSClientTest extends JMSClientTestSupport {
+
     protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
-    @Rule public TestName name = new TestName();
-    java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM");
 
+    protected java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM");
 
     @Override
     @Before
@@ -82,10 +79,10 @@ public class JMSClientTest extends AmqpTestSupport {
     public void testProducerConsume() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
 
-        Connection connection = createConnection();
+        connection = createConnection();
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue = session.createQueue(name.toString());
+            Queue queue = session.createQueue(getDestinationName());
             MessageProducer p = session.createProducer(queue);
 
             TextMessage message = session.createTextMessage();
@@ -104,18 +101,17 @@ public class JMSClientTest extends AmqpTestSupport {
             assertNotNull(msg);
             assertTrue(msg instanceof TextMessage);
         }
-        connection.close();
     }
 
     @Test(timeout=30000)
     public void testAnonymousProducerConsume() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
 
-        Connection connection = createConnection();
+        connection = createConnection();
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue1 = session.createQueue(name.toString() + "1");
-            Queue queue2 = session.createQueue(name.toString() + "2");
+            Queue queue1 = session.createQueue(getDestinationName() + "1");
+            Queue queue2 = session.createQueue(getDestinationName() + "2");
             MessageProducer p = session.createProducer(null);
 
             TextMessage message = session.createTextMessage();
@@ -138,7 +134,6 @@ public class JMSClientTest extends AmqpTestSupport {
                 consumer.close();
             }
         }
-        connection.close();
     }
 
     @Test
@@ -146,12 +141,12 @@ public class JMSClientTest extends AmqpTestSupport {
         ActiveMQAdmin.enableJMSFrameTracing();
         final int msgCount = 1;
 
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         sendMessages(connection, queue, msgCount);
 
-        QueueViewMBean queueView = getProxyToQueue(name.toString());
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
         LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
         assertEquals(msgCount, queueView.getQueueSize());
 
@@ -168,8 +163,6 @@ public class JMSClientTest extends AmqpTestSupport {
 
         LOG.info("Queue size after session commit is: {}", queueView.getQueueSize());
         assertEquals(0, queueView.getQueueSize());
-
-        connection.close();
     }
 
     @Test(timeout=30000)
@@ -178,12 +171,12 @@ public class JMSClientTest extends AmqpTestSupport {
         ActiveMQAdmin.enableJMSFrameTracing();
         final int msgCount = 1;
 
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         sendMessages(connection, queue, msgCount);
 
-        QueueViewMBean queueView = getProxyToQueue(name.toString());
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
         LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
         assertEquals(msgCount, queueView.getQueueSize());
 
@@ -213,7 +206,6 @@ public class JMSClientTest extends AmqpTestSupport {
         assertEquals(0, queueView.getQueueSize());
 
         session.close();
-        connection.close();
     }
 
     @Test(timeout=60000)
@@ -222,12 +214,12 @@ public class JMSClientTest extends AmqpTestSupport {
         ActiveMQAdmin.enableJMSFrameTracing();
         final int msgCount = 500;
 
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         sendMessages(connection, queue, msgCount);
 
-        QueueViewMBean queueView = getProxyToQueue(name.toString());
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
         LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
         assertEquals(msgCount, queueView.getQueueSize());
 
@@ -249,8 +241,6 @@ public class JMSClientTest extends AmqpTestSupport {
             session.close();
         }
 
-        connection.close();
-
         LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
         assertEquals(0, queueView.getQueueSize());
     }
@@ -260,10 +250,10 @@ public class JMSClientTest extends AmqpTestSupport {
     public void testSelectors() throws Exception{
         ActiveMQAdmin.enableJMSFrameTracing();
 
-        Connection connection = createConnection();
+        connection = createConnection();
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue = session.createQueue(name.toString());
+            Queue queue = session.createQueue(getDestinationName());
             MessageProducer p = session.createProducer(queue);
 
             TextMessage message = session.createTextMessage();
@@ -291,15 +281,14 @@ public class JMSClientTest extends AmqpTestSupport {
             assertTrue(msg instanceof TextMessage);
             assertEquals("hello + 9", ((TextMessage) msg).getText());
         }
-        connection.close();
     }
 
     @Test(timeout=30000)
     public void testProducerThrowsWhenBrokerStops() throws Exception {
 
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         connection.start();
 
         MessageProducer producer = session.createProducer(queue);
@@ -332,10 +321,9 @@ public class JMSClientTest extends AmqpTestSupport {
 
     @Test(timeout=30000)
     public void testProducerCreateThrowsWhenBrokerStops() throws Exception {
-
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         connection.start();
 
         Thread stopper = new Thread(new Runnable() {
@@ -364,10 +352,9 @@ public class JMSClientTest extends AmqpTestSupport {
 
     @Test(timeout=30000)
     public void testConsumerCreateThrowsWhenBrokerStops() throws Exception {
-
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         connection.start();
 
         MessageProducer producer = session.createProducer(queue);
@@ -387,10 +374,9 @@ public class JMSClientTest extends AmqpTestSupport {
 
     @Test(timeout=90000)
     public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception {
-
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         connection.start();
 
         MessageConsumer consumer=session.createConsumer(queue);
@@ -419,10 +405,9 @@ public class JMSClientTest extends AmqpTestSupport {
 
     @Test(timeout=60000)
     public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws Exception {
-
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         connection.start();
 
         MessageConsumer consumer=session.createConsumer(queue);
@@ -450,10 +435,9 @@ public class JMSClientTest extends AmqpTestSupport {
 
     @Test(timeout=30000)
     public void testConsumerReceiveReturnsBrokerStops() throws Exception {
-
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         connection.start();
 
         MessageConsumer consumer=session.createConsumer(queue);
@@ -479,10 +463,9 @@ public class JMSClientTest extends AmqpTestSupport {
 
     @Test(timeout=30000)
     public void testBrokerRestartWontHangConnectionClose() throws Exception {
-
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         connection.start();
 
         MessageProducer producer = session.createProducer(queue);
@@ -503,12 +486,10 @@ public class JMSClientTest extends AmqpTestSupport {
 
     @Test(timeout=120000)
     public void testProduceAndConsumeLargeNumbersOfMessages() throws JMSException {
-
         int count = 2000;
-
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
         connection.start();
 
         MessageProducer producer= session.createProducer(queue);
@@ -532,22 +513,17 @@ public class JMSClientTest extends AmqpTestSupport {
     @Test(timeout=30000)
     public void testSyncSends() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
-        Connection connection = null;
-        try {
-            connection = createConnection(true);
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue = session.createQueue(name.toString());
-            connection.start();
-            MessageProducer producer = session.createProducer(queue);
-            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-            Message toSend = session.createTextMessage("Sample text");
-            producer.send(toSend);
-            MessageConsumer consumer = session.createConsumer(queue);
-            Message received = consumer.receive(5000);
-            assertNotNull(received);
-        } finally {
-            connection.close();
-        }
+        connection = createConnection(true);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        connection.start();
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        Message toSend = session.createTextMessage("Sample text");
+        producer.send(toSend);
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message received = consumer.receive(5000);
+        assertNotNull(received);
     }
 
     @Test(timeout=30000)
@@ -556,10 +532,10 @@ public class JMSClientTest extends AmqpTestSupport {
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<Message> received = new AtomicReference<Message>();
 
-        Connection connection = createConnection();
+        connection = createConnection();
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Topic topic = session.createTopic(name.toString());
+            Topic topic = session.createTopic(getDestinationName());
             MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
             consumer.setMessageListener(new MessageListener() {
 
@@ -582,17 +558,16 @@ public class JMSClientTest extends AmqpTestSupport {
             assertNotNull("Should have received a message by now.", received.get());
             assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
         }
-        connection.close();
     }
 
     @Test(timeout=30000)
     public void testDurableConsumerSync() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
 
-        Connection connection = createConnection();
+        connection = createConnection();
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Topic topic = session.createTopic(name.toString());
+            Topic topic = session.createTopic(getDestinationName());
             final MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
             MessageProducer producer = session.createProducer(topic);
             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -615,7 +590,6 @@ public class JMSClientTest extends AmqpTestSupport {
             assertNotNull("Should have received a message by now.", msg.get());
             assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
         }
-        connection.close();
     }
 
     @Test(timeout=30000)
@@ -624,10 +598,10 @@ public class JMSClientTest extends AmqpTestSupport {
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<Message> received = new AtomicReference<Message>();
 
-        Connection connection = createConnection();
+        connection = createConnection();
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Topic topic = session.createTopic(name.toString());
+            Topic topic = session.createTopic(getDestinationName());
             MessageConsumer consumer = session.createConsumer(topic);
             consumer.setMessageListener(new MessageListener() {
 
@@ -657,10 +631,10 @@ public class JMSClientTest extends AmqpTestSupport {
     public void testTopicConsumerSync() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
 
-        Connection connection = createConnection();
+        connection = createConnection();
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Topic topic = session.createTopic(name.toString());
+            Topic topic = session.createTopic(getDestinationName());
             final MessageConsumer consumer = session.createConsumer(topic);
             MessageProducer producer = session.createProducer(topic);
             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -683,7 +657,6 @@ public class JMSClientTest extends AmqpTestSupport {
             assertNotNull("Should have received a message by now.", msg.get());
             assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
         }
-        connection.close();
     }
 
     @Test(timeout=60000)
@@ -723,7 +696,7 @@ public class JMSClientTest extends AmqpTestSupport {
     public void testExecptionListenerCalledOnBrokerStop() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
 
-        Connection connection = createConnection();
+        connection = createConnection();
         connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         connection.start();
 
@@ -757,9 +730,9 @@ public class JMSClientTest extends AmqpTestSupport {
     public void testSessionTransactedCommit() throws JMSException, InterruptedException {
         ActiveMQAdmin.enableJMSFrameTracing();
 
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
 
         connection.start();
 
@@ -785,16 +758,15 @@ public class JMSClientTest extends AmqpTestSupport {
         }
 
         session.close();
-        connection.close();
     }
 
     @Test(timeout=30000)
     public void testSessionTransactedRollback() throws JMSException, InterruptedException {
         ActiveMQAdmin.enableJMSFrameTracing();
 
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.toString());
+        Queue queue = session.createQueue(getDestinationName());
 
         connection.start();
 
@@ -813,7 +785,6 @@ public class JMSClientTest extends AmqpTestSupport {
         assertNull(m);
 
         session.close();
-        connection.close();
     }
 
     private String createLargeString(int sizeInBytes) {
@@ -829,9 +800,9 @@ public class JMSClientTest extends AmqpTestSupport {
 
     @Test(timeout = 60 * 1000)
     public void testSendLargeMessage() throws JMSException, InterruptedException {
-        Connection connection = createConnection();
+        connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        String queueName = name.toString();
+        String queueName = getDestinationName();
         Queue queue = session.createQueue(queueName);
 
         MessageProducer producer=session.createProducer(queue);
@@ -842,7 +813,6 @@ public class JMSClientTest extends AmqpTestSupport {
         producer.send(m);
 
         MessageConsumer  consumer=session.createConsumer(queue);
-
         Message message = consumer.receive();
         assertNotNull(message);
         assertTrue(message instanceof TextMessage);
@@ -851,49 +821,4 @@ public class JMSClientTest extends AmqpTestSupport {
         assertEquals(messageSize, textMessage.getText().length());
         assertEquals(messageText, textMessage.getText());
     }
-
-    private Connection createConnection() throws JMSException {
-        return createConnection(name.toString(), false, false);
-    }
-
-    private Connection createConnection(boolean syncPublish) throws JMSException {
-        return createConnection(name.toString(), syncPublish, false);
-    }
-
-    private Connection createConnection(String clientId) throws JMSException {
-        return createConnection(clientId, false, false);
-    }
-
-    /**
-     * Can be overridden in subclasses to test against a different transport suchs as NIO.
-     *
-     * @return the port to connect to on the Broker.
-     */
-    protected int getBrokerPort() {
-        return port;
-    }
-
-    protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException {
-
-        int brokerPort = getBrokerPort();
-        LOG.debug("Creating connection on port {}", brokerPort);
-        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl);
-
-        factory.setSyncPublish(syncPublish);
-        factory.setTopicPrefix("topic://");
-        factory.setQueuePrefix("queue://");
-
-        final Connection connection = factory.createConnection();
-        if (clientId != null && !clientId.isEmpty()) {
-            connection.setClientID(clientId);
-        }
-        connection.setExceptionListener(new ExceptionListener() {
-            @Override
-            public void onException(JMSException exception) {
-                exception.printStackTrace();
-            }
-        });
-        connection.start();
-        return connection;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/1dd34a13/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
new file mode 100644
index 0000000..3629b1e
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
@@ -0,0 +1,78 @@
+package org.apache.activemq.transport.amqp;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.junit.After;
+
+public class JMSClientTestSupport extends AmqpTestSupport {
+
+    protected Connection connection;
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+            }
+        }
+
+        super.tearDown();
+    }
+
+    /**
+     * @return the proper destination name to use for each test method invocation.
+     */
+    protected String getDestinationName() {
+        return name.getMethodName();
+    }
+
+    /**
+     * Can be overridden in subclasses to test against a different transport suchs as NIO.
+     *
+     * @return the port to connect to on the Broker.
+     */
+    protected int getBrokerPort() {
+        return port;
+    }
+
+    protected Connection createConnection() throws JMSException {
+        return createConnection(name.toString(), false, false);
+    }
+
+    protected Connection createConnection(boolean syncPublish) throws JMSException {
+        return createConnection(name.toString(), syncPublish, false);
+    }
+
+    protected Connection createConnection(String clientId) throws JMSException {
+        return createConnection(clientId, false, false);
+    }
+
+    protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException {
+
+        int brokerPort = getBrokerPort();
+        LOG.debug("Creating connection on port {}", brokerPort);
+        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl);
+
+        factory.setSyncPublish(syncPublish);
+        factory.setTopicPrefix("topic://");
+        factory.setQueuePrefix("queue://");
+
+        final Connection connection = factory.createConnection();
+        if (clientId != null && !clientId.isEmpty()) {
+            connection.setClientID(clientId);
+        }
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                exception.printStackTrace();
+            }
+        });
+        connection.start();
+        return connection;
+    }
+}