You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:26 UTC
[02/27] Initial drop of donated AMQP Client Code.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java
new file mode 100644
index 0000000..a458ab7
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.failover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.jms.provider.DefaultProviderListener;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.failover.FailoverProvider;
+import org.apache.qpid.jms.provider.failover.FailoverProviderFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test basic functionality of the FailoverProvider class.
+ */
+public class FailoverProviderTest extends AmqpTestSupport {
+
+ @Test(timeout=60000)
+ public void testFailoverCreate() throws Exception {
+ URI brokerURI = new URI("failover:" + getBrokerAmqpConnectionURI());
+ Provider asyncProvider = FailoverProviderFactory.createAsync(brokerURI);
+ assertNotNull(asyncProvider);
+ FailoverProvider provider = (FailoverProvider) asyncProvider;
+ assertNotNull(provider);
+ }
+
+ @Test(timeout=60000)
+ public void testFailoverURIConfiguration() throws Exception {
+ URI brokerURI = new URI("failover://(" + getBrokerAmqpConnectionURI() + ")" +
+ "?maxReconnectDelay=1000&useExponentialBackOff=false" +
+ "&maxReconnectAttempts=10&startupMaxReconnectAttempts=20");
+ Provider asyncProvider = FailoverProviderFactory.createAsync(brokerURI);
+ assertNotNull(asyncProvider);
+ FailoverProvider provider = (FailoverProvider) asyncProvider;
+ assertNotNull(provider);
+
+ assertEquals(1000, provider.getMaxReconnectDelay());
+ assertFalse(provider.isUseExponentialBackOff());
+ assertEquals(10, provider.getMaxReconnectAttempts());
+ assertEquals(20, provider.getStartupMaxReconnectAttempts());
+ }
+
+ @Test(timeout=60000)
+ public void testStartupReconnectAttempts() throws Exception {
+ URI brokerURI = new URI("failover://(amqp://localhost:61616)" +
+ "?maxReconnectDelay=100&startupMaxReconnectAttempts=5");
+ Provider asyncProvider = FailoverProviderFactory.createAsync(brokerURI);
+ assertNotNull(asyncProvider);
+ FailoverProvider provider = (FailoverProvider) asyncProvider;
+ assertNotNull(provider);
+
+ assertEquals(100, provider.getMaxReconnectDelay());
+ assertEquals(5, provider.getStartupMaxReconnectAttempts());
+
+ final CountDownLatch failed = new CountDownLatch(1);
+
+ provider.setProviderListener(new DefaultProviderListener() {
+
+ @Override
+ public void onConnectionFailure(IOException ex) {
+ failed.countDown();
+ }
+ });
+
+ provider.connect();
+
+ assertTrue(failed.await(2, TimeUnit.SECONDS));
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
new file mode 100644
index 0000000..d615f3d
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.failover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Basic tests for the FailoverProvider implementation
+ */
+public class JmsFailoverTest extends AmqpTestSupport {
+
+ @Override
+ protected boolean isPersistent() {
+ return true;
+ }
+
+ @Test(timeout=60000)
+ public void testFailoverConnects() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI());
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+ connection.close();
+ }
+
+ @Test(timeout=60000)
+ public void testFailoverConnectsWithMultipleURIs() throws Exception {
+ URI brokerURI = new URI("failover://(amqp://127.0.0.1:61616,amqp://localhost:5777," +
+ getBrokerAmqpConnectionURI() + ")?maxReconnectDelay=500");
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+ connection.close();
+ }
+
+ @Test(timeout=60000, expected=JMSException.class)
+ public void testStartupReconnectAttempts() throws Exception {
+ URI brokerURI = new URI("failover://(amqp://localhost:61616)" +
+ "?maxReconnectDelay=1000&startupMaxReconnectAttempts=5");
+ JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
+ Connection connection = factory.createConnection();
+ connection.start();
+ }
+
+ @Test(timeout=60000, expected=JMSException.class)
+ public void testStartupReconnectAttemptsMultipleHosts() throws Exception {
+ URI brokerURI = new URI("failover://(amqp://localhost:61616,amqp://localhost:61617)" +
+ "?maxReconnectDelay=1000&startupMaxReconnectAttempts=5");
+ JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
+ Connection connection = factory.createConnection();
+ connection.start();
+ }
+
+ @Test(timeout=60000)
+ public void testStartFailureWithAsyncExceptionListener() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=1000&maxReconnectAttempts=5");
+
+ final CountDownLatch failed = new CountDownLatch(1);
+ JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
+ factory.setExceptionListener(new ExceptionListener() {
+
+ @Override
+ public void onException(JMSException exception) {
+ LOG.info("Connection got exception: {}", exception.getMessage());
+ failed.countDown();
+ }
+ });
+ Connection connection = factory.createConnection();
+ connection.start();
+
+ stopPrimaryBroker();
+
+ assertTrue("No async exception", failed.await(15, TimeUnit.SECONDS));
+ }
+
+ @SuppressWarnings("unused")
+ @Test(timeout=60000)
+ public void testBasicStateRestoration() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=1000");
+
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+ assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+ restartPrimaryBroker();
+
+ assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
+ }
+ }));
+
+ assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+ assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+ connection.close();
+ }
+
+ @SuppressWarnings("unused")
+ @Test(timeout=60000)
+ public void testDurableSubscriberRestores() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=1000");
+
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.setClientID(name.getMethodName());
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(name.getMethodName());
+ MessageConsumer consumer = session.createDurableSubscriber(topic, name.getMethodName());
+
+ assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+
+ restartPrimaryBroker();
+
+ assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
+ }
+ }));
+
+ assertTrue("Should have no inactive subscribers.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
+ }
+ }));
+
+ assertTrue("Should have one durable sub.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerService.getAdminView().getDurableTopicSubscribers().length == 1;
+ }
+ }));
+
+ connection.close();
+ }
+
+ @Test(timeout=90000)
+ public void testBadFirstURIConnectsAndProducerWorks() throws Exception {
+ URI brokerURI = new URI("failover://(amqp://localhost:61616," +
+ getBrokerAmqpConnectionURI() + ")?maxReconnectDelay=1000");
+
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ final int MSG_COUNT = 10;
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+ final MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ final CountDownLatch failed = new CountDownLatch(1);
+
+ assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ producer.send(session.createTextMessage("Message: " + i));
+ }
+
+ final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+
+ assertTrue("Should have all messages sent.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return proxy.getQueueSize() == MSG_COUNT;
+ }
+ }));
+
+ assertFalse(failed.getCount() == 0);
+ connection.close();
+ }
+
+ // TODO - FIXME
+ @Ignore("Test currently not working")
+ @Test(timeout=90000)
+ public void testProducerBlocksAndRecovers() throws Exception {
+ URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=1000");
+
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ final int MSG_COUNT = 10;
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+ final MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ final CountDownLatch failed = new CountDownLatch(1);
+
+ assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+ Thread producerThread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ producer.send(session.createTextMessage("Message: " + i));
+ TimeUnit.SECONDS.sleep(1);
+ }
+ } catch (Exception e) {
+ }
+ }
+ });
+ producerThread.start();
+
+ TimeUnit.SECONDS.sleep(3);
+ stopPrimaryBroker();
+ TimeUnit.SECONDS.sleep(3);
+ restartPrimaryBroker();
+
+ assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
+ }
+ }));
+
+ assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+ final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+
+ assertTrue("Should have all messages sent.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return proxy.getQueueSize() == MSG_COUNT;
+ }
+ }));
+
+ assertFalse(failed.getCount() == 0);
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
new file mode 100644
index 0000000..bc971f6
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.failover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+
+/**
+ * Test various client behaviors when the connection has gone offline.
+ */
+public class JmsOfflineBehaviorTests extends AmqpTestSupport {
+
+ @Test(timeout=60000)
+ public void testConnectionCloseDoesNotBlock() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI());
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+ stopPrimaryBroker();
+ connection.close();
+ }
+
+ @Test(timeout=60000)
+ public void testSessionCloseDoesNotBlock() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI());
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ stopPrimaryBroker();
+ session.close();
+ connection.close();
+ }
+
+ @Test(timeout=60000)
+ public void testClientAckDoesNotBlock() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI());
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(queue);
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Test"));
+
+ Message message = consumer.receive(5000);
+ assertNotNull(message);
+ stopPrimaryBroker();
+ message.acknowledge();
+
+ connection.close();
+ }
+
+ @Test(timeout=60000)
+ public void testProducerCloseDoesNotBlock() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI());
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+
+ stopPrimaryBroker();
+ producer.close();
+ connection.close();
+ }
+
+ @Test(timeout=60000)
+ public void testConsumerCloseDoesNotBlock() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI());
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ stopPrimaryBroker();
+ consumer.close();
+ connection.close();
+ }
+
+ @SuppressWarnings("unused")
+ @Test(timeout=60000)
+ public void testSessionCloseWithOpenResourcesDoesNotBlock() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI());
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(queue);
+ MessageProducer producer = session.createProducer(queue);
+
+ stopPrimaryBroker();
+ session.close();
+ connection.close();
+ }
+
+ @Test(timeout=60000)
+ public void testGetRemoteURI() throws Exception {
+
+ startNewBroker();
+
+ URI brokerURI = new URI(getAmqpFailoverURI() + "randomize=false");
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ JmsConnection jmsConnection = (JmsConnection) connection;
+ final Provider provider = jmsConnection.getProvider();
+
+ URI connectedURI = provider.getRemoteURI();
+ assertNotNull(connectedURI);
+
+ final List<URI> brokers = getBrokerURIs();
+ assertEquals(brokers.get(0), connectedURI);
+
+ stopPrimaryBroker();
+
+ assertTrue("Should connect to secondary URI.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return provider.getRemoteURI().equals(brokers.get(1));
+ }
+ }));
+
+ connection.close();
+ }
+
+ @SuppressWarnings("unused")
+ @Test(timeout=60000)
+ public void testClosedReourcesAreNotRestored() throws Exception {
+ URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=500");
+ Connection connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(queue);
+ MessageProducer producer = session.createProducer(queue);
+
+ assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+ assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+ stopPrimaryBroker();
+ session.close();
+ TimeUnit.SECONDS.sleep(2);
+ restartPrimaryBroker();
+
+ assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
+ }
+ }));
+
+ assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+ assertEquals(0, brokerService.getAdminView().getQueueProducers().length);
+
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java
new file mode 100644
index 0000000..f1e7db1
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.joram;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Hashtable;
+
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsQueue;
+import org.apache.qpid.jms.JmsTopic;
+import org.objectweb.jtests.jms.admin.Admin;
+
+/**
+ *
+ */
+public class ActiveMQAdmin implements Admin {
+
+ Context context;
+ {
+ try {
+ // Use the jetty JNDI context since it's mutable.
+ final Hashtable<String, String> env = new Hashtable<String, String>();
+ env.put("java.naming.factory.initial", "org.eclipse.jetty.jndi.InitialContextFactory");
+ env.put("java.naming.factory.url.pkgs", "org.eclipse.jetty.jndi");
+ ;
+ context = new InitialContext(env);
+ } catch (NamingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+ }
+
+ @Override
+ public String getName() {
+ return getClass().getName();
+ }
+
+ static BrokerService broker;
+ static int port;
+
+ @Override
+ public void startServer() throws Exception {
+ if (broker != null) {
+ stopServer();
+ }
+ if (System.getProperty("basedir") == null) {
+ File file = new File(".");
+ System.setProperty("basedir", file.getAbsolutePath());
+ }
+ broker = createBroker();
+ TransportConnector connector = broker.addConnector(getConnectorURI());
+ broker.start();
+ port = connector.getConnectUri().getPort();
+ }
+
+ protected String getConnectorURI() {
+ return "amqp://localhost:0";
+ }
+
+ @Override
+ public void stopServer() throws Exception {
+ broker.stop();
+ broker = null;
+ }
+
+ @Override
+ public void start() throws Exception {
+ }
+
+ @Override
+ public void stop() throws Exception {
+ }
+
+ @Override
+ public Context createContext() throws NamingException {
+ return context;
+ }
+
+ @Override
+ public void createQueue(String name) {
+ try {
+ context.bind(name, new JmsQueue(name));
+ } catch (NamingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void createTopic(String name) {
+ try {
+ context.bind(name, new JmsTopic(name));
+ } catch (NamingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void deleteQueue(String name) {
+ try {
+ context.unbind(name);
+ } catch (NamingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void deleteTopic(String name) {
+ try {
+ context.unbind(name);
+ } catch (NamingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void createConnectionFactory(String name) {
+ try {
+ final ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + port);
+ context.bind(name, factory);
+ } catch (NamingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void deleteConnectionFactory(String name) {
+ try {
+ context.unbind(name);
+ } catch (NamingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void createQueueConnectionFactory(String name) {
+ createConnectionFactory(name);
+ }
+
+ @Override
+ public void createTopicConnectionFactory(String name) {
+ createConnectionFactory(name);
+ }
+
+ @Override
+ public void deleteQueueConnectionFactory(String name) {
+ deleteConnectionFactory(name);
+ }
+
+ @Override
+ public void deleteTopicConnectionFactory(String name) {
+ deleteConnectionFactory(name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java
new file mode 100644
index 0000000..3e59b69
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.joram;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.objectweb.jtests.jms.conform.connection.ConnectionTest;
+import org.objectweb.jtests.jms.conform.connection.TopicConnectionTest;
+import org.objectweb.jtests.jms.conform.message.MessageBodyTest;
+import org.objectweb.jtests.jms.conform.message.MessageDefaultTest;
+import org.objectweb.jtests.jms.conform.message.headers.MessageHeaderTest;
+import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyConversionTest;
+import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyTest;
+import org.objectweb.jtests.jms.conform.queue.TemporaryQueueTest;
+import org.objectweb.jtests.jms.conform.selector.SelectorSyntaxTest;
+import org.objectweb.jtests.jms.conform.selector.SelectorTest;
+import org.objectweb.jtests.jms.conform.session.QueueSessionTest;
+import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest;
+
+public class JoramJmsTest extends TestCase {
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite();
+
+ // TODO: Fix these tests..
+ // Fails due to
+ // https://issues.apache.org/jira/browse/PROTON-154
+ // suite.addTestSuite(TopicSessionTest.class);
+
+// suite.addTestSuite(MessageTypeTest.class);
+// suite.addTestSuite(UnifiedSessionTest.class);
+// suite.addTestSuite(JMSXPropertyTest.class);
+// suite.addTestSuite(SessionTest.class);
+
+// suite.addTestSuite(QueueBrowserTest.class);
+ suite.addTestSuite(QueueSessionTest.class);
+ suite.addTestSuite(SelectorSyntaxTest.class);
+ suite.addTestSuite(SelectorTest.class);
+ suite.addTestSuite(MessageHeaderTest.class);
+ suite.addTestSuite(TemporaryTopicTest.class);
+ suite.addTestSuite(TemporaryQueueTest.class);
+ suite.addTestSuite(TopicConnectionTest.class);
+ suite.addTestSuite(ConnectionTest.class);
+ suite.addTestSuite(MessageBodyTest.class);
+ suite.addTestSuite(MessageDefaultTest.class);
+ suite.addTestSuite(MessagePropertyConversionTest.class);
+ suite.addTestSuite(MessagePropertyTest.class);
+
+ return suite;
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java
new file mode 100644
index 0000000..04b954a
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test JMS Anonymous Producer functionality.
+ */
+public class JmsAnonymousProducerTest extends AmqpTestSupport {
+
+ @Test(timeout = 60000)
+ public void testCreateProducer() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ session.createProducer(null);
+
+ assertTrue(brokerService.getAdminView().getTotalProducerCount() == 0);
+ }
+
+ @Test(timeout = 60000)
+ public void testAnonymousSend() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+ assertNotNull(session);
+ MessageProducer producer = session.createProducer(null);
+
+ Message message = session.createMessage();
+ producer.send(queue, message);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(1, proxy.getQueueSize());
+ }
+
+ @Test(timeout = 60000)
+ public void testAnonymousSendToMultipleDestinations() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue1 = session.createQueue(name.getMethodName() + 1);
+ Queue queue2 = session.createQueue(name.getMethodName() + 2);
+ Queue queue3 = session.createQueue(name.getMethodName() + 3);
+ assertNotNull(session);
+ MessageProducer producer = session.createProducer(null);
+
+ Message message = session.createMessage();
+ producer.send(queue1, message);
+ producer.send(queue2, message);
+ producer.send(queue3, message);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName() + 1);
+ assertEquals(1, proxy.getQueueSize());
+ proxy = getProxyToQueue(name.getMethodName() + 2);
+ assertEquals(1, proxy.getQueueSize());
+ proxy = getProxyToQueue(name.getMethodName() + 3);
+ assertEquals(1, proxy.getQueueSize());
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java
new file mode 100644
index 0000000..a7b8cdf
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test the contract of MessageProducer that has been closed.
+ */
+public class JmsMessageProducerClosedTest extends AmqpTestSupport {
+
+ protected MessageProducer producer;
+ protected Message message;
+ protected Destination destination;
+
+ protected MessageProducer createProducer() throws Exception {
+ connection = createAmqpConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ message = session.createMessage();
+ destination = session.createTopic("test");
+ MessageProducer producer = session.createProducer(destination);
+ producer.close();
+ return producer;
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ producer = createProducer();
+ }
+
+ @Test(timeout=30000)
+ public void testClose() throws JMSException {
+ producer.close();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testSetDisableMessageIDFails() throws JMSException {
+ producer.setDisableMessageID(true);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testGetDisableMessageIDFails() throws JMSException {
+ producer.getDisableMessageID();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testSetDisableMessageTimestampFails() throws JMSException {
+ producer.setDisableMessageTimestamp(false);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testGetDisableMessageTimestampFails() throws JMSException {
+ producer.getDisableMessageTimestamp();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testSetDeliveryModeFails() throws JMSException {
+ producer.setDeliveryMode(1);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testGetDeliveryModeFails() throws JMSException {
+ producer.getDeliveryMode();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testSetPriorityFails() throws JMSException {
+ producer.setPriority(1);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testGetPriorityFails() throws JMSException {
+ producer.getPriority();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testSetTimeToLiveFails() throws JMSException {
+ producer.setTimeToLive(1);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testGetTimeToLiveFails() throws JMSException {
+ producer.getTimeToLive();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testGetDestinationFails() throws JMSException {
+ producer.getDestination();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testSendFails() throws JMSException {
+ producer.send(message);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testSendWithDestinationFails() throws JMSException {
+ producer.send(destination, message);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testSendWithModePriorityTTLFails() throws JMSException {
+ producer.send(message, 1, 3, 111);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testSendWithDestinationModePriorityTTLFails() throws JMSException {
+ producer.send(destination, message, 1, 3, 111);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java
new file mode 100644
index 0000000..b6f5011
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.Wait;
+
+/**
+ * Tests the MessageProducer method contract when it's connection has failed.
+ */
+public class JmsMessageProducerFailedTest extends JmsMessageProducerClosedTest {
+
+ @Override
+ protected MessageProducer createProducer() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ connection = createAmqpConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ message = session.createMessage();
+ destination = session.createQueue("test");
+ MessageProducer producer = session.createProducer(destination);
+ connection.setExceptionListener(new ExceptionListener() {
+
+ @Override
+ public void onException(JMSException exception) {
+ latch.countDown();
+ }
+ });
+ connection.start();
+ stopPrimaryBroker();
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ final JmsConnection jmsConnection = (JmsConnection) connection;
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !jmsConnection.isConnected();
+ }
+ }));
+ return producer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
new file mode 100644
index 0000000..ceb2d53
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSSecurityException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class JmsMessageProducerTest extends AmqpTestSupport {
+
+ @Test(timeout = 60000)
+ public void testCreateMessageProducer() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ session.createProducer(queue);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize());
+ }
+
+ @Test
+ public void testSendWorksWhenConnectionNotStarted() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize());
+
+ Message message = session.createMessage();
+ producer.send(message);
+
+ assertEquals(1, proxy.getQueueSize());
+ }
+
+ @Test
+ public void testSendWorksAfterConnectionStopped() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize());
+ connection.stop();
+
+ Message message = session.createMessage();
+ producer.send(message);
+
+ assertEquals(1, proxy.getQueueSize());
+ }
+
+ @Test
+ public void testPersistentSendsAreMarkedPersistent() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize());
+
+ Message message = session.createMessage();
+ producer.send(message);
+
+ assertEquals(1, proxy.getQueueSize());
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ assertTrue(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
+ }
+
+ @Test
+ public void testProducerWithNoTTLSendsMessagesWithoutTTL() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize());
+
+ Message message = session.createMessage();
+ producer.send(message);
+
+ assertEquals(1, proxy.getQueueSize());
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(0, message.getJMSExpiration());
+ }
+
+ private String createLargeString(int sizeInBytes) {
+ byte[] base = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < sizeInBytes; i++) {
+ builder.append(base[i % base.length]);
+ }
+
+ LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
+ return builder.toString();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendLargeMessage() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = name.toString();
+ Queue queue = session.createQueue(queueName);
+
+ MessageProducer producer = session.createProducer(queue);
+ int messageSize = 1024 * 1024;
+ String messageText = createLargeString(messageSize);
+ Message m = session.createTextMessage(messageText);
+ LOG.debug("Sending message of {} bytes on queue {}", messageSize, queueName);
+ producer.send(m);
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive();
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+ TextMessage textMessage = (TextMessage) message;
+ LOG.debug(">>>> Received message of length {}", textMessage.getText().length());
+ assertEquals(messageSize, textMessage.getText().length());
+ assertEquals(messageText, textMessage.getText());
+ }
+
+ @Test(timeout=90000, expected=JMSSecurityException.class)
+ public void testProducerNotAuthorized() throws Exception{
+ connection = createAmqpConnection("guest", "password");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("USERS." + name.getMethodName());
+ session.createProducer(queue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java
new file mode 100644
index 0000000..5ba9f9e
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.BytesMessage;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test basic MessageProducer functionality.
+ */
+public class JmsProduceMessageTypesTest extends AmqpTestSupport {
+
+ @Test(timeout = 60000)
+ public void testSendJMSMessage() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ producer.send(message);
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(1, proxy.getQueueSize());
+ }
+
+ @Test(timeout = 60000)
+ public void testSendJMSBytesMessage() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ String payload = "TEST";
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.writeUTF(payload);
+ producer.send(message);
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(1, proxy.getQueueSize());
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message received = consumer.receive(5000);
+ assertNotNull(received);
+ assertTrue(received instanceof BytesMessage);
+ BytesMessage bytes = (BytesMessage) received;
+ assertEquals(payload, bytes.readUTF());
+ }
+
+ @Test(timeout = 60000)
+ public void testSendJMSMapMessage() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+ MapMessage message = session.createMapMessage();
+ message.setBoolean("Boolean", false);
+ message.setString("STRING", "TEST");
+ producer.send(message);
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(1, proxy.getQueueSize());
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message received = consumer.receive(5000);
+ assertNotNull(received);
+ assertTrue(received instanceof MapMessage);
+ MapMessage map = (MapMessage) received;
+ assertEquals("TEST", map.getString("STRING"));
+ assertEquals(false, map.getBooleanProperty("Boolean"));
+ }
+
+ @Test(timeout = 60000)
+ public void testSendJMSStreamMessage() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ String payload = "TEST";
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+ StreamMessage message = session.createStreamMessage();
+ message.writeString(payload);
+ producer.send(message);
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(1, proxy.getQueueSize());
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message received = consumer.receive(5000);
+ assertNotNull(received);
+ assertTrue(received instanceof StreamMessage);
+ StreamMessage stream = (StreamMessage) received;
+ assertEquals(payload, stream.readString());
+ }
+
+ @Test(timeout = 60000)
+ public void testSendJMSTextMessage() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ String payload = "TEST";
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("TEST");
+ producer.send(message);
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(1, proxy.getQueueSize());
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message received = consumer.receive(5000);
+ assertNotNull(received);
+ assertTrue(received instanceof TextMessage);
+ TextMessage text = (TextMessage) received;
+ assertEquals(payload, text.getText());
+ }
+
+ @Test(timeout = 60000)
+ public void testSendJMSObjectMessage() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+ ObjectMessage message = session.createObjectMessage("TEST");
+ producer.send(message);
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(1, proxy.getQueueSize());
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java
new file mode 100644
index 0000000..e4c0910
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test basic QueueSender functionality.
+ */
+public class JmsQueueSenderTest extends AmqpTestSupport {
+
+ @Test
+ public void testCreateQueueSender() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ QueueConnection connection = factory.createQueueConnection();
+ assertNotNull(connection);
+
+ QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ QueueSender sender = session.createSender(queue);
+ assertNotNull(sender);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize());
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java
new file mode 100644
index 0000000..80c4215
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * test basic TopicPublisher functionality.
+ */
+public class JmsTopicPublisherTest extends AmqpTestSupport {
+
+ @Test
+ public void testCreateTopicPublisher() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ TopicConnection connection = factory.createTopicConnection();
+ assertNotNull(connection);
+
+ TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Topic topic = session.createTopic(name.getMethodName());
+ TopicPublisher publisher = session.createPublisher(topic);
+ assertNotNull(publisher);
+
+ TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+ assertEquals(0, proxy.getEnqueueCount());
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java
new file mode 100644
index 0000000..0e1d247
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.support;
+
+import java.net.URI;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AmqpTestSupport extends QpidJmsTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
+
+ protected boolean isAmqpDiscovery() {
+ return false;
+ }
+
+ protected String getAmqpTransformer() {
+ return "jms";
+ }
+
+ protected int getSocketBufferSize() {
+ return 64 * 1024;
+ }
+
+ protected int getIOBufferSize() {
+ return 8 * 1024;
+ }
+
+ @Override
+ protected void addAdditionalConnectors(BrokerService brokerService, Map<String, Integer> portMap) throws Exception {
+ int port = 0;
+ if (portMap.containsKey("amqp")) {
+ port = portMap.get("amqp");
+ }
+ TransportConnector connector = brokerService.addConnector(
+ "amqp://0.0.0.0:" + port + "?transport.transformer=" + getAmqpTransformer() +
+ "&transport.socketBufferSize=" + getSocketBufferSize() + "&ioBufferSize=" + getIOBufferSize());
+ connector.setName("amqp");
+ if (isAmqpDiscovery()) {
+ connector.setDiscoveryUri(new URI("multicast://default"));
+ }
+ port = connector.getPublishableConnectURI().getPort();
+ LOG.debug("Using amqp port: {}", port);
+ }
+
+ public String getAmqpConnectionURIOptions() {
+ return "";
+ }
+
+ public URI getBrokerAmqpConnectionURI() {
+ try {
+ String uri = "amqp://127.0.0.1:" +
+ brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI().getPort();
+
+ if (!getAmqpConnectionURIOptions().isEmpty()) {
+ uri = uri + "?" + getAmqpConnectionURIOptions();
+ }
+
+ return new URI(uri);
+ } catch (Exception e) {
+ throw new RuntimeException();
+ }
+ }
+
+ public String getAmqpFailoverURI() throws Exception {
+ StringBuilder uri = new StringBuilder();
+ uri.append("failover://(");
+ uri.append(brokerService.getTransportConnectorByName("amqp").getPublishableConnectString());
+
+ for (BrokerService broker : brokers) {
+ uri.append(",");
+ uri.append(broker.getTransportConnectorByName("amqp").getPublishableConnectString());
+ }
+
+ uri.append(")");
+
+ return uri.toString();
+ }
+
+ public Connection createAmqpConnection() throws Exception {
+ return createAmqpConnection(getBrokerAmqpConnectionURI());
+ }
+
+ public Connection createAmqpConnection(String username, String password) throws Exception {
+ return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
+ }
+
+ public Connection createAmqpConnection(URI brokerURI) throws Exception {
+ return createAmqpConnection(brokerURI, null, null);
+ }
+
+ public Connection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
+ ConnectionFactory factory = createAmqpConnectionFactory(brokerURI, username, password);
+ return factory.createConnection();
+ }
+
+ public ConnectionFactory createAmqpConnectionFactory() throws Exception {
+ return createAmqpConnectionFactory(getBrokerAmqpConnectionURI(), null, null);
+ }
+
+ public ConnectionFactory createAmqpConnectionFactory(URI brokerURI) throws Exception {
+ return createAmqpConnectionFactory(brokerURI, null, null);
+ }
+
+ public ConnectionFactory createAmqpConnectionFactory(String username, String password) throws Exception {
+ return createAmqpConnectionFactory(getBrokerAmqpConnectionURI(), username, password);
+ }
+
+ public ConnectionFactory createAmqpConnectionFactory(URI brokerURI, String username, String password) throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
+ factory.setForceAsyncSend(isForceAsyncSends());
+ factory.setAlwaysSyncSend(isAlwaysSyncSend());
+ factory.setMessagePrioritySupported(isMessagePrioritySupported());
+ factory.setSendAcksAsync(isSendAcksAsync());
+ if (username != null) {
+ factory.setUsername(username);
+ }
+ if (password != null) {
+ factory.setPassword(password);
+ }
+ return factory;
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org