You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:26 UTC

[02/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java
new file mode 100644
index 0000000..a458ab7
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.failover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.jms.provider.DefaultProviderListener;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.failover.FailoverProvider;
+import org.apache.qpid.jms.provider.failover.FailoverProviderFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test basic functionality of the FailoverProvider class.
+ */
+public class FailoverProviderTest extends AmqpTestSupport {
+
+    @Test(timeout=60000)
+    public void testFailoverCreate() throws Exception {
+        URI brokerURI = new URI("failover:" + getBrokerAmqpConnectionURI());
+        Provider asyncProvider = FailoverProviderFactory.createAsync(brokerURI);
+        assertNotNull(asyncProvider);
+        FailoverProvider provider = (FailoverProvider) asyncProvider;
+        assertNotNull(provider);
+    }
+
+    @Test(timeout=60000)
+    public void testFailoverURIConfiguration() throws Exception {
+        URI brokerURI = new URI("failover://(" + getBrokerAmqpConnectionURI() + ")" +
+                                "?maxReconnectDelay=1000&useExponentialBackOff=false" +
+                                "&maxReconnectAttempts=10&startupMaxReconnectAttempts=20");
+        Provider asyncProvider = FailoverProviderFactory.createAsync(brokerURI);
+        assertNotNull(asyncProvider);
+        FailoverProvider provider = (FailoverProvider) asyncProvider;
+        assertNotNull(provider);
+
+        assertEquals(1000, provider.getMaxReconnectDelay());
+        assertFalse(provider.isUseExponentialBackOff());
+        assertEquals(10, provider.getMaxReconnectAttempts());
+        assertEquals(20, provider.getStartupMaxReconnectAttempts());
+    }
+
+    @Test(timeout=60000)
+    public void testStartupReconnectAttempts() throws Exception {
+        URI brokerURI = new URI("failover://(amqp://localhost:61616)" +
+                                "?maxReconnectDelay=100&startupMaxReconnectAttempts=5");
+        Provider asyncProvider = FailoverProviderFactory.createAsync(brokerURI);
+        assertNotNull(asyncProvider);
+        FailoverProvider provider = (FailoverProvider) asyncProvider;
+        assertNotNull(provider);
+
+        assertEquals(100, provider.getMaxReconnectDelay());
+        assertEquals(5, provider.getStartupMaxReconnectAttempts());
+
+        final CountDownLatch failed = new CountDownLatch(1);
+
+        provider.setProviderListener(new DefaultProviderListener() {
+
+            @Override
+            public void onConnectionFailure(IOException ex) {
+                failed.countDown();
+            }
+        });
+
+        provider.connect();
+
+        assertTrue(failed.await(2, TimeUnit.SECONDS));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
new file mode 100644
index 0000000..d615f3d
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.failover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Basic tests for the FailoverProvider implementation
+ */
+public class JmsFailoverTest extends AmqpTestSupport {
+
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    @Test(timeout=60000)
+    public void testFailoverConnects() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI());
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+        connection.close();
+    }
+
+    @Test(timeout=60000)
+    public void testFailoverConnectsWithMultipleURIs() throws Exception {
+        URI brokerURI = new URI("failover://(amqp://127.0.0.1:61616,amqp://localhost:5777," +
+                                getBrokerAmqpConnectionURI() + ")?maxReconnectDelay=500");
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+        connection.close();
+    }
+
+    @Test(timeout=60000, expected=JMSException.class)
+    public void testStartupReconnectAttempts() throws Exception {
+        URI brokerURI = new URI("failover://(amqp://localhost:61616)" +
+                                "?maxReconnectDelay=1000&startupMaxReconnectAttempts=5");
+        JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
+        Connection connection = factory.createConnection();
+        connection.start();
+    }
+
+    @Test(timeout=60000, expected=JMSException.class)
+    public void testStartupReconnectAttemptsMultipleHosts() throws Exception {
+        URI brokerURI = new URI("failover://(amqp://localhost:61616,amqp://localhost:61617)" +
+                                "?maxReconnectDelay=1000&startupMaxReconnectAttempts=5");
+        JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
+        Connection connection = factory.createConnection();
+        connection.start();
+    }
+
+    @Test(timeout=60000)
+    public void testStartFailureWithAsyncExceptionListener() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=1000&maxReconnectAttempts=5");
+
+        final CountDownLatch failed = new CountDownLatch(1);
+        JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
+        factory.setExceptionListener(new ExceptionListener() {
+
+            @Override
+            public void onException(JMSException exception) {
+                LOG.info("Connection got exception: {}", exception.getMessage());
+                failed.countDown();
+            }
+        });
+        Connection connection = factory.createConnection();
+        connection.start();
+
+        stopPrimaryBroker();
+
+        assertTrue("No async exception", failed.await(15, TimeUnit.SECONDS));
+    }
+
+    @SuppressWarnings("unused")
+    @Test(timeout=60000)
+    public void testBasicStateRestoration() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=1000");
+
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+        restartPrimaryBroker();
+
+        assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
+            }
+        }));
+
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+        connection.close();
+    }
+
+    @SuppressWarnings("unused")
+    @Test(timeout=60000)
+    public void testDurableSubscriberRestores() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=1000");
+
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.setClientID(name.getMethodName());
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(name.getMethodName());
+        MessageConsumer consumer = session.createDurableSubscriber(topic, name.getMethodName());
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+
+        restartPrimaryBroker();
+
+        assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
+            }
+        }));
+
+        assertTrue("Should have no inactive subscribers.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
+            }
+        }));
+
+        assertTrue("Should have one durable sub.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getDurableTopicSubscribers().length == 1;
+            }
+        }));
+
+        connection.close();
+    }
+
+    @Test(timeout=90000)
+    public void testBadFirstURIConnectsAndProducerWorks() throws Exception {
+        URI brokerURI = new URI("failover://(amqp://localhost:61616," +
+                                             getBrokerAmqpConnectionURI() + ")?maxReconnectDelay=1000");
+
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        final int MSG_COUNT = 10;
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        final MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        final CountDownLatch failed = new CountDownLatch(1);
+
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            producer.send(session.createTextMessage("Message: " + i));
+        }
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+
+        assertTrue("Should have all messages sent.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == MSG_COUNT;
+            }
+        }));
+
+        assertFalse(failed.getCount() == 0);
+        connection.close();
+    }
+
+    // TODO - FIXME
+    @Ignore("Test currently not working")
+    @Test(timeout=90000)
+    public void testProducerBlocksAndRecovers() throws Exception {
+        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=1000");
+
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        final int MSG_COUNT = 10;
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        final MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        final CountDownLatch failed = new CountDownLatch(1);
+
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+        Thread producerThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    for (int i = 0; i < MSG_COUNT; ++i) {
+                        producer.send(session.createTextMessage("Message: " + i));
+                        TimeUnit.SECONDS.sleep(1);
+                    }
+                } catch (Exception e) {
+                }
+            }
+        });
+        producerThread.start();
+
+        TimeUnit.SECONDS.sleep(3);
+        stopPrimaryBroker();
+        TimeUnit.SECONDS.sleep(3);
+        restartPrimaryBroker();
+
+        assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
+            }
+        }));
+
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+
+        assertTrue("Should have all messages sent.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == MSG_COUNT;
+            }
+        }));
+
+        assertFalse(failed.getCount() == 0);
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
new file mode 100644
index 0000000..bc971f6
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.failover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+
+/**
+ * Test various client behaviors when the connection has gone offline.
+ */
+public class JmsOfflineBehaviorTests extends AmqpTestSupport {
+
+    @Test(timeout=60000)
+    public void testConnectionCloseDoesNotBlock() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI());
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+        stopPrimaryBroker();
+        connection.close();
+    }
+
+    @Test(timeout=60000)
+    public void testSessionCloseDoesNotBlock() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI());
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        stopPrimaryBroker();
+        session.close();
+        connection.close();
+    }
+
+    @Test(timeout=60000)
+    public void testClientAckDoesNotBlock() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI());
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Test"));
+
+        Message message = consumer.receive(5000);
+        assertNotNull(message);
+        stopPrimaryBroker();
+        message.acknowledge();
+
+        connection.close();
+    }
+
+    @Test(timeout=60000)
+    public void testProducerCloseDoesNotBlock() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI());
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+
+        stopPrimaryBroker();
+        producer.close();
+        connection.close();
+    }
+
+    @Test(timeout=60000)
+    public void testConsumerCloseDoesNotBlock() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI());
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        stopPrimaryBroker();
+        consumer.close();
+        connection.close();
+    }
+
+    @SuppressWarnings("unused")
+    @Test(timeout=60000)
+    public void testSessionCloseWithOpenResourcesDoesNotBlock() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI());
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+        MessageProducer producer = session.createProducer(queue);
+
+        stopPrimaryBroker();
+        session.close();
+        connection.close();
+    }
+
+    @Test(timeout=60000)
+    public void testGetRemoteURI() throws Exception {
+
+        startNewBroker();
+
+        URI brokerURI = new URI(getAmqpFailoverURI() + "randomize=false");
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        JmsConnection jmsConnection = (JmsConnection) connection;
+        final Provider provider = jmsConnection.getProvider();
+
+        URI connectedURI = provider.getRemoteURI();
+        assertNotNull(connectedURI);
+
+        final List<URI> brokers = getBrokerURIs();
+        assertEquals(brokers.get(0), connectedURI);
+
+        stopPrimaryBroker();
+
+        assertTrue("Should connect to secondary URI.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return provider.getRemoteURI().equals(brokers.get(1));
+            }
+        }));
+
+        connection.close();
+    }
+
+    @SuppressWarnings("unused")
+    @Test(timeout=60000)
+    public void testClosedReourcesAreNotRestored() throws Exception {
+        URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=500");
+        Connection connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+        MessageProducer producer = session.createProducer(queue);
+
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+        stopPrimaryBroker();
+        session.close();
+        TimeUnit.SECONDS.sleep(2);
+        restartPrimaryBroker();
+
+        assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
+            }
+        }));
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getQueueProducers().length);
+
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java
new file mode 100644
index 0000000..f1e7db1
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.joram;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Hashtable;
+
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsQueue;
+import org.apache.qpid.jms.JmsTopic;
+import org.objectweb.jtests.jms.admin.Admin;
+
+/**
+ *
+ */
+public class ActiveMQAdmin implements Admin {
+
+    Context context;
+    {
+        try {
+            // Use the jetty JNDI context since it's mutable.
+            final Hashtable<String, String> env = new Hashtable<String, String>();
+            env.put("java.naming.factory.initial", "org.eclipse.jetty.jndi.InitialContextFactory");
+            env.put("java.naming.factory.url.pkgs", "org.eclipse.jetty.jndi");
+            ;
+            context = new InitialContext(env);
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+    }
+
+    @Override
+    public String getName() {
+        return getClass().getName();
+    }
+
+    static BrokerService broker;
+    static int port;
+
+    @Override
+    public void startServer() throws Exception {
+        if (broker != null) {
+            stopServer();
+        }
+        if (System.getProperty("basedir") == null) {
+            File file = new File(".");
+            System.setProperty("basedir", file.getAbsolutePath());
+        }
+        broker = createBroker();
+        TransportConnector connector = broker.addConnector(getConnectorURI());
+        broker.start();
+        port = connector.getConnectUri().getPort();
+    }
+
+    protected String getConnectorURI() {
+        return "amqp://localhost:0";
+    }
+
+    @Override
+    public void stopServer() throws Exception {
+        broker.stop();
+        broker = null;
+    }
+
+    @Override
+    public void start() throws Exception {
+    }
+
+    @Override
+    public void stop() throws Exception {
+    }
+
+    @Override
+    public Context createContext() throws NamingException {
+        return context;
+    }
+
+    @Override
+    public void createQueue(String name) {
+        try {
+            context.bind(name, new JmsQueue(name));
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void createTopic(String name) {
+        try {
+            context.bind(name, new JmsTopic(name));
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void deleteQueue(String name) {
+        try {
+            context.unbind(name);
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void deleteTopic(String name) {
+        try {
+            context.unbind(name);
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void createConnectionFactory(String name) {
+        try {
+            final ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + port);
+            context.bind(name, factory);
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void deleteConnectionFactory(String name) {
+        try {
+            context.unbind(name);
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void createQueueConnectionFactory(String name) {
+        createConnectionFactory(name);
+    }
+
+    @Override
+    public void createTopicConnectionFactory(String name) {
+        createConnectionFactory(name);
+    }
+
+    @Override
+    public void deleteQueueConnectionFactory(String name) {
+        deleteConnectionFactory(name);
+    }
+
+    @Override
+    public void deleteTopicConnectionFactory(String name) {
+        deleteConnectionFactory(name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java
new file mode 100644
index 0000000..3e59b69
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.joram;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.objectweb.jtests.jms.conform.connection.ConnectionTest;
+import org.objectweb.jtests.jms.conform.connection.TopicConnectionTest;
+import org.objectweb.jtests.jms.conform.message.MessageBodyTest;
+import org.objectweb.jtests.jms.conform.message.MessageDefaultTest;
+import org.objectweb.jtests.jms.conform.message.headers.MessageHeaderTest;
+import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyConversionTest;
+import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyTest;
+import org.objectweb.jtests.jms.conform.queue.TemporaryQueueTest;
+import org.objectweb.jtests.jms.conform.selector.SelectorSyntaxTest;
+import org.objectweb.jtests.jms.conform.selector.SelectorTest;
+import org.objectweb.jtests.jms.conform.session.QueueSessionTest;
+import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest;
+
+public class JoramJmsTest extends TestCase {
+
+    public static Test suite() {
+        TestSuite suite = new TestSuite();
+
+        // TODO: Fix these tests..
+        // Fails due to
+        // https://issues.apache.org/jira/browse/PROTON-154
+        // suite.addTestSuite(TopicSessionTest.class);
+
+//        suite.addTestSuite(MessageTypeTest.class);
+//        suite.addTestSuite(UnifiedSessionTest.class);
+//        suite.addTestSuite(JMSXPropertyTest.class);
+//        suite.addTestSuite(SessionTest.class);
+
+//        suite.addTestSuite(QueueBrowserTest.class);
+        suite.addTestSuite(QueueSessionTest.class);
+        suite.addTestSuite(SelectorSyntaxTest.class);
+        suite.addTestSuite(SelectorTest.class);
+        suite.addTestSuite(MessageHeaderTest.class);
+        suite.addTestSuite(TemporaryTopicTest.class);
+        suite.addTestSuite(TemporaryQueueTest.class);
+        suite.addTestSuite(TopicConnectionTest.class);
+        suite.addTestSuite(ConnectionTest.class);
+        suite.addTestSuite(MessageBodyTest.class);
+        suite.addTestSuite(MessageDefaultTest.class);
+        suite.addTestSuite(MessagePropertyConversionTest.class);
+        suite.addTestSuite(MessagePropertyTest.class);
+
+        return suite;
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java
new file mode 100644
index 0000000..04b954a
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test JMS Anonymous Producer functionality.
+ */
+public class JmsAnonymousProducerTest extends AmqpTestSupport {
+
+    @Test(timeout = 60000)
+    public void testCreateProducer() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        session.createProducer(null);
+
+        assertTrue(brokerService.getAdminView().getTotalProducerCount() == 0);
+    }
+
+    @Test(timeout = 60000)
+    public void testAnonymousSend() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        assertNotNull(session);
+        MessageProducer producer = session.createProducer(null);
+
+        Message message = session.createMessage();
+        producer.send(queue, message);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
+    public void testAnonymousSendToMultipleDestinations() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue1 = session.createQueue(name.getMethodName() + 1);
+        Queue queue2 = session.createQueue(name.getMethodName() + 2);
+        Queue queue3 = session.createQueue(name.getMethodName() + 3);
+        assertNotNull(session);
+        MessageProducer producer = session.createProducer(null);
+
+        Message message = session.createMessage();
+        producer.send(queue1, message);
+        producer.send(queue2, message);
+        producer.send(queue3, message);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName() + 1);
+        assertEquals(1, proxy.getQueueSize());
+        proxy = getProxyToQueue(name.getMethodName() + 2);
+        assertEquals(1, proxy.getQueueSize());
+        proxy = getProxyToQueue(name.getMethodName() + 3);
+        assertEquals(1, proxy.getQueueSize());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java
new file mode 100644
index 0000000..a7b8cdf
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test the contract of MessageProducer that has been closed.
+ */
+public class JmsMessageProducerClosedTest extends AmqpTestSupport {
+
+    protected MessageProducer producer;
+    protected Message message;
+    protected Destination destination;
+
+    protected MessageProducer createProducer() throws Exception {
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        message = session.createMessage();
+        destination = session.createTopic("test");
+        MessageProducer producer = session.createProducer(destination);
+        producer.close();
+        return producer;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        producer = createProducer();
+    }
+
+    @Test(timeout=30000)
+    public void testClose() throws JMSException {
+        producer.close();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSetDisableMessageIDFails() throws JMSException {
+        producer.setDisableMessageID(true);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetDisableMessageIDFails() throws JMSException {
+        producer.getDisableMessageID();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSetDisableMessageTimestampFails() throws JMSException {
+        producer.setDisableMessageTimestamp(false);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetDisableMessageTimestampFails() throws JMSException {
+        producer.getDisableMessageTimestamp();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSetDeliveryModeFails() throws JMSException {
+        producer.setDeliveryMode(1);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetDeliveryModeFails() throws JMSException {
+        producer.getDeliveryMode();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSetPriorityFails() throws JMSException {
+        producer.setPriority(1);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetPriorityFails() throws JMSException {
+        producer.getPriority();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSetTimeToLiveFails() throws JMSException {
+        producer.setTimeToLive(1);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetTimeToLiveFails() throws JMSException {
+        producer.getTimeToLive();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetDestinationFails() throws JMSException {
+        producer.getDestination();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSendFails() throws JMSException {
+        producer.send(message);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSendWithDestinationFails() throws JMSException {
+        producer.send(destination, message);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSendWithModePriorityTTLFails() throws JMSException {
+        producer.send(message, 1, 3, 111);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSendWithDestinationModePriorityTTLFails() throws JMSException {
+        producer.send(destination, message, 1, 3, 111);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java
new file mode 100644
index 0000000..b6f5011
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.Wait;
+
+/**
+ * Tests the MessageProducer method contract when it's connection has failed.
+ */
+public class JmsMessageProducerFailedTest extends JmsMessageProducerClosedTest {
+
+    @Override
+    protected MessageProducer createProducer() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        message = session.createMessage();
+        destination = session.createQueue("test");
+        MessageProducer producer = session.createProducer(destination);
+        connection.setExceptionListener(new ExceptionListener() {
+
+            @Override
+            public void onException(JMSException exception) {
+                latch.countDown();
+            }
+        });
+        connection.start();
+        stopPrimaryBroker();
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        final JmsConnection jmsConnection = (JmsConnection) connection;
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !jmsConnection.isConnected();
+            }
+        }));
+        return producer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
new file mode 100644
index 0000000..ceb2d53
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSSecurityException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class JmsMessageProducerTest extends AmqpTestSupport {
+
+    @Test(timeout = 60000)
+    public void testCreateMessageProducer() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        session.createProducer(queue);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+    }
+
+    @Test
+    public void testSendWorksWhenConnectionNotStarted() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        Message message = session.createMessage();
+        producer.send(message);
+
+        assertEquals(1, proxy.getQueueSize());
+    }
+
+    @Test
+    public void testSendWorksAfterConnectionStopped() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+        connection.stop();
+
+        Message message = session.createMessage();
+        producer.send(message);
+
+        assertEquals(1, proxy.getQueueSize());
+    }
+
+    @Test
+    public void testPersistentSendsAreMarkedPersistent() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        Message message = session.createMessage();
+        producer.send(message);
+
+        assertEquals(1, proxy.getQueueSize());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        message = consumer.receive(5000);
+        assertNotNull(message);
+        assertTrue(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
+    }
+
+    @Test
+    public void testProducerWithNoTTLSendsMessagesWithoutTTL() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        Message message = session.createMessage();
+        producer.send(message);
+
+        assertEquals(1, proxy.getQueueSize());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        message = consumer.receive(5000);
+        assertNotNull(message);
+        assertEquals(0, message.getJMSExpiration());
+    }
+
+    private String createLargeString(int sizeInBytes) {
+        byte[] base = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < sizeInBytes; i++) {
+            builder.append(base[i % base.length]);
+        }
+
+        LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
+        return builder.toString();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendLargeMessage() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        String queueName = name.toString();
+        Queue queue = session.createQueue(queueName);
+
+        MessageProducer producer = session.createProducer(queue);
+        int messageSize = 1024 * 1024;
+        String messageText = createLargeString(messageSize);
+        Message m = session.createTextMessage(messageText);
+        LOG.debug("Sending message of {} bytes on queue {}", messageSize, queueName);
+        producer.send(m);
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        Message message = consumer.receive();
+        assertNotNull(message);
+        assertTrue(message instanceof TextMessage);
+        TextMessage textMessage = (TextMessage) message;
+        LOG.debug(">>>> Received message of length {}", textMessage.getText().length());
+        assertEquals(messageSize, textMessage.getText().length());
+        assertEquals(messageText, textMessage.getText());
+    }
+
+    @Test(timeout=90000, expected=JMSSecurityException.class)
+    public void testProducerNotAuthorized() throws Exception{
+        connection = createAmqpConnection("guest", "password");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("USERS." + name.getMethodName());
+        session.createProducer(queue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java
new file mode 100644
index 0000000..5ba9f9e
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.BytesMessage;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test basic MessageProducer functionality.
+ */
+public class JmsProduceMessageTypesTest extends AmqpTestSupport {
+
+    @Test(timeout = 60000)
+    public void testSendJMSMessage() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        Message message = session.createMessage();
+        producer.send(message);
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
+    public void testSendJMSBytesMessage() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        String payload = "TEST";
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        BytesMessage message = session.createBytesMessage();
+        message.writeUTF(payload);
+        producer.send(message);
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message received = consumer.receive(5000);
+        assertNotNull(received);
+        assertTrue(received instanceof BytesMessage);
+        BytesMessage bytes = (BytesMessage) received;
+        assertEquals(payload, bytes.readUTF());
+    }
+
+    @Test(timeout = 60000)
+    public void testSendJMSMapMessage() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        MapMessage message = session.createMapMessage();
+        message.setBoolean("Boolean", false);
+        message.setString("STRING", "TEST");
+        producer.send(message);
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message received = consumer.receive(5000);
+        assertNotNull(received);
+        assertTrue(received instanceof MapMessage);
+        MapMessage map = (MapMessage) received;
+        assertEquals("TEST", map.getString("STRING"));
+        assertEquals(false, map.getBooleanProperty("Boolean"));
+    }
+
+    @Test(timeout = 60000)
+    public void testSendJMSStreamMessage() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        String payload = "TEST";
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        StreamMessage message = session.createStreamMessage();
+        message.writeString(payload);
+        producer.send(message);
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message received = consumer.receive(5000);
+        assertNotNull(received);
+        assertTrue(received instanceof StreamMessage);
+        StreamMessage stream = (StreamMessage) received;
+        assertEquals(payload, stream.readString());
+    }
+
+    @Test(timeout = 60000)
+    public void testSendJMSTextMessage() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        String payload = "TEST";
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage message = session.createTextMessage("TEST");
+        producer.send(message);
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message received = consumer.receive(5000);
+        assertNotNull(received);
+        assertTrue(received instanceof TextMessage);
+        TextMessage text = (TextMessage) received;
+        assertEquals(payload, text.getText());
+    }
+
+    @Test(timeout = 60000)
+    public void testSendJMSObjectMessage() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        ObjectMessage message = session.createObjectMessage("TEST");
+        producer.send(message);
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java
new file mode 100644
index 0000000..e4c0910
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test basic QueueSender functionality.
+ */
+public class JmsQueueSenderTest extends AmqpTestSupport {
+
+    @Test
+    public void testCreateQueueSender() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        QueueConnection connection = factory.createQueueConnection();
+        assertNotNull(connection);
+
+        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        QueueSender sender = session.createSender(queue);
+        assertNotNull(sender);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java
new file mode 100644
index 0000000..80c4215
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * test basic TopicPublisher functionality.
+ */
+public class JmsTopicPublisherTest extends AmqpTestSupport {
+
+    @Test
+    public void testCreateTopicPublisher() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        TopicConnection connection = factory.createTopicConnection();
+        assertNotNull(connection);
+
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        TopicPublisher publisher = session.createPublisher(topic);
+        assertNotNull(publisher);
+
+        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+        assertEquals(0, proxy.getEnqueueCount());
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java
new file mode 100644
index 0000000..0e1d247
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.support;
+
+import java.net.URI;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AmqpTestSupport extends QpidJmsTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
+
+    protected boolean isAmqpDiscovery() {
+        return false;
+    }
+
+    protected String getAmqpTransformer() {
+        return "jms";
+    }
+
+    protected int getSocketBufferSize() {
+        return 64 * 1024;
+    }
+
+    protected int getIOBufferSize() {
+        return 8 * 1024;
+    }
+
+    @Override
+    protected void addAdditionalConnectors(BrokerService brokerService, Map<String, Integer> portMap) throws Exception {
+        int port = 0;
+        if (portMap.containsKey("amqp")) {
+            port = portMap.get("amqp");
+        }
+        TransportConnector connector = brokerService.addConnector(
+            "amqp://0.0.0.0:" + port + "?transport.transformer=" + getAmqpTransformer() +
+            "&transport.socketBufferSize=" + getSocketBufferSize() + "&ioBufferSize=" + getIOBufferSize());
+        connector.setName("amqp");
+        if (isAmqpDiscovery()) {
+            connector.setDiscoveryUri(new URI("multicast://default"));
+        }
+        port = connector.getPublishableConnectURI().getPort();
+        LOG.debug("Using amqp port: {}", port);
+    }
+
+    public String getAmqpConnectionURIOptions() {
+        return "";
+    }
+
+    public URI getBrokerAmqpConnectionURI() {
+        try {
+            String uri = "amqp://127.0.0.1:" +
+                brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI().getPort();
+
+            if (!getAmqpConnectionURIOptions().isEmpty()) {
+                uri = uri + "?" + getAmqpConnectionURIOptions();
+            }
+
+            return new URI(uri);
+        } catch (Exception e) {
+            throw new RuntimeException();
+        }
+    }
+
+    public String getAmqpFailoverURI() throws Exception {
+        StringBuilder uri = new StringBuilder();
+        uri.append("failover://(");
+        uri.append(brokerService.getTransportConnectorByName("amqp").getPublishableConnectString());
+
+        for (BrokerService broker : brokers) {
+            uri.append(",");
+            uri.append(broker.getTransportConnectorByName("amqp").getPublishableConnectString());
+        }
+
+        uri.append(")");
+
+        return uri.toString();
+    }
+
+    public Connection createAmqpConnection() throws Exception {
+        return createAmqpConnection(getBrokerAmqpConnectionURI());
+    }
+
+    public Connection createAmqpConnection(String username, String password) throws Exception {
+        return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
+    }
+
+    public Connection createAmqpConnection(URI brokerURI) throws Exception {
+        return createAmqpConnection(brokerURI, null, null);
+    }
+
+    public Connection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
+        ConnectionFactory factory = createAmqpConnectionFactory(brokerURI, username, password);
+        return factory.createConnection();
+    }
+
+    public ConnectionFactory createAmqpConnectionFactory() throws Exception {
+        return createAmqpConnectionFactory(getBrokerAmqpConnectionURI(), null, null);
+    }
+
+    public ConnectionFactory createAmqpConnectionFactory(URI brokerURI) throws Exception {
+        return createAmqpConnectionFactory(brokerURI, null, null);
+    }
+
+    public ConnectionFactory createAmqpConnectionFactory(String username, String password) throws Exception {
+        return createAmqpConnectionFactory(getBrokerAmqpConnectionURI(), username, password);
+    }
+
+    public ConnectionFactory createAmqpConnectionFactory(URI brokerURI, String username, String password) throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
+        factory.setForceAsyncSend(isForceAsyncSends());
+        factory.setAlwaysSyncSend(isAlwaysSyncSend());
+        factory.setMessagePrioritySupported(isMessagePrioritySupported());
+        factory.setSendAcksAsync(isSendAcksAsync());
+        if (username != null) {
+            factory.setUsername(username);
+        }
+        if (password != null) {
+            factory.setPassword(password);
+        }
+        return factory;
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org