You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/02/26 22:20:36 UTC

[1/7] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5617

Repository: activemq
Updated Branches:
  refs/heads/master 79568f16c -> eaf773da5


https://issues.apache.org/jira/browse/AMQ-5617

Remove this older test as the test in JMSClientTest ->
testProduceConsumer covers this accross all four transport variants.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7a8085c0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7a8085c0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7a8085c0

Branch: refs/heads/master
Commit: 7a8085c0a49c0cac865ea4a6a91777c49790664d
Parents: 79568f1
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 14:35:51 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 14:35:51 2015 -0500

----------------------------------------------------------------------
 .../transport/amqp/bugs/AMQ4753Test.java        | 73 --------------------
 1 file changed, 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7a8085c0/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java
deleted file mode 100644
index 4258039..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.transport.amqp.AmqpTestSupport;
-import org.apache.activemq.transport.amqp.JmsClientContext;
-import org.junit.Test;
-
-public class AMQ4753Test extends AmqpTestSupport {
-
-    @Override
-    protected boolean isUseTcpConnector() {
-        return false;
-    }
-
-    @Override
-    protected boolean isUseNioPlusSslConnector() {
-        return true;
-    }
-
-    @Test(timeout = 120 * 1000)
-    public void testAmqpNioPlusSslSendReceive() throws JMSException {
-        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI);
-        runSimpleSendReceiveTest(connection);
-    }
-
-    public void runSimpleSendReceiveTest(Connection connection) throws JMSException{
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue =session.createQueue("txqueue");
-        MessageProducer producer = session.createProducer(queue);
-        TextMessage message = session.createTextMessage();
-        String messageText = "hello  sent at " + new java.util.Date().toString();
-        message.setText(messageText);
-        producer.send(message);
-
-        // Get the message we just sent
-        MessageConsumer consumer = session.createConsumer(queue);
-        connection.start();
-        Message receivedMessage = consumer.receive(5000);
-        assertNotNull(receivedMessage);
-        assertTrue(receivedMessage instanceof TextMessage);
-        TextMessage textMessage = (TextMessage) receivedMessage;
-        assertEquals(messageText, textMessage.getText());
-        connection.close();
-    }
-}


[7/7] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5617

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5617

Fix test failure and give it a more meaningful name.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eaf773da
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eaf773da
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eaf773da

Branch: refs/heads/master
Commit: eaf773da578a2827f9fe87be05676148ac31e90e
Parents: 276ef15
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 16:06:33 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 16:06:33 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/activemq/transport/amqp/JMSClientTest.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/eaf773da/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 0cdb895..3801168 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -886,7 +886,7 @@ public class JMSClientTest extends JMSClientTestSupport {
     }
 
     @Test(timeout=30*1000)
-    public void simpleDurableTopicTest() throws Exception {
+    public void testDurableTopicStateAfterSubscriberClosed() throws Exception {
         String durableClientId = getDestinationName() + "-ClientId";
         String durableSubscriberName = getDestinationName() + "-SubscriptionName";
 
@@ -896,7 +896,7 @@ public class JMSClientTest extends JMSClientTestSupport {
         LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);
 
         TopicConnection subscriberConnection =
-            JMSClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
+            JMSClientContext.INSTANCE.createTopicConnection(getBrokerURI(), "admin", "password");
         subscriberConnection.setClientID(durableClientId);
         TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic topic = subscriberSession.createTopic(getDestinationName());


[3/7] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5617

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5617

Absorbe these into the general JMSClientTest that covers all four
transport types.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4fa420bb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4fa420bb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4fa420bb

Branch: refs/heads/master
Commit: 4fa420bbd198167281cacf96313fa5a04da055ae
Parents: 12202c9
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 14:48:57 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 14:48:57 2015 -0500

----------------------------------------------------------------------
 .../activemq/transport/amqp/AMQ4696Test.java    |  68 ---------
 .../activemq/transport/amqp/JMSClientTest.java  |  38 +++++
 .../transport/amqp/bugs/AMQ4914Test.java        | 109 --------------
 .../transport/amqp/bugs/AMQ5256Test.java        | 149 -------------------
 4 files changed, 38 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4fa420bb/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java
deleted file mode 100644
index 677374e..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.junit.Test;
-
-public class AMQ4696Test extends AmqpTestSupport {
-
-    @Test(timeout=30*1000)
-    public void simpleDurableTopicTest() throws Exception {
-        String TOPIC_NAME = "AMQ4696Test" + System.currentTimeMillis();
-        String durableClientId = "AMQPDurableTopicTestClient";
-        String durableSubscriberName = "durableSubscriberName";
-
-        BrokerView adminView = this.brokerService.getAdminView();
-        int durableSubscribersAtStart = adminView.getDurableTopicSubscribers().length;
-        int inactiveSubscribersAtStart = adminView.getInactiveDurableTopicSubscribers().length;
-        LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);
-
-        TopicConnection subscriberConnection =
-            JmsClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
-        subscriberConnection.setClientID(durableClientId);
-        TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic topic = subscriberSession.createTopic(TOPIC_NAME);
-        TopicSubscriber messageConsumer = subscriberSession.createDurableSubscriber(topic, durableSubscriberName);
-
-        assertNotNull(messageConsumer);
-
-        int durableSubscribers = adminView.getDurableTopicSubscribers().length;
-        int inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
-        LOG.debug(">>>> durable Subscribers after creation {} inactiveDurableSubscribers {}", durableSubscribers, inactiveSubscribers);
-        assertEquals("Wrong number of durable subscribers after first subscription", 1, (durableSubscribers - durableSubscribersAtStart));
-        assertEquals("Wrong number of inactive durable subscribers after first subscription", 0, (inactiveSubscribers - inactiveSubscribersAtStart));
-
-        subscriberConnection.close();
-        subscriberConnection = null;
-
-        durableSubscribers = adminView.getDurableTopicSubscribers().length;
-        inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
-        LOG.debug(">>>> durable Subscribers after close {} inactiveDurableSubscribers {}", durableSubscribers, inactiveSubscribers);
-        assertEquals("Wrong number of durable subscribers after close", 0, (durableSubscribersAtStart));
-        assertEquals("Wrong number of inactive durable subscribers after close", 1, (inactiveSubscribers - inactiveSubscribersAtStart));
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/4fa420bb/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 34f20ba..250053a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -45,7 +45,11 @@ import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
 
+import org.apache.activemq.broker.jmx.BrokerView;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
@@ -881,6 +885,40 @@ public class JMSClientTest extends JMSClientTestSupport {
         assertEquals(messageText, textMessage.getText());
     }
 
+    @Test(timeout=30*1000)
+    public void simpleDurableTopicTest() throws Exception {
+        String durableClientId = getDestinationName() + "-ClientId";
+        String durableSubscriberName = getDestinationName() + "-SubscriptionName";
+
+        BrokerView adminView = this.brokerService.getAdminView();
+        int durableSubscribersAtStart = adminView.getDurableTopicSubscribers().length;
+        int inactiveSubscribersAtStart = adminView.getInactiveDurableTopicSubscribers().length;
+        LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);
+
+        TopicConnection subscriberConnection =
+            JmsClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
+        subscriberConnection.setClientID(durableClientId);
+        TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = subscriberSession.createTopic(getDestinationName());
+        TopicSubscriber messageConsumer = subscriberSession.createDurableSubscriber(topic, durableSubscriberName);
+
+        assertNotNull(messageConsumer);
+
+        int durableSubscribers = adminView.getDurableTopicSubscribers().length;
+        int inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
+        LOG.debug(">>>> durable Subscribers after creation {} inactiveDurableSubscribers {}", durableSubscribers, inactiveSubscribers);
+        assertEquals("Wrong number of durable subscribers after first subscription", 1, (durableSubscribers - durableSubscribersAtStart));
+        assertEquals("Wrong number of inactive durable subscribers after first subscription", 0, (inactiveSubscribers - inactiveSubscribersAtStart));
+
+        subscriberConnection.close();
+
+        durableSubscribers = adminView.getDurableTopicSubscribers().length;
+        inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
+        LOG.debug(">>>> durable Subscribers after close {} inactiveDurableSubscribers {}", durableSubscribers, inactiveSubscribers);
+        assertEquals("Wrong number of durable subscribers after close", 0, (durableSubscribersAtStart));
+        assertEquals("Wrong number of inactive durable subscribers after close", 1, (inactiveSubscribers - inactiveSubscribersAtStart));
+    }
+
     @Test(timeout=30000)
     public void testDurableConsumerUnsubscribe() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();

http://git-wip-us.apache.org/repos/asf/activemq/blob/4fa420bb/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java
deleted file mode 100644
index d171ea9..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.transport.amqp.AmqpTestSupport;
-import org.apache.activemq.transport.amqp.JmsClientContext;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4914Test extends AmqpTestSupport {
-
-    @Rule
-    public TestName testName = new TestName();
-
-    protected static final Logger LOG = LoggerFactory.getLogger(AMQ4914Test.class);
-
-    private String createLargeString(int sizeInBytes) {
-        byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
-        StringBuilder builder = new StringBuilder();
-        for (int i = 0; i < sizeInBytes; i++) {
-            builder.append(base[i % base.length]);
-        }
-
-        LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
-        return builder.toString();
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testSendSmallerMessages() throws JMSException {
-        for (int i = 512; i <= (8 * 1024); i += 512) {
-            doTestSendLargeMessage(i);
-        }
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testSendFixedSizedMessages() throws JMSException {
-        doTestSendLargeMessage(65536);
-        doTestSendLargeMessage(65536 * 2);
-        doTestSendLargeMessage(65536 * 4);
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testSendHugeMessage() throws JMSException {
-        doTestSendLargeMessage(1024 * 1024 * 10);
-    }
-
-    public void doTestSendLargeMessage(int expectedSize) throws JMSException{
-        LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
-        String payload = createLargeString(expectedSize);
-        assertEquals(expectedSize, payload.getBytes().length);
-
-        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
-        long startTime = System.currentTimeMillis();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(testName.getMethodName());
-        MessageProducer producer = session.createProducer(queue);
-        TextMessage message = session.createTextMessage();
-        message.setText(payload);
-        producer.send(message);
-        long endTime = System.currentTimeMillis();
-        LOG.info("Returned from send after {} ms", endTime - startTime);
-
-        startTime = System.currentTimeMillis();
-        MessageConsumer consumer = session.createConsumer(queue);
-        connection.start();
-        LOG.info("Calling receive");
-        Message receivedMessage = consumer.receive();
-        assertNotNull(receivedMessage);
-        assertTrue(receivedMessage instanceof TextMessage);
-        TextMessage receivedTextMessage = (TextMessage) receivedMessage;
-        assertNotNull(receivedMessage);
-        endTime = System.currentTimeMillis();
-        LOG.info("Returned from receive after {} ms", endTime - startTime);
-        String receivedText = receivedTextMessage.getText();
-        assertEquals(expectedSize, receivedText.getBytes().length);
-        assertEquals(payload, receivedText);
-        connection.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/4fa420bb/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java
deleted file mode 100644
index 989e501..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-
-import org.apache.activemq.transport.amqp.AmqpTestSupport;
-import org.apache.activemq.transport.amqp.JmsClientContext;
-import org.junit.Test;
-
-public class AMQ5256Test extends AmqpTestSupport {
-
-    @Override
-    protected boolean isUseTcpConnector() {
-        return true;
-    }
-
-    @Override
-    protected boolean isUseSslConnector() {
-        return true;
-    }
-
-    @Override
-    protected boolean isUseNioConnector() {
-        return true;
-    }
-
-    @Override
-    protected boolean isUseNioPlusSslConnector() {
-        return true;
-    }
-
-    @Test(timeout = 60000)
-    public void testParallelConnectPlain() throws Exception {
-        final int numThreads = 40;
-        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            executorService.execute(new Runnable() {
-                @Override
-                public void run() {
-
-                    try {
-                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password");
-                        connection.start();
-                        connection.close();
-                    } catch (JMSException e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-
-        executorService.shutdown();
-        assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
-    }
-
-    @Test(timeout = 60000)
-    public void testParallelConnectNio() throws Exception {
-        final int numThreads = 40;
-        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            executorService.execute(new Runnable() {
-                @Override
-                public void run() {
-
-                    try {
-                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password");
-                        connection.start();
-                        connection.close();
-                    } catch (JMSException e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-
-        executorService.shutdown();
-        assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
-    }
-
-    @Test(timeout = 60000)
-    public void testParallelConnectSsl() throws Exception {
-        final int numThreads = 40;
-        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            executorService.execute(new Runnable() {
-                @Override
-                public void run() {
-
-                    try {
-                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password");
-                        connection.start();
-                        connection.close();
-                    } catch (JMSException e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-
-        executorService.shutdown();
-        assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
-    }
-
-    @Test(timeout = 60000)
-    public void testParallelConnectNioPlusSsl() throws Exception {
-        final int numThreads = 40;
-        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            executorService.execute(new Runnable() {
-                @Override
-                public void run() {
-
-                    try {
-                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password");
-                        connection.start();
-                        connection.close();
-                    } catch (JMSException e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-
-        executorService.shutdown();
-        assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
-    }
-}


[6/7] activemq git commit: remove a warning.

Posted by ta...@apache.org.
remove a warning.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/276ef150
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/276ef150
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/276ef150

Branch: refs/heads/master
Commit: 276ef15024d43038a5db23ed9044309a691104f2
Parents: 240278d
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 15:23:26 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 15:23:26 2015 -0500

----------------------------------------------------------------------
 .../transport/amqp/message/JMSMappingOutboundTransformerTest.java   | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/276ef150/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index ac92d8f..63d7842 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.transport.amqp.message;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;


[2/7] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5617

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5617

Rename these and move to the general tests folder as they cover specific
use cases that are applicable beyond the issue the were created for.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/12202c97
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/12202c97
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/12202c97

Branch: refs/heads/master
Commit: 12202c9702452b36a6cea66fc5516c94ebdd60b9
Parents: 7a8085c
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 14:47:12 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 14:47:12 2015 -0500

----------------------------------------------------------------------
 .../amqp/JMSLargeMessageSendRecvTest.java       | 107 ++++++++++++++
 .../transport/amqp/JMSParallelConnectTest.java  | 147 +++++++++++++++++++
 2 files changed, 254 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/12202c97/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
new file mode 100644
index 0000000..0abdad2
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
+
+    @Rule
+    public TestName testName = new TestName();
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
+
+    private String createLargeString(int sizeInBytes) {
+        byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < sizeInBytes; i++) {
+            builder.append(base[i % base.length]);
+        }
+
+        LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
+        return builder.toString();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendSmallerMessages() throws JMSException {
+        for (int i = 512; i <= (8 * 1024); i += 512) {
+            doTestSendLargeMessage(i);
+        }
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendFixedSizedMessages() throws JMSException {
+        doTestSendLargeMessage(65536);
+        doTestSendLargeMessage(65536 * 2);
+        doTestSendLargeMessage(65536 * 4);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendHugeMessage() throws JMSException {
+        doTestSendLargeMessage(1024 * 1024 * 10);
+    }
+
+    public void doTestSendLargeMessage(int expectedSize) throws JMSException{
+        LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
+        String payload = createLargeString(expectedSize);
+        assertEquals(expectedSize, payload.getBytes().length);
+
+        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
+        long startTime = System.currentTimeMillis();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(testName.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage message = session.createTextMessage();
+        message.setText(payload);
+        producer.send(message);
+        long endTime = System.currentTimeMillis();
+        LOG.info("Returned from send after {} ms", endTime - startTime);
+
+        startTime = System.currentTimeMillis();
+        MessageConsumer consumer = session.createConsumer(queue);
+        connection.start();
+        LOG.info("Calling receive");
+        Message receivedMessage = consumer.receive();
+        assertNotNull(receivedMessage);
+        assertTrue(receivedMessage instanceof TextMessage);
+        TextMessage receivedTextMessage = (TextMessage) receivedMessage;
+        assertNotNull(receivedMessage);
+        endTime = System.currentTimeMillis();
+        LOG.info("Returned from receive after {} ms", endTime - startTime);
+        String receivedText = receivedTextMessage.getText();
+        assertEquals(expectedSize, receivedText.getBytes().length);
+        assertEquals(payload, receivedText);
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/12202c97/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
new file mode 100644
index 0000000..d9debb5
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+import org.junit.Test;
+
+public class JMSParallelConnectTest extends AmqpTestSupport {
+
+    @Override
+    protected boolean isUseTcpConnector() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseSslConnector() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseNioConnector() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseNioPlusSslConnector() {
+        return true;
+    }
+
+    @Test(timeout = 60000)
+    public void testParallelConnectPlain() throws Exception {
+        final int numThreads = 40;
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+
+                    try {
+                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password");
+                        connection.start();
+                        connection.close();
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+
+        executorService.shutdown();
+        assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
+    }
+
+    @Test(timeout = 60000)
+    public void testParallelConnectNio() throws Exception {
+        final int numThreads = 40;
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+
+                    try {
+                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password");
+                        connection.start();
+                        connection.close();
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+
+        executorService.shutdown();
+        assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
+    }
+
+    @Test(timeout = 60000)
+    public void testParallelConnectSsl() throws Exception {
+        final int numThreads = 40;
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+
+                    try {
+                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password");
+                        connection.start();
+                        connection.close();
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+
+        executorService.shutdown();
+        assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
+    }
+
+    @Test(timeout = 60000)
+    public void testParallelConnectNioPlusSsl() throws Exception {
+        final int numThreads = 40;
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+
+                    try {
+                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password");
+                        connection.start();
+                        connection.close();
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+
+        executorService.shutdown();
+        assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
+    }
+}


[5/7] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5617

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5617

Consolidate remaining dirct JMS client type usages to the context.
Rename some tests so be consistent.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/240278db
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/240278db
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/240278db

Branch: refs/heads/master
Commit: 240278dbef63816cce91f34e1cd53cd16618fd46
Parents: 8f0bf60
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 15:22:45 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 15:22:45 2015 -0500

----------------------------------------------------------------------
 .../activemq/transport/amqp/AMQ4563Test.java    |   6 +-
 .../transport/amqp/AmqpTransformerTest.java     |   6 +-
 .../transport/amqp/JMSClientContext.java        | 164 ++++++++++++++
 .../amqp/JMSClientRequestResponseTest.java      | 212 +++++++++++++++++++
 .../transport/amqp/JMSClientSimpleAuthTest.java | 165 +++++++++++++++
 .../activemq/transport/amqp/JMSClientTest.java  |   2 +-
 .../amqp/JMSConcurrentConsumersTest.java        |   4 +-
 .../amqp/JMSLargeMessageSendRecvTest.java       |   2 +-
 .../transport/amqp/JMSMessageGroupsTest.java    | 102 +++++++++
 .../transport/amqp/JMSParallelConnectTest.java  |   8 +-
 .../transport/amqp/JmsClientContext.java        | 136 ------------
 .../amqp/JmsClientRequestResponseTest.java      | 212 -------------------
 .../transport/amqp/JmsMessageGroupsTest.java    | 102 ---------
 .../transport/amqp/SimpleAMQPAuthTest.java      | 190 -----------------
 14 files changed, 657 insertions(+), 654 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
index 63f3216..88e0a44 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
@@ -76,7 +76,7 @@ public class AMQ4563Test extends AmqpTestSupport {
         ActiveMQAdmin.enableJMSFrameTracing();
         assertTrue(brokerService.isPersistent());
 
-        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
+        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createQueue(name.getMethodName());
@@ -122,7 +122,7 @@ public class AMQ4563Test extends AmqpTestSupport {
         ActiveMQAdmin.enableJMSFrameTracing();
         assertTrue(brokerService.isPersistent());
 
-        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
+        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createQueue(name.getMethodName());
@@ -152,7 +152,7 @@ public class AMQ4563Test extends AmqpTestSupport {
     }
 
     private int readAllMessages(String queueName, String selector) throws JMSException {
-        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
+        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
         connection.start();
 
         try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
index 65d38a4..0c2c6f7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
@@ -57,7 +57,7 @@ public class AmqpTransformerTest {
         startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=native"));
 
         // send "text message" with AMQP JMS API
-        Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
+        Connection amqpConnection = JMSClientContext.INSTANCE.createConnection(amqpConnectionURI);
         amqpConnection.start();
 
         Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -104,7 +104,7 @@ public class AmqpTransformerTest {
         startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=raw"));
 
         // send "text message" with AMQP JMS API
-        Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
+        Connection amqpConnection = JMSClientContext.INSTANCE.createConnection(amqpConnectionURI);
         amqpConnection.start();
 
         Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -156,7 +156,7 @@ public class AmqpTransformerTest {
         startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=jms"));
 
         // send "text message" with AMQP JMS API
-        Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
+        Connection amqpConnection = JMSClientContext.INSTANCE.createConnection(amqpConnectionURI);
         amqpConnection.start();
 
         Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java
new file mode 100644
index 0000000..81b0399
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.TopicConnection;
+
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Context used for AMQP JMS Clients to create connection instances.
+ */
+public class JMSClientContext {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JMSClientContext.class);
+
+    public static final JMSClientContext INSTANCE = new JMSClientContext();
+
+    //----- Plain JMS Connection Create methods ------------------------------//
+
+    public Connection createConnection(URI remoteURI) throws JMSException {
+        return createConnection(remoteURI, null, null, true);
+    }
+
+    public Connection createConnection(URI remoteURI, String username, String password) throws JMSException {
+        return createConnection(remoteURI, username, password, null, true);
+    }
+
+    public Connection createConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
+        return createConnection(remoteURI, username, password, null, syncPublish);
+    }
+
+    public Connection createConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
+        return createConnection(remoteURI, username, password, clientId, true);
+    }
+
+    public Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
+        ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
+
+        Connection connection = factory.createConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                LOG.error("Unexpected exception ", exception);
+                exception.printStackTrace();
+            }
+        });
+
+        return connection;
+    }
+
+    //----- JMS TopicConnection Create methods -------------------------------//
+
+    public TopicConnection createTopicConnection(URI remoteURI) throws JMSException {
+        return createTopicConnection(remoteURI, null, null, true);
+    }
+
+    public TopicConnection createTopicConnection(URI remoteURI, String username, String password) throws JMSException {
+        return createTopicConnection(remoteURI, username, password, null, true);
+    }
+
+    public TopicConnection createTopicConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
+        return createTopicConnection(remoteURI, username, password, null, syncPublish);
+    }
+
+    public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
+        return createTopicConnection(remoteURI, username, password, clientId, true);
+    }
+
+    public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
+        ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
+
+        TopicConnection connection = factory.createTopicConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                LOG.error("Unexpected exception ", exception);
+                exception.printStackTrace();
+            }
+        });
+
+        return connection;
+    }
+
+    //----- JMS QueueConnection Create methods -------------------------------//
+
+    public QueueConnection createQueueConnection(URI remoteURI) throws JMSException {
+        return createQueueConnection(remoteURI, null, null, true);
+    }
+
+    public QueueConnection createQueueConnection(URI remoteURI, String username, String password) throws JMSException {
+        return createQueueConnection(remoteURI, username, password, null, true);
+    }
+
+    public QueueConnection createQueueConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
+        return createQueueConnection(remoteURI, username, password, null, syncPublish);
+    }
+
+    public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
+        return createQueueConnection(remoteURI, username, password, clientId, true);
+    }
+
+    public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
+        ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
+
+        QueueConnection connection = factory.createQueueConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                LOG.error("Unexpected exception ", exception);
+                exception.printStackTrace();
+            }
+        });
+
+        return connection;
+    }
+
+    //------ Internal Implementation bits ------------------------------------//
+
+    private ConnectionFactoryImpl createConnectionFactory(
+        URI remoteURI, String username, String password, String clientId, boolean syncPublish) {
+
+        boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl");
+
+        LOG.debug("In createConnectionFactory using port {} ssl? {}", remoteURI.getPort(), useSSL);
+
+        ConnectionFactoryImpl factory =
+            new ConnectionFactoryImpl(remoteURI.getHost(), remoteURI.getPort(), username, password, clientId, useSSL);
+
+        if (useSSL) {
+            factory.setKeyStorePath(System.getProperty("javax.net.ssl.trustStore"));
+            factory.setKeyStorePassword("password");
+            factory.setTrustStorePath(System.getProperty("javax.net.ssl.keyStore"));
+            factory.setTrustStorePassword("password");
+        }
+
+        factory.setTopicPrefix("topic://");
+        factory.setQueuePrefix("queue://");
+        factory.setSyncPublish(syncPublish);
+
+        return factory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientRequestResponseTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientRequestResponseTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientRequestResponseTest.java
new file mode 100644
index 0000000..7d8a44f
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientRequestResponseTest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+import java.util.Vector;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSClientRequestResponseTest extends AmqpTestSupport implements MessageListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JMSClientRequestResponseTest.class);
+
+    @Rule public TestName name = new TestName();
+
+    private Connection requestorConnection;
+    private Destination requestDestination;
+    private Session requestorSession;
+
+    private Connection responderConnection;
+    private MessageProducer responseProducer;
+    private Session responderSession;
+    private Destination replyDestination;
+
+    private final List<JMSException> failures = new Vector<JMSException>();
+    private boolean dynamicallyCreateProducer;
+    private final boolean useAsyncConsumer = true;
+    private Thread syncThread;
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (requestorConnection != null) {
+            try {
+                requestorConnection.close();
+            } catch (Exception e) {}
+        }
+        if (responderConnection != null) {
+            try {
+                responderConnection.close();
+            } catch (Exception e) {}
+        }
+
+        if (syncThread != null) {
+            syncThread.join(5000);
+        }
+
+        super.tearDown();
+    }
+
+    private void doSetupConnections(boolean topic) throws Exception {
+        responderConnection = createConnection(name.getMethodName() + "-responder");
+        responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        if (topic) {
+            requestDestination = responderSession.createTopic(name.getMethodName());
+        } else {
+            requestDestination = responderSession.createQueue(name.getMethodName());
+        }
+        responseProducer = responderSession.createProducer(null);
+
+        final MessageConsumer requestConsumer = responderSession.createConsumer(requestDestination);
+        if (useAsyncConsumer) {
+            requestConsumer.setMessageListener(this);
+        } else {
+            syncThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    syncConsumeLoop(requestConsumer);
+                }
+            });
+            syncThread.start();
+        }
+        responderConnection.start();
+
+        requestorConnection = createConnection(name.getMethodName() + "-requestor");
+        requestorSession = requestorConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        if (topic) {
+            replyDestination = requestorSession.createTemporaryTopic();
+        } else {
+            replyDestination = requestorSession.createTemporaryQueue();
+        }
+        requestorConnection.start();
+    }
+
+    @Test(timeout=60000)
+    public void testRequestResponseToTempQueue() throws Exception {
+        doSetupConnections(false);
+        doTestRequestResponse();
+    }
+
+    @Test(timeout=60000)
+    public void testRequestResponseToTempTopic() throws Exception {
+        doSetupConnections(true);
+        doTestRequestResponse();
+    }
+
+    private void doTestRequestResponse() throws Exception {
+
+        MessageProducer requestProducer = requestorSession.createProducer(requestDestination);
+        MessageConsumer replyConsumer = requestorSession.createConsumer(replyDestination);
+
+        TextMessage requestMessage = requestorSession.createTextMessage("SomeRequest");
+        requestMessage.setJMSReplyTo(replyDestination);
+        requestProducer.send(requestMessage);
+
+        LOG.info("Sent request to destination: {}", requestDestination.toString());
+
+        Message msg = replyConsumer.receive(10000);
+
+        if (msg instanceof TextMessage) {
+            TextMessage replyMessage = (TextMessage)msg;
+            LOG.info("Received reply.");
+            LOG.info(replyMessage.toString());
+            assertTrue("Wrong message content", replyMessage.getText().startsWith("response"));
+        } else {
+            fail("Should have received a reply by now");
+        }
+        replyConsumer.close();
+
+        assertEquals("Should not have had any failures: " + failures, 0, failures.size());
+    }
+
+    private Connection createConnection(String clientId) throws JMSException {
+        return JMSClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", clientId);
+    }
+
+    protected void syncConsumeLoop(MessageConsumer requestConsumer) {
+        try {
+            Message message = requestConsumer.receive(5000);
+            if (message != null) {
+                onMessage(message);
+            } else {
+                LOG.error("No message received");
+            }
+        } catch (JMSException e) {
+            onException(e);
+        }
+    }
+
+    @Override
+    public void onMessage(Message message) {
+        try {
+            TextMessage requestMessage = (TextMessage)message;
+
+            LOG.info("Received request.");
+            LOG.info(requestMessage.toString());
+
+            Destination replyDestination = requestMessage.getJMSReplyTo();
+            if (replyDestination instanceof Topic) {
+                LOG.info("Reply destination is: {}", ((Topic)replyDestination).getTopicName());
+            } else {
+                LOG.info("Reply destination is: {}", ((Queue)replyDestination).getQueueName());
+            }
+
+            TextMessage replyMessage = responderSession.createTextMessage("response for: " + requestMessage.getText());
+            replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
+
+            if (dynamicallyCreateProducer) {
+                responseProducer = responderSession.createProducer(replyDestination);
+                responseProducer.send(replyMessage);
+            } else {
+                responseProducer.send(replyDestination, replyMessage);
+            }
+
+            LOG.info("Sent reply.");
+            LOG.info(replyMessage.toString());
+        } catch (JMSException e) {
+            onException(e);
+        }
+    }
+
+    protected void onException(JMSException e) {
+        LOG.info("Caught: " + e);
+        e.printStackTrace();
+        failures.add(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
new file mode 100644
index 0000000..064e132
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSClientSimpleAuthTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JMSClientSimpleAuthTest.class);
+
+    private final String SIMPLE_AUTH_AMQP_BROKER_XML =
+        "org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml";
+    private BrokerService brokerService;
+    private URI amqpURI;
+
+    @Before
+    public void setUp() throws Exception {
+        startBroker();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService = null;
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testNoUserOrPassword() throws Exception {
+        try {
+            Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "", "");
+            connection.start();
+            Thread.sleep(500);
+            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fail("Expected JMSException");
+        } catch (JMSException e) {
+            Exception linkedException = e.getLinkedException();
+            if (linkedException != null && linkedException instanceof ConnectionClosedException) {
+                ConnectionClosedException cce = (ConnectionClosedException) linkedException;
+                assertEquals("Error{condition=unauthorized-access,description=User name [null] or password is invalid.}", cce.getRemoteError().toString());
+            } else {
+                LOG.error("Unexpected Exception", e);
+                fail("Unexpected exception: " + e.getMessage());
+            }
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testUnknownUser() throws Exception {
+        try {
+            Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "nosuchuser", "blah");
+            connection.start();
+            Thread.sleep(500);
+            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fail("Expected JMSException");
+        } catch (JMSException e)  {
+            Exception linkedException = e.getLinkedException();
+            if (linkedException != null && linkedException instanceof ConnectionClosedException) {
+                ConnectionClosedException cce = (ConnectionClosedException) linkedException;
+                assertEquals("Error{condition=unauthorized-access,description=User name [nosuchuser] or password is invalid.}", cce.getRemoteError().toString());
+            } else {
+                LOG.error("Unexpected Exception", e);
+                fail("Unexpected exception: " + e.getMessage());
+            }
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testKnownUserWrongPassword() throws Exception {
+        try {
+            Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "wrongPassword");
+            connection.start();
+            Thread.sleep(500);
+            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fail("Expected JMSException");
+        } catch (JMSException e) {
+            Exception linkedException = e.getLinkedException();
+            if (linkedException != null && linkedException instanceof ConnectionClosedException) {
+                ConnectionClosedException cce = (ConnectionClosedException) linkedException;
+                assertEquals("Error{condition=unauthorized-access,description=User name [user] or password is invalid.}", cce.getRemoteError().toString());
+            } else {
+                LOG.error("Unexpected Exception", e);
+                fail("Unexpected exception: " + e.getMessage());
+            }
+        }
+    }
+
+    @Test(timeout = 30000)
+    public void testSendReceive() throws Exception {
+        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("txQueue");
+        MessageProducer p = session.createProducer(queue);
+        TextMessage message = null;
+        message = session.createTextMessage();
+        String messageText = "hello  sent at " + new java.util.Date().toString();
+        message.setText(messageText);
+        p.send(message);
+
+        // Get the message we just sent
+        MessageConsumer consumer = session.createConsumer(queue);
+        connection.start();
+        Message msg = consumer.receive(5000);
+        assertNotNull(msg);
+        assertTrue(msg instanceof TextMessage);
+        TextMessage textMessage = (TextMessage) msg;
+        assertEquals(messageText, textMessage.getText());
+        connection.close();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return createBroker(SIMPLE_AUTH_AMQP_BROKER_XML);
+    }
+
+    protected BrokerService createBroker(String uri) throws Exception {
+        LOG.debug(">>>>> Loading broker configuration from the classpath with URI: {}", uri);
+        return BrokerFactory.createBroker(new URI("xbean:" +  uri));
+    }
+
+    public void startBroker() throws Exception {
+        brokerService = createBroker();
+        brokerService.start();
+        amqpURI = brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI();
+        brokerService.waitUntilStarted();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 250053a..0cdb895 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -896,7 +896,7 @@ public class JMSClientTest extends JMSClientTestSupport {
         LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);
 
         TopicConnection subscriberConnection =
-            JmsClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
+            JMSClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
         subscriberConnection.setClientID(durableClientId);
         TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic topic = subscriberSession.createTopic(getDestinationName());

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
index 2d7b34f..62c70cd 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
@@ -101,7 +101,7 @@ public class JMSConcurrentConsumersTest extends AmqpTestSupport {
 
     public void doTestSendWithMultipleConsumers(URI remoteURI) throws Exception {
 
-        Connection connection = JmsClientContext.INSTANCE.createConnection(remoteURI, "admin", "password", false);
+        Connection connection = JMSClientContext.INSTANCE.createConnection(remoteURI, "admin", "password", false);
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         String destinationName = "AMQ4920Test" + System.currentTimeMillis();
         Destination destination = session.createTopic(destinationName);
@@ -170,7 +170,7 @@ public class JMSConcurrentConsumersTest extends AmqpTestSupport {
             LOG.debug(consumerName + " starting");
             Connection connection = null;
             try {
-                connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
+                connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 Destination destination = session.createTopic(destinationName);
                 MessageConsumer consumer = session.createConsumer(destination);

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
index 0abdad2..ef6eaba 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
@@ -77,7 +77,7 @@ public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
         String payload = createLargeString(expectedSize);
         assertEquals(expectedSize, payload.getBytes().length);
 
-        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
+        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
         long startTime = System.currentTimeMillis();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createQueue(testName.getMethodName());

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSMessageGroupsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSMessageGroupsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSMessageGroupsTest.java
new file mode 100644
index 0000000..8b1e786
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSMessageGroupsTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSMessageGroupsTest extends JMSClientTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageGroupsTest.class);
+
+    private static final int ITERATIONS = 10;
+    private static final int MESSAGE_COUNT = 10;
+    private static final int MESSAGE_SIZE = 200 * 1024;
+    private static final int RECEIVE_TIMEOUT = 3000;
+    private static final String JMSX_GROUP_ID = "JmsGroupsTest";
+
+    @Test(timeout = 60000)
+    public void testGroupSeqIsNeverLost() throws Exception {
+        AtomicInteger sequenceCounter = new AtomicInteger();
+
+        for (int i = 0; i < ITERATIONS; ++i) {
+            connection = createConnection();
+            {
+                sendMessagesToBroker(MESSAGE_COUNT, sequenceCounter);
+                readMessagesOnBroker(MESSAGE_COUNT);
+            }
+            connection.close();
+        }
+    }
+
+    protected void readMessagesOnBroker(int count) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            Message message = consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull(message);
+            LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName());
+            String gid = message.getStringProperty("JMSXGroupID");
+            String seq = message.getStringProperty("JMSXGroupSeq");
+            LOG.debug("Message assigned JMSXGroupID := {}", gid);
+            LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
+        }
+
+        consumer.close();
+    }
+
+    protected void sendMessagesToBroker(int count, AtomicInteger sequence) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        MessageProducer producer = session.createProducer(queue);
+
+        byte[] buffer = new byte[MESSAGE_SIZE];
+        for (count = 0; count < MESSAGE_SIZE; count++) {
+            String s = String.valueOf(count % 10);
+            Character c = s.charAt(0);
+            int value = c.charValue();
+            buffer[count] = (byte) value;
+        }
+
+        LOG.debug("Sending {} messages to destination: {}", MESSAGE_COUNT, queue);
+        for (int i = 1; i <= MESSAGE_COUNT; i++) {
+            BytesMessage message = session.createBytesMessage();
+            message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+            message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
+            message.setIntProperty("JMSXGroupSeq", sequence.incrementAndGet());
+            message.writeBytes(buffer);
+            producer.send(message);
+        }
+
+        producer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
index d9debb5..f02d3a9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
@@ -59,7 +59,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
                 public void run() {
 
                     try {
-                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password");
+                        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "admin", "password");
                         connection.start();
                         connection.close();
                     } catch (JMSException e) {
@@ -83,7 +83,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
                 public void run() {
 
                     try {
-                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password");
+                        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password");
                         connection.start();
                         connection.close();
                     } catch (JMSException e) {
@@ -107,7 +107,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
                 public void run() {
 
                     try {
-                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password");
+                        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password");
                         connection.start();
                         connection.close();
                     } catch (JMSException e) {
@@ -131,7 +131,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
                 public void run() {
 
                     try {
-                        Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password");
+                        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password");
                         connection.start();
                         connection.close();
                     } catch (JMSException e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java
deleted file mode 100644
index dd7b699..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.TopicConnection;
-
-import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Context used for AMQP JMS Clients to create connection instances.
- */
-public class JmsClientContext {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JmsClientContext.class);
-
-    public static final JmsClientContext INSTANCE = new JmsClientContext();
-
-    //----- Plain JMS Connection Create methods ------------------------------//
-
-    public Connection createConnection(URI remoteURI) throws JMSException {
-        return createConnection(remoteURI, null, null, true);
-    }
-
-    public Connection createConnection(URI remoteURI, String username, String password) throws JMSException {
-        return createConnection(remoteURI, username, password, null, true);
-    }
-
-    public Connection createConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
-        return createConnection(remoteURI, username, password, null, syncPublish);
-    }
-
-    public Connection createConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
-        return createConnection(remoteURI, username, password, clientId, true);
-    }
-
-    public Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
-        ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
-
-        return factory.createConnection();
-    }
-
-    //----- JMS TopicConnection Create methods -------------------------------//
-
-    public TopicConnection createTopicConnection(URI remoteURI) throws JMSException {
-        return createTopicConnection(remoteURI, null, null, true);
-    }
-
-    public TopicConnection createTopicConnection(URI remoteURI, String username, String password) throws JMSException {
-        return createTopicConnection(remoteURI, username, password, null, true);
-    }
-
-    public TopicConnection createTopicConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
-        return createTopicConnection(remoteURI, username, password, null, syncPublish);
-    }
-
-    public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
-        return createTopicConnection(remoteURI, username, password, clientId, true);
-    }
-
-    public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
-        ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
-
-        return factory.createTopicConnection();
-    }
-
-    //----- JMS QueueConnection Create methods -------------------------------//
-
-    public QueueConnection createQueueConnection(URI remoteURI) throws JMSException {
-        return createQueueConnection(remoteURI, null, null, true);
-    }
-
-    public QueueConnection createQueueConnection(URI remoteURI, String username, String password) throws JMSException {
-        return createQueueConnection(remoteURI, username, password, null, true);
-    }
-
-    public QueueConnection createQueueConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
-        return createQueueConnection(remoteURI, username, password, null, syncPublish);
-    }
-
-    public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
-        return createQueueConnection(remoteURI, username, password, clientId, true);
-    }
-
-    public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
-        ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
-
-        return factory.createQueueConnection();
-    }
-
-    //------ Internal Implementation bits ------------------------------------//
-
-    private ConnectionFactoryImpl createConnectionFactory(
-        URI remoteURI, String username, String password, String clientId, boolean syncPublish) {
-
-        boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl");
-
-        LOG.debug("In createConnectionFactory using port {} ssl? {}", remoteURI.getPort(), useSSL);
-
-        ConnectionFactoryImpl factory =
-            new ConnectionFactoryImpl(remoteURI.getHost(), remoteURI.getPort(), username, password, clientId, useSSL);
-
-        if (useSSL) {
-            factory.setKeyStorePath(System.getProperty("javax.net.ssl.trustStore"));
-            factory.setKeyStorePassword("password");
-            factory.setTrustStorePath(System.getProperty("javax.net.ssl.keyStore"));
-            factory.setTrustStorePassword("password");
-        }
-
-        factory.setTopicPrefix("topic://");
-        factory.setQueuePrefix("queue://");
-        factory.setSyncPublish(syncPublish);
-
-        return factory;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java
deleted file mode 100644
index 1446c10..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-import java.util.Vector;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsClientRequestResponseTest extends AmqpTestSupport implements MessageListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JmsClientRequestResponseTest.class);
-
-    @Rule public TestName name = new TestName();
-
-    private Connection requestorConnection;
-    private Destination requestDestination;
-    private Session requestorSession;
-
-    private Connection responderConnection;
-    private MessageProducer responseProducer;
-    private Session responderSession;
-    private Destination replyDestination;
-
-    private final List<JMSException> failures = new Vector<JMSException>();
-    private boolean dynamicallyCreateProducer;
-    private final boolean useAsyncConsumer = true;
-    private Thread syncThread;
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        if (requestorConnection != null) {
-            try {
-                requestorConnection.close();
-            } catch (Exception e) {}
-        }
-        if (responderConnection != null) {
-            try {
-                responderConnection.close();
-            } catch (Exception e) {}
-        }
-
-        if (syncThread != null) {
-            syncThread.join(5000);
-        }
-
-        super.tearDown();
-    }
-
-    private void doSetupConnections(boolean topic) throws Exception {
-        responderConnection = createConnection(name.getMethodName() + "-responder");
-        responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        if (topic) {
-            requestDestination = responderSession.createTopic(name.getMethodName());
-        } else {
-            requestDestination = responderSession.createQueue(name.getMethodName());
-        }
-        responseProducer = responderSession.createProducer(null);
-
-        final MessageConsumer requestConsumer = responderSession.createConsumer(requestDestination);
-        if (useAsyncConsumer) {
-            requestConsumer.setMessageListener(this);
-        } else {
-            syncThread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    syncConsumeLoop(requestConsumer);
-                }
-            });
-            syncThread.start();
-        }
-        responderConnection.start();
-
-        requestorConnection = createConnection(name.getMethodName() + "-requestor");
-        requestorSession = requestorConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        if (topic) {
-            replyDestination = requestorSession.createTemporaryTopic();
-        } else {
-            replyDestination = requestorSession.createTemporaryQueue();
-        }
-        requestorConnection.start();
-    }
-
-    @Test(timeout=60000)
-    public void testRequestResponseToTempQueue() throws Exception {
-        doSetupConnections(false);
-        doTestRequestResponse();
-    }
-
-    @Test(timeout=60000)
-    public void testRequestResponseToTempTopic() throws Exception {
-        doSetupConnections(true);
-        doTestRequestResponse();
-    }
-
-    private void doTestRequestResponse() throws Exception {
-
-        MessageProducer requestProducer = requestorSession.createProducer(requestDestination);
-        MessageConsumer replyConsumer = requestorSession.createConsumer(replyDestination);
-
-        TextMessage requestMessage = requestorSession.createTextMessage("SomeRequest");
-        requestMessage.setJMSReplyTo(replyDestination);
-        requestProducer.send(requestMessage);
-
-        LOG.info("Sent request to destination: {}", requestDestination.toString());
-
-        Message msg = replyConsumer.receive(10000);
-
-        if (msg instanceof TextMessage) {
-            TextMessage replyMessage = (TextMessage)msg;
-            LOG.info("Received reply.");
-            LOG.info(replyMessage.toString());
-            assertTrue("Wrong message content", replyMessage.getText().startsWith("response"));
-        } else {
-            fail("Should have received a reply by now");
-        }
-        replyConsumer.close();
-
-        assertEquals("Should not have had any failures: " + failures, 0, failures.size());
-    }
-
-    private Connection createConnection(String clientId) throws JMSException {
-        return JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", clientId);
-    }
-
-    protected void syncConsumeLoop(MessageConsumer requestConsumer) {
-        try {
-            Message message = requestConsumer.receive(5000);
-            if (message != null) {
-                onMessage(message);
-            } else {
-                LOG.error("No message received");
-            }
-        } catch (JMSException e) {
-            onException(e);
-        }
-    }
-
-    @Override
-    public void onMessage(Message message) {
-        try {
-            TextMessage requestMessage = (TextMessage)message;
-
-            LOG.info("Received request.");
-            LOG.info(requestMessage.toString());
-
-            Destination replyDestination = requestMessage.getJMSReplyTo();
-            if (replyDestination instanceof Topic) {
-                LOG.info("Reply destination is: {}", ((Topic)replyDestination).getTopicName());
-            } else {
-                LOG.info("Reply destination is: {}", ((Queue)replyDestination).getQueueName());
-            }
-
-            TextMessage replyMessage = responderSession.createTextMessage("response for: " + requestMessage.getText());
-            replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
-
-            if (dynamicallyCreateProducer) {
-                responseProducer = responderSession.createProducer(replyDestination);
-                responseProducer.send(replyMessage);
-            } else {
-                responseProducer.send(replyDestination, replyMessage);
-            }
-
-            LOG.info("Sent reply.");
-            LOG.info(replyMessage.toString());
-        } catch (JMSException e) {
-            onException(e);
-        }
-    }
-
-    protected void onException(JMSException e) {
-        LOG.info("Caught: " + e);
-        e.printStackTrace();
-        failures.add(e);
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsMessageGroupsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsMessageGroupsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsMessageGroupsTest.java
deleted file mode 100644
index b559ca7..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsMessageGroupsTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import static org.junit.Assert.assertNotNull;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsMessageGroupsTest extends JMSClientTestSupport {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageGroupsTest.class);
-
-    private static final int ITERATIONS = 10;
-    private static final int MESSAGE_COUNT = 10;
-    private static final int MESSAGE_SIZE = 200 * 1024;
-    private static final int RECEIVE_TIMEOUT = 3000;
-    private static final String JMSX_GROUP_ID = "JmsGroupsTest";
-
-    @Test(timeout = 60000)
-    public void testGroupSeqIsNeverLost() throws Exception {
-        AtomicInteger sequenceCounter = new AtomicInteger();
-
-        for (int i = 0; i < ITERATIONS; ++i) {
-            connection = createConnection();
-            {
-                sendMessagesToBroker(MESSAGE_COUNT, sequenceCounter);
-                readMessagesOnBroker(MESSAGE_COUNT);
-            }
-            connection.close();
-        }
-    }
-
-    protected void readMessagesOnBroker(int count) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(getDestinationName());
-        MessageConsumer consumer = session.createConsumer(queue);
-
-        for (int i = 0; i < MESSAGE_COUNT; ++i) {
-            Message message = consumer.receive(RECEIVE_TIMEOUT);
-            assertNotNull(message);
-            LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName());
-            String gid = message.getStringProperty("JMSXGroupID");
-            String seq = message.getStringProperty("JMSXGroupSeq");
-            LOG.debug("Message assigned JMSXGroupID := {}", gid);
-            LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
-        }
-
-        consumer.close();
-    }
-
-    protected void sendMessagesToBroker(int count, AtomicInteger sequence) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(getDestinationName());
-        MessageProducer producer = session.createProducer(queue);
-
-        byte[] buffer = new byte[MESSAGE_SIZE];
-        for (count = 0; count < MESSAGE_SIZE; count++) {
-            String s = String.valueOf(count % 10);
-            Character c = s.charAt(0);
-            int value = c.charValue();
-            buffer[count] = (byte) value;
-        }
-
-        LOG.debug("Sending {} messages to destination: {}", MESSAGE_COUNT, queue);
-        for (int i = 1; i <= MESSAGE_COUNT; i++) {
-            BytesMessage message = session.createBytesMessage();
-            message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
-            message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
-            message.setIntProperty("JMSXGroupSeq", sequence.incrementAndGet());
-            message.writeBytes(buffer);
-            producer.send(message);
-        }
-
-        producer.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java
deleted file mode 100644
index c9b8c23..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
-import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SimpleAMQPAuthTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SimpleAMQPAuthTest.class);
-
-    private final String SIMPLE_AUTH_AMQP_BROKER_XML =
-        "org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml";
-    private BrokerService brokerService;
-    private int port;
-
-    @Before
-    public void setUp() throws Exception {
-        startBroker();
-    }
-
-    @After
-    public void stopBroker() throws Exception {
-        if (brokerService != null) {
-            brokerService.stop();
-            brokerService = null;
-        }
-    }
-
-    @Test(timeout = 10000)
-    public void testNoUserOrPassword() throws Exception {
-        try {
-            ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "", "");
-            factory.setQueuePrefix("queue://");
-            factory.setTopicPrefix("topic://");
-
-            Connection connection = factory.createConnection();
-            connection.setExceptionListener(new ExceptionListener() {
-                @Override
-                public void onException(JMSException exception) {
-                    LOG.error("Unexpected exception ", exception);
-                    exception.printStackTrace();
-                }
-            });
-            connection.start();
-            Thread.sleep(500);
-            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            fail("Expected JMSException");
-        } catch (JMSException e) {
-            Exception linkedException = e.getLinkedException();
-            if (linkedException != null && linkedException instanceof ConnectionClosedException) {
-                ConnectionClosedException cce = (ConnectionClosedException) linkedException;
-                assertEquals("Error{condition=unauthorized-access,description=User name [null] or password is invalid.}", cce.getRemoteError().toString());
-            } else {
-                LOG.error("Unexpected Exception", e);
-                fail("Unexpected exception: " + e.getMessage());
-            }
-        }
-    }
-
-    @Test(timeout = 10000)
-    public void testUnknownUser() throws Exception {
-        try {
-            ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
-            factory.setQueuePrefix("queue://");
-            factory.setTopicPrefix("topic://");
-
-            Connection connection = factory.createConnection("nosuchuser", "blah");
-            connection.start();
-            Thread.sleep(500);
-            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            fail("Expected JMSException");
-        } catch (JMSException e)  {
-            Exception linkedException = e.getLinkedException();
-            if (linkedException != null && linkedException instanceof ConnectionClosedException) {
-                ConnectionClosedException cce = (ConnectionClosedException) linkedException;
-                assertEquals("Error{condition=unauthorized-access,description=User name [nosuchuser] or password is invalid.}", cce.getRemoteError().toString());
-            } else {
-                LOG.error("Unexpected Exception", e);
-                fail("Unexpected exception: " + e.getMessage());
-            }
-        }
-    }
-
-    @Test(timeout = 10000)
-    public void testKnownUserWrongPassword() throws Exception {
-        try {
-            ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
-            factory.setQueuePrefix("queue://");
-            factory.setTopicPrefix("topic://");
-
-            Connection connection = factory.createConnection("user", "wrongPassword");
-            connection.start();
-            Thread.sleep(500);
-            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            fail("Expected JMSException");
-        } catch (JMSException e) {
-            Exception linkedException = e.getLinkedException();
-            if (linkedException != null && linkedException instanceof ConnectionClosedException) {
-                ConnectionClosedException cce = (ConnectionClosedException) linkedException;
-                assertEquals("Error{condition=unauthorized-access,description=User name [user] or password is invalid.}", cce.getRemoteError().toString());
-            } else {
-                LOG.error("Unexpected Exception", e);
-                fail("Unexpected exception: " + e.getMessage());
-            }
-        }
-    }
-
-    @Test(timeout = 30000)
-    public void testSendReceive() throws Exception {
-        ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
-        factory.setQueuePrefix("queue://");
-        factory.setTopicPrefix("topic://");
-
-        Connection connection = factory.createConnection("user", "userPassword");
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue("txQueue");
-        MessageProducer p = session.createProducer(queue);
-        TextMessage message = null;
-        message = session.createTextMessage();
-        String messageText = "hello  sent at " + new java.util.Date().toString();
-        message.setText(messageText);
-        p.send(message);
-
-        // Get the message we just sent
-        MessageConsumer consumer = session.createConsumer(queue);
-        connection.start();
-        Message msg = consumer.receive(5000);
-        assertNotNull(msg);
-        assertTrue(msg instanceof TextMessage);
-        TextMessage textMessage = (TextMessage) msg;
-        assertEquals(messageText, textMessage.getText());
-        connection.close();
-    }
-
-    protected BrokerService createBroker() throws Exception {
-        return createBroker(SIMPLE_AUTH_AMQP_BROKER_XML);
-    }
-
-    protected BrokerService createBroker(String uri) throws Exception {
-        LOG.debug(">>>>> Loading broker configuration from the classpath with URI: {}", uri);
-        return BrokerFactory.createBroker(new URI("xbean:" +  uri));
-    }
-
-    public void startBroker() throws Exception {
-        brokerService = createBroker();
-        brokerService.start();
-        port = brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI().getPort();
-        brokerService.waitUntilStarted();
-    }
-}
-


[4/7] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5617

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5617

Refactor as a general test case that covers the scenario accross all the
transport types.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8f0bf606
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8f0bf606
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8f0bf606

Branch: refs/heads/master
Commit: 8f0bf6060a8f93a5bda57e8f152139dbb9aad8d2
Parents: 4fa420b
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 15:06:16 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 15:06:16 2015 -0500

----------------------------------------------------------------------
 .../activemq/transport/amqp/AMQ4920Test.java    | 161 --------------
 .../amqp/JMSConcurrentConsumersTest.java        | 213 +++++++++++++++++++
 2 files changed, 213 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8f0bf606/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
deleted file mode 100644
index 94123af..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import static org.junit.Assert.assertEquals;
-
-import java.net.URI;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4920Test extends AmqpTestSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(AMQ4920Test.class);
-    private static final Integer ITERATIONS = 500;
-    private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are
-                                                     // required to reproduce
-                                                     // the original issue
-    public static final String TEXT_MESSAGE = "TextMessage: ";
-    private final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
-    private final CountDownLatch initLatch = new CountDownLatch(CONSUMER_COUNT);
-
-    @Test(timeout = 60000)
-    public void testSendWithMultipleConsumers() throws Exception {
-        Connection connection =
-            JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", false);
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        String destinationName = "AMQ4920Test" + System.currentTimeMillis();
-        Destination destination = session.createTopic(destinationName);
-        connection.start();
-
-        ExecutorService executor = Executors.newCachedThreadPool();
-        for (int i = 0; i < CONSUMER_COUNT; i++) {
-            AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(initLatch, destinationName, amqpURI, "Consumer-" + i, latch, ITERATIONS);
-            executor.submit(consumerTask);
-        }
-        connection.start();
-
-        // Make sure at least Topic consumers are subscribed before the first send.
-        initLatch.await();
-
-        LOG.debug("At start latch is " + latch.getCount());
-        sendMessages(connection, destination, ITERATIONS, 10);
-        LOG.debug("After send latch is " + latch.getCount());
-
-        latch.await(15, TimeUnit.SECONDS);
-        LOG.debug("After await latch is " + latch.getCount());
-        assertEquals(0, latch.getCount());
-
-        executor.shutdown();
-    }
-
-    public void sendMessages(Connection connection, Destination destination, int count, int sleepInterval) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(destination);
-
-        for (int i = 0; i < count; i++) {
-            TextMessage message = session.createTextMessage();
-            message.setText(TEXT_MESSAGE + i);
-            LOG.trace("Sending message [" + i + "]");
-            producer.send(message);
-            if (sleepInterval > 0) {
-                Thread.sleep(sleepInterval);
-            }
-        }
-
-        session.close();
-    }
-}
-
-class AMQ4930ConsumerTask implements Callable<Boolean> {
-    protected static final Logger LOG = LoggerFactory.getLogger(AMQ4930ConsumerTask.class);
-    private final String destinationName;
-    private final String consumerName;
-    private final CountDownLatch messagesReceived;
-    private final URI amqpURI;
-    private final int expectedMessageCount;
-    private final CountDownLatch started;
-
-    public AMQ4930ConsumerTask(CountDownLatch started, String destinationName, URI amqpURI, String consumerName, CountDownLatch latch, int expectedMessageCount) {
-        this.started = started;
-        this.destinationName = destinationName;
-        this.amqpURI = amqpURI;
-        this.consumerName = consumerName;
-        this.messagesReceived = latch;
-        this.expectedMessageCount = expectedMessageCount;
-    }
-
-    @Override
-    public Boolean call() throws Exception {
-        LOG.debug(consumerName + " starting");
-        Connection connection = null;
-        try {
-            connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Destination destination = session.createTopic(destinationName);
-            MessageConsumer consumer = session.createConsumer(destination);
-            connection.start();
-
-            started.countDown();
-
-            int receivedCount = 0;
-            while (receivedCount < expectedMessageCount) {
-                Message message = consumer.receive(2000);
-                if (message == null) {
-                    LOG.error("consumer {} got null message on iteration {}", consumerName, receivedCount);
-                    return false;
-                }
-                if (!(message instanceof TextMessage)) {
-                    LOG.error("consumer {} expected text message on iteration {} but got {}", consumerName, receivedCount, message.getClass().getCanonicalName());
-                    return false;
-                }
-                TextMessage tm = (TextMessage) message;
-                if (!tm.getText().equals(AMQ4920Test.TEXT_MESSAGE + receivedCount)) {
-                    LOG.error("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
-                    return false;
-                }
-                LOG.trace("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
-                messagesReceived.countDown();
-                receivedCount++;
-            }
-        } catch (Exception e) {
-            LOG.error("UnexpectedException in " + consumerName, e);
-        } finally {
-            try {
-                connection.close();
-            } catch (JMSException ignoreMe) {
-            }
-        }
-
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8f0bf606/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
new file mode 100644
index 0000000..2d7b34f
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSConcurrentConsumersTest extends AmqpTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(JMSConcurrentConsumersTest.class);
+    private static final Integer ITERATIONS = 400;
+    private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are
+                                                     // required to reproduce
+                                                     // the original issue
+    public static final String TEXT_MESSAGE = "TextMessage: ";
+
+    private CountDownLatch latch;
+    private CountDownLatch initLatch;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
+        initLatch = new CountDownLatch(CONSUMER_COUNT);
+    }
+
+    @Override
+    protected boolean isUseTcpConnector() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseSslConnector() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseNioConnector() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseNioPlusSslConnector() {
+        return true;
+    }
+
+    @Test(timeout = 60000)
+    public void testSendWithMultipleConsumersTCP() throws Exception {
+        doTestSendWithMultipleConsumers(amqpURI);
+    }
+
+    @Test(timeout = 60000)
+    public void testSendWithMultipleConsumersNIO() throws Exception {
+        doTestSendWithMultipleConsumers(amqpNioURI);
+    }
+
+    @Test(timeout = 60000)
+    public void testSendWithMultipleConsumersSSL() throws Exception {
+        doTestSendWithMultipleConsumers(amqpSslURI);
+    }
+
+    @Test(timeout = 60000)
+    public void testSendWithMultipleConsumersNIOPlusSSL() throws Exception {
+        doTestSendWithMultipleConsumers(amqpNioPlusSslURI);
+    }
+
+    public void doTestSendWithMultipleConsumers(URI remoteURI) throws Exception {
+
+        Connection connection = JmsClientContext.INSTANCE.createConnection(remoteURI, "admin", "password", false);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        String destinationName = "AMQ4920Test" + System.currentTimeMillis();
+        Destination destination = session.createTopic(destinationName);
+        connection.start();
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (int i = 0; i < CONSUMER_COUNT; i++) {
+            ConsumerTask consumerTask = new ConsumerTask(initLatch, destinationName, remoteURI, "Consumer-" + i, latch, ITERATIONS);
+            executor.submit(consumerTask);
+        }
+        connection.start();
+
+        // Make sure at least Topic consumers are subscribed before the first send.
+        initLatch.await();
+
+        LOG.debug("At start latch is " + latch.getCount());
+        sendMessages(connection, destination, ITERATIONS, 10);
+        LOG.debug("After send latch is " + latch.getCount());
+
+        latch.await(15, TimeUnit.SECONDS);
+        LOG.debug("After await latch is " + latch.getCount());
+        assertEquals(0, latch.getCount());
+
+        executor.shutdown();
+    }
+
+    public void sendMessages(Connection connection, Destination destination, int count, int sleepInterval) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText(TEXT_MESSAGE + i);
+            LOG.trace("Sending message [" + i + "]");
+            producer.send(message);
+            if (sleepInterval > 0) {
+                Thread.sleep(sleepInterval);
+            }
+        }
+
+        session.close();
+    }
+
+    static class ConsumerTask implements Callable<Boolean> {
+
+        protected static final Logger LOG = LoggerFactory.getLogger(ConsumerTask.class);
+
+        private final String destinationName;
+        private final String consumerName;
+        private final CountDownLatch messagesReceived;
+        private final URI amqpURI;
+        private final int expectedMessageCount;
+        private final CountDownLatch started;
+
+        public ConsumerTask(CountDownLatch started, String destinationName, URI amqpURI, String consumerName, CountDownLatch latch, int expectedMessageCount) {
+            this.started = started;
+            this.destinationName = destinationName;
+            this.amqpURI = amqpURI;
+            this.consumerName = consumerName;
+            this.messagesReceived = latch;
+            this.expectedMessageCount = expectedMessageCount;
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            LOG.debug(consumerName + " starting");
+            Connection connection = null;
+            try {
+                connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Destination destination = session.createTopic(destinationName);
+                MessageConsumer consumer = session.createConsumer(destination);
+                connection.start();
+
+                started.countDown();
+
+                int receivedCount = 0;
+                while (receivedCount < expectedMessageCount) {
+                    Message message = consumer.receive(2000);
+                    if (message == null) {
+                        LOG.error("consumer {} got null message on iteration {}", consumerName, receivedCount);
+                        return false;
+                    }
+                    if (!(message instanceof TextMessage)) {
+                        LOG.error("consumer {} expected text message on iteration {} but got {}", consumerName, receivedCount, message.getClass().getCanonicalName());
+                        return false;
+                    }
+                    TextMessage tm = (TextMessage) message;
+                    if (!tm.getText().equals(TEXT_MESSAGE + receivedCount)) {
+                        LOG.error("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
+                        return false;
+                    }
+                    LOG.trace("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
+                    messagesReceived.countDown();
+                    receivedCount++;
+                }
+            } catch (Exception e) {
+                LOG.error("UnexpectedException in " + consumerName, e);
+            } finally {
+                try {
+                    connection.close();
+                } catch (JMSException ignoreMe) {
+                }
+            }
+
+            return true;
+        }
+    }
+}