You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/02/26 22:20:36 UTC
[1/7] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5617
Repository: activemq
Updated Branches:
refs/heads/master 79568f16c -> eaf773da5
https://issues.apache.org/jira/browse/AMQ-5617
Remove this older test as the test in JMSClientTest ->
testProduceConsumer covers this across all four transport variants.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7a8085c0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7a8085c0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7a8085c0
Branch: refs/heads/master
Commit: 7a8085c0a49c0cac865ea4a6a91777c49790664d
Parents: 79568f1
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 14:35:51 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 14:35:51 2015 -0500
----------------------------------------------------------------------
.../transport/amqp/bugs/AMQ4753Test.java | 73 --------------------
1 file changed, 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7a8085c0/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java
deleted file mode 100644
index 4258039..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.transport.amqp.AmqpTestSupport;
-import org.apache.activemq.transport.amqp.JmsClientContext;
-import org.junit.Test;
-
-public class AMQ4753Test extends AmqpTestSupport {
-
- @Override
- protected boolean isUseTcpConnector() {
- return false;
- }
-
- @Override
- protected boolean isUseNioPlusSslConnector() {
- return true;
- }
-
- @Test(timeout = 120 * 1000)
- public void testAmqpNioPlusSslSendReceive() throws JMSException {
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI);
- runSimpleSendReceiveTest(connection);
- }
-
- public void runSimpleSendReceiveTest(Connection connection) throws JMSException{
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue =session.createQueue("txqueue");
- MessageProducer producer = session.createProducer(queue);
- TextMessage message = session.createTextMessage();
- String messageText = "hello sent at " + new java.util.Date().toString();
- message.setText(messageText);
- producer.send(message);
-
- // Get the message we just sent
- MessageConsumer consumer = session.createConsumer(queue);
- connection.start();
- Message receivedMessage = consumer.receive(5000);
- assertNotNull(receivedMessage);
- assertTrue(receivedMessage instanceof TextMessage);
- TextMessage textMessage = (TextMessage) receivedMessage;
- assertEquals(messageText, textMessage.getText());
- connection.close();
- }
-}
[7/7] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5617
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5617
Fix test failure and give it a more meaningful name.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eaf773da
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eaf773da
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eaf773da
Branch: refs/heads/master
Commit: eaf773da578a2827f9fe87be05676148ac31e90e
Parents: 276ef15
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 16:06:33 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 16:06:33 2015 -0500
----------------------------------------------------------------------
.../java/org/apache/activemq/transport/amqp/JMSClientTest.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/eaf773da/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 0cdb895..3801168 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -886,7 +886,7 @@ public class JMSClientTest extends JMSClientTestSupport {
}
@Test(timeout=30*1000)
- public void simpleDurableTopicTest() throws Exception {
+ public void testDurableTopicStateAfterSubscriberClosed() throws Exception {
String durableClientId = getDestinationName() + "-ClientId";
String durableSubscriberName = getDestinationName() + "-SubscriptionName";
@@ -896,7 +896,7 @@ public class JMSClientTest extends JMSClientTestSupport {
LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);
TopicConnection subscriberConnection =
- JMSClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
+ JMSClientContext.INSTANCE.createTopicConnection(getBrokerURI(), "admin", "password");
subscriberConnection.setClientID(durableClientId);
TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = subscriberSession.createTopic(getDestinationName());
[3/7] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5617
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5617
Absorb these into the general JMSClientTest that covers all four
transport types.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4fa420bb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4fa420bb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4fa420bb
Branch: refs/heads/master
Commit: 4fa420bbd198167281cacf96313fa5a04da055ae
Parents: 12202c9
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 14:48:57 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 14:48:57 2015 -0500
----------------------------------------------------------------------
.../activemq/transport/amqp/AMQ4696Test.java | 68 ---------
.../activemq/transport/amqp/JMSClientTest.java | 38 +++++
.../transport/amqp/bugs/AMQ4914Test.java | 109 --------------
.../transport/amqp/bugs/AMQ5256Test.java | 149 -------------------
4 files changed, 38 insertions(+), 326 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/4fa420bb/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java
deleted file mode 100644
index 677374e..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.junit.Test;
-
-public class AMQ4696Test extends AmqpTestSupport {
-
- @Test(timeout=30*1000)
- public void simpleDurableTopicTest() throws Exception {
- String TOPIC_NAME = "AMQ4696Test" + System.currentTimeMillis();
- String durableClientId = "AMQPDurableTopicTestClient";
- String durableSubscriberName = "durableSubscriberName";
-
- BrokerView adminView = this.brokerService.getAdminView();
- int durableSubscribersAtStart = adminView.getDurableTopicSubscribers().length;
- int inactiveSubscribersAtStart = adminView.getInactiveDurableTopicSubscribers().length;
- LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);
-
- TopicConnection subscriberConnection =
- JmsClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
- subscriberConnection.setClientID(durableClientId);
- TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = subscriberSession.createTopic(TOPIC_NAME);
- TopicSubscriber messageConsumer = subscriberSession.createDurableSubscriber(topic, durableSubscriberName);
-
- assertNotNull(messageConsumer);
-
- int durableSubscribers = adminView.getDurableTopicSubscribers().length;
- int inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
- LOG.debug(">>>> durable Subscribers after creation {} inactiveDurableSubscribers {}", durableSubscribers, inactiveSubscribers);
- assertEquals("Wrong number of durable subscribers after first subscription", 1, (durableSubscribers - durableSubscribersAtStart));
- assertEquals("Wrong number of inactive durable subscribers after first subscription", 0, (inactiveSubscribers - inactiveSubscribersAtStart));
-
- subscriberConnection.close();
- subscriberConnection = null;
-
- durableSubscribers = adminView.getDurableTopicSubscribers().length;
- inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
- LOG.debug(">>>> durable Subscribers after close {} inactiveDurableSubscribers {}", durableSubscribers, inactiveSubscribers);
- assertEquals("Wrong number of durable subscribers after close", 0, (durableSubscribersAtStart));
- assertEquals("Wrong number of inactive durable subscribers after close", 1, (inactiveSubscribers - inactiveSubscribersAtStart));
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/4fa420bb/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 34f20ba..250053a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -45,7 +45,11 @@ import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
@@ -881,6 +885,40 @@ public class JMSClientTest extends JMSClientTestSupport {
assertEquals(messageText, textMessage.getText());
}
+ @Test(timeout=30*1000)
+ public void simpleDurableTopicTest() throws Exception {
+ String durableClientId = getDestinationName() + "-ClientId";
+ String durableSubscriberName = getDestinationName() + "-SubscriptionName";
+
+ BrokerView adminView = this.brokerService.getAdminView();
+ int durableSubscribersAtStart = adminView.getDurableTopicSubscribers().length;
+ int inactiveSubscribersAtStart = adminView.getInactiveDurableTopicSubscribers().length;
+ LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);
+
+ TopicConnection subscriberConnection =
+ JmsClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
+ subscriberConnection.setClientID(durableClientId);
+ TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = subscriberSession.createTopic(getDestinationName());
+ TopicSubscriber messageConsumer = subscriberSession.createDurableSubscriber(topic, durableSubscriberName);
+
+ assertNotNull(messageConsumer);
+
+ int durableSubscribers = adminView.getDurableTopicSubscribers().length;
+ int inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
+ LOG.debug(">>>> durable Subscribers after creation {} inactiveDurableSubscribers {}", durableSubscribers, inactiveSubscribers);
+ assertEquals("Wrong number of durable subscribers after first subscription", 1, (durableSubscribers - durableSubscribersAtStart));
+ assertEquals("Wrong number of inactive durable subscribers after first subscription", 0, (inactiveSubscribers - inactiveSubscribersAtStart));
+
+ subscriberConnection.close();
+
+ durableSubscribers = adminView.getDurableTopicSubscribers().length;
+ inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
+ LOG.debug(">>>> durable Subscribers after close {} inactiveDurableSubscribers {}", durableSubscribers, inactiveSubscribers);
+ assertEquals("Wrong number of durable subscribers after close", 0, (durableSubscribersAtStart));
+ assertEquals("Wrong number of inactive durable subscribers after close", 1, (inactiveSubscribers - inactiveSubscribersAtStart));
+ }
+
@Test(timeout=30000)
public void testDurableConsumerUnsubscribe() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
http://git-wip-us.apache.org/repos/asf/activemq/blob/4fa420bb/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java
deleted file mode 100644
index d171ea9..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.transport.amqp.AmqpTestSupport;
-import org.apache.activemq.transport.amqp.JmsClientContext;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4914Test extends AmqpTestSupport {
-
- @Rule
- public TestName testName = new TestName();
-
- protected static final Logger LOG = LoggerFactory.getLogger(AMQ4914Test.class);
-
- private String createLargeString(int sizeInBytes) {
- byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < sizeInBytes; i++) {
- builder.append(base[i % base.length]);
- }
-
- LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
- return builder.toString();
- }
-
- @Test(timeout = 60 * 1000)
- public void testSendSmallerMessages() throws JMSException {
- for (int i = 512; i <= (8 * 1024); i += 512) {
- doTestSendLargeMessage(i);
- }
- }
-
- @Test(timeout = 60 * 1000)
- public void testSendFixedSizedMessages() throws JMSException {
- doTestSendLargeMessage(65536);
- doTestSendLargeMessage(65536 * 2);
- doTestSendLargeMessage(65536 * 4);
- }
-
- @Test(timeout = 60 * 1000)
- public void testSendHugeMessage() throws JMSException {
- doTestSendLargeMessage(1024 * 1024 * 10);
- }
-
- public void doTestSendLargeMessage(int expectedSize) throws JMSException{
- LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
- String payload = createLargeString(expectedSize);
- assertEquals(expectedSize, payload.getBytes().length);
-
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
- long startTime = System.currentTimeMillis();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(testName.getMethodName());
- MessageProducer producer = session.createProducer(queue);
- TextMessage message = session.createTextMessage();
- message.setText(payload);
- producer.send(message);
- long endTime = System.currentTimeMillis();
- LOG.info("Returned from send after {} ms", endTime - startTime);
-
- startTime = System.currentTimeMillis();
- MessageConsumer consumer = session.createConsumer(queue);
- connection.start();
- LOG.info("Calling receive");
- Message receivedMessage = consumer.receive();
- assertNotNull(receivedMessage);
- assertTrue(receivedMessage instanceof TextMessage);
- TextMessage receivedTextMessage = (TextMessage) receivedMessage;
- assertNotNull(receivedMessage);
- endTime = System.currentTimeMillis();
- LOG.info("Returned from receive after {} ms", endTime - startTime);
- String receivedText = receivedTextMessage.getText();
- assertEquals(expectedSize, receivedText.getBytes().length);
- assertEquals(payload, receivedText);
- connection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/4fa420bb/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java
deleted file mode 100644
index 989e501..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-
-import org.apache.activemq.transport.amqp.AmqpTestSupport;
-import org.apache.activemq.transport.amqp.JmsClientContext;
-import org.junit.Test;
-
-public class AMQ5256Test extends AmqpTestSupport {
-
- @Override
- protected boolean isUseTcpConnector() {
- return true;
- }
-
- @Override
- protected boolean isUseSslConnector() {
- return true;
- }
-
- @Override
- protected boolean isUseNioConnector() {
- return true;
- }
-
- @Override
- protected boolean isUseNioPlusSslConnector() {
- return true;
- }
-
- @Test(timeout = 60000)
- public void testParallelConnectPlain() throws Exception {
- final int numThreads = 40;
- ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
- for (int i = 0; i < numThreads; i++) {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
-
- try {
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password");
- connection.start();
- connection.close();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- executorService.shutdown();
- assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
- }
-
- @Test(timeout = 60000)
- public void testParallelConnectNio() throws Exception {
- final int numThreads = 40;
- ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
- for (int i = 0; i < numThreads; i++) {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
-
- try {
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password");
- connection.start();
- connection.close();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- executorService.shutdown();
- assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
- }
-
- @Test(timeout = 60000)
- public void testParallelConnectSsl() throws Exception {
- final int numThreads = 40;
- ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
- for (int i = 0; i < numThreads; i++) {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
-
- try {
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password");
- connection.start();
- connection.close();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- executorService.shutdown();
- assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
- }
-
- @Test(timeout = 60000)
- public void testParallelConnectNioPlusSsl() throws Exception {
- final int numThreads = 40;
- ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
- for (int i = 0; i < numThreads; i++) {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
-
- try {
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password");
- connection.start();
- connection.close();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- executorService.shutdown();
- assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
- }
-}
[6/7] activemq git commit: remove a warning.
Posted by ta...@apache.org.
remove a warning.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/276ef150
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/276ef150
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/276ef150
Branch: refs/heads/master
Commit: 276ef15024d43038a5db23ed9044309a691104f2
Parents: 240278d
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 15:23:26 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 15:23:26 2015 -0500
----------------------------------------------------------------------
.../transport/amqp/message/JMSMappingOutboundTransformerTest.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/276ef150/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index ac92d8f..63d7842 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -17,7 +17,6 @@
package org.apache.activemq.transport.amqp.message;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
[2/7] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5617
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5617
Rename these and move to the general tests folder as they cover specific
use cases that are applicable beyond the issue they were created for.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/12202c97
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/12202c97
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/12202c97
Branch: refs/heads/master
Commit: 12202c9702452b36a6cea66fc5516c94ebdd60b9
Parents: 7a8085c
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 14:47:12 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 14:47:12 2015 -0500
----------------------------------------------------------------------
.../amqp/JMSLargeMessageSendRecvTest.java | 107 ++++++++++++++
.../transport/amqp/JMSParallelConnectTest.java | 147 +++++++++++++++++++
2 files changed, 254 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/12202c97/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
new file mode 100644
index 0000000..0abdad2
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
+
+ @Rule
+ public TestName testName = new TestName();
+
+ protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
+
+ private String createLargeString(int sizeInBytes) {
+ byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < sizeInBytes; i++) {
+ builder.append(base[i % base.length]);
+ }
+
+ LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
+ return builder.toString();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendSmallerMessages() throws JMSException {
+ for (int i = 512; i <= (8 * 1024); i += 512) {
+ doTestSendLargeMessage(i);
+ }
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendFixedSizedMessages() throws JMSException {
+ doTestSendLargeMessage(65536);
+ doTestSendLargeMessage(65536 * 2);
+ doTestSendLargeMessage(65536 * 4);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendHugeMessage() throws JMSException {
+ doTestSendLargeMessage(1024 * 1024 * 10);
+ }
+
+ public void doTestSendLargeMessage(int expectedSize) throws JMSException{
+ LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
+ String payload = createLargeString(expectedSize);
+ assertEquals(expectedSize, payload.getBytes().length);
+
+ Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
+ long startTime = System.currentTimeMillis();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(testName.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage();
+ message.setText(payload);
+ producer.send(message);
+ long endTime = System.currentTimeMillis();
+ LOG.info("Returned from send after {} ms", endTime - startTime);
+
+ startTime = System.currentTimeMillis();
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ LOG.info("Calling receive");
+ Message receivedMessage = consumer.receive();
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ TextMessage receivedTextMessage = (TextMessage) receivedMessage;
+ assertNotNull(receivedMessage);
+ endTime = System.currentTimeMillis();
+ LOG.info("Returned from receive after {} ms", endTime - startTime);
+ String receivedText = receivedTextMessage.getText();
+ assertEquals(expectedSize, receivedText.getBytes().length);
+ assertEquals(payload, receivedText);
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/12202c97/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
new file mode 100644
index 0000000..d9debb5
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+import org.junit.Test;
+
+ /**
+  * Verifies that many AMQP JMS clients can connect, start and close at the same
+  * time over each of the transport connectors this test enables (plain TCP, NIO,
+  * SSL and NIO+SSL).
+  */
+ public class JMSParallelConnectTest extends AmqpTestSupport {
+
+     /** Number of clients that connect concurrently in every test. */
+     private static final int NUM_THREADS = 40;
+
+     @Override
+     protected boolean isUseTcpConnector() {
+         return true;
+     }
+
+     @Override
+     protected boolean isUseSslConnector() {
+         return true;
+     }
+
+     @Override
+     protected boolean isUseNioConnector() {
+         return true;
+     }
+
+     @Override
+     protected boolean isUseNioPlusSslConnector() {
+         return true;
+     }
+
+     @Test(timeout = 60000)
+     public void testParallelConnectPlain() throws Exception {
+         doTestParallelConnect(amqpURI);
+     }
+
+     @Test(timeout = 60000)
+     public void testParallelConnectNio() throws Exception {
+         doTestParallelConnect(amqpNioURI);
+     }
+
+     @Test(timeout = 60000)
+     public void testParallelConnectSsl() throws Exception {
+         doTestParallelConnect(amqpSslURI);
+     }
+
+     @Test(timeout = 60000)
+     public void testParallelConnectNioPlusSsl() throws Exception {
+         doTestParallelConnect(amqpNioPlusSslURI);
+     }
+
+     /**
+      * Opens, starts and closes {@link #NUM_THREADS} connections in parallel and
+      * fails the calling test if any of them reports an error or if the workers
+      * do not finish within 30 seconds.
+      *
+      * @param remoteURI
+      *        the connector URI every client connects to.
+      *
+      * @throws Exception if the wait for the workers is interrupted.
+      */
+     private void doTestParallelConnect(final java.net.URI remoteURI) throws Exception {
+         // Failures are collected rather than printed so that a broken connector
+         // actually fails the test instead of only leaving a stack trace behind.
+         final java.util.Queue<Exception> failures = new java.util.concurrent.ConcurrentLinkedQueue<Exception>();
+         ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
+         try {
+             for (int i = 0; i < NUM_THREADS; i++) {
+                 executorService.execute(new Runnable() {
+                     @Override
+                     public void run() {
+                         try {
+                             Connection connection = JmsClientContext.INSTANCE.createConnection(remoteURI, "admin", "password");
+                             try {
+                                 connection.start();
+                             } finally {
+                                 // Always release the connection, even when start() failed.
+                                 connection.close();
+                             }
+                         } catch (Exception e) {
+                             failures.add(e);
+                         }
+                     }
+                 });
+             }
+
+             executorService.shutdown();
+             assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
+             assertTrue("Connections failed: " + failures, failures.isEmpty());
+         } finally {
+             // No-op after a clean shutdown; stops stuck workers after a timeout.
+             executorService.shutdownNow();
+         }
+     }
+ }
[5/7] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5617
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5617
Consolidate remaining direct JMS client type usages to the context.
Rename some tests to be consistent.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/240278db
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/240278db
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/240278db
Branch: refs/heads/master
Commit: 240278dbef63816cce91f34e1cd53cd16618fd46
Parents: 8f0bf60
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 15:22:45 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 15:22:45 2015 -0500
----------------------------------------------------------------------
.../activemq/transport/amqp/AMQ4563Test.java | 6 +-
.../transport/amqp/AmqpTransformerTest.java | 6 +-
.../transport/amqp/JMSClientContext.java | 164 ++++++++++++++
.../amqp/JMSClientRequestResponseTest.java | 212 +++++++++++++++++++
.../transport/amqp/JMSClientSimpleAuthTest.java | 165 +++++++++++++++
.../activemq/transport/amqp/JMSClientTest.java | 2 +-
.../amqp/JMSConcurrentConsumersTest.java | 4 +-
.../amqp/JMSLargeMessageSendRecvTest.java | 2 +-
.../transport/amqp/JMSMessageGroupsTest.java | 102 +++++++++
.../transport/amqp/JMSParallelConnectTest.java | 8 +-
.../transport/amqp/JmsClientContext.java | 136 ------------
.../amqp/JmsClientRequestResponseTest.java | 212 -------------------
.../transport/amqp/JmsMessageGroupsTest.java | 102 ---------
.../transport/amqp/SimpleAMQPAuthTest.java | 190 -----------------
14 files changed, 657 insertions(+), 654 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
index 63f3216..88e0a44 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
@@ -76,7 +76,7 @@ public class AMQ4563Test extends AmqpTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
assertTrue(brokerService.isPersistent());
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
+ Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.getMethodName());
@@ -122,7 +122,7 @@ public class AMQ4563Test extends AmqpTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
assertTrue(brokerService.isPersistent());
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
+ Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.getMethodName());
@@ -152,7 +152,7 @@ public class AMQ4563Test extends AmqpTestSupport {
}
private int readAllMessages(String queueName, String selector) throws JMSException {
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
+ Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
connection.start();
try {
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
index 65d38a4..0c2c6f7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
@@ -57,7 +57,7 @@ public class AmqpTransformerTest {
startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=native"));
// send "text message" with AMQP JMS API
- Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
+ Connection amqpConnection = JMSClientContext.INSTANCE.createConnection(amqpConnectionURI);
amqpConnection.start();
Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -104,7 +104,7 @@ public class AmqpTransformerTest {
startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=raw"));
// send "text message" with AMQP JMS API
- Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
+ Connection amqpConnection = JMSClientContext.INSTANCE.createConnection(amqpConnectionURI);
amqpConnection.start();
Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -156,7 +156,7 @@ public class AmqpTransformerTest {
startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=jms"));
// send "text message" with AMQP JMS API
- Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
+ Connection amqpConnection = JMSClientContext.INSTANCE.createConnection(amqpConnectionURI);
amqpConnection.start();
Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java
new file mode 100644
index 0000000..81b0399
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.TopicConnection;
+
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Context used for AMQP JMS Clients to create connection instances.
+ */
+public class JMSClientContext {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JMSClientContext.class);
+
+ public static final JMSClientContext INSTANCE = new JMSClientContext();
+
+ //----- Plain JMS Connection Create methods ------------------------------//
+
+ public Connection createConnection(URI remoteURI) throws JMSException {
+ return createConnection(remoteURI, null, null, true);
+ }
+
+ public Connection createConnection(URI remoteURI, String username, String password) throws JMSException {
+ return createConnection(remoteURI, username, password, null, true);
+ }
+
+ public Connection createConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
+ return createConnection(remoteURI, username, password, null, syncPublish);
+ }
+
+ public Connection createConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
+ return createConnection(remoteURI, username, password, clientId, true);
+ }
+
+ public Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
+ ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
+
+ Connection connection = factory.createConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ LOG.error("Unexpected exception ", exception);
+ exception.printStackTrace();
+ }
+ });
+
+ return connection;
+ }
+
+ //----- JMS TopicConnection Create methods -------------------------------//
+
+ public TopicConnection createTopicConnection(URI remoteURI) throws JMSException {
+ return createTopicConnection(remoteURI, null, null, true);
+ }
+
+ public TopicConnection createTopicConnection(URI remoteURI, String username, String password) throws JMSException {
+ return createTopicConnection(remoteURI, username, password, null, true);
+ }
+
+ public TopicConnection createTopicConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
+ return createTopicConnection(remoteURI, username, password, null, syncPublish);
+ }
+
+ public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
+ return createTopicConnection(remoteURI, username, password, clientId, true);
+ }
+
+ public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
+ ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
+
+ TopicConnection connection = factory.createTopicConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ LOG.error("Unexpected exception ", exception);
+ exception.printStackTrace();
+ }
+ });
+
+ return connection;
+ }
+
+ //----- JMS QueueConnection Create methods -------------------------------//
+
+ public QueueConnection createQueueConnection(URI remoteURI) throws JMSException {
+ return createQueueConnection(remoteURI, null, null, true);
+ }
+
+ public QueueConnection createQueueConnection(URI remoteURI, String username, String password) throws JMSException {
+ return createQueueConnection(remoteURI, username, password, null, true);
+ }
+
+ public QueueConnection createQueueConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
+ return createQueueConnection(remoteURI, username, password, null, syncPublish);
+ }
+
+ public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
+ return createQueueConnection(remoteURI, username, password, clientId, true);
+ }
+
+ public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
+ ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
+
+ QueueConnection connection = factory.createQueueConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ LOG.error("Unexpected exception ", exception);
+ exception.printStackTrace();
+ }
+ });
+
+ return connection;
+ }
+
+ //------ Internal Implementation bits ------------------------------------//
+
+ private ConnectionFactoryImpl createConnectionFactory(
+ URI remoteURI, String username, String password, String clientId, boolean syncPublish) {
+
+ boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl");
+
+ LOG.debug("In createConnectionFactory using port {} ssl? {}", remoteURI.getPort(), useSSL);
+
+ ConnectionFactoryImpl factory =
+ new ConnectionFactoryImpl(remoteURI.getHost(), remoteURI.getPort(), username, password, clientId, useSSL);
+
+ if (useSSL) {
+ factory.setKeyStorePath(System.getProperty("javax.net.ssl.trustStore"));
+ factory.setKeyStorePassword("password");
+ factory.setTrustStorePath(System.getProperty("javax.net.ssl.keyStore"));
+ factory.setTrustStorePassword("password");
+ }
+
+ factory.setTopicPrefix("topic://");
+ factory.setQueuePrefix("queue://");
+ factory.setSyncPublish(syncPublish);
+
+ return factory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientRequestResponseTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientRequestResponseTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientRequestResponseTest.java
new file mode 100644
index 0000000..7d8a44f
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientRequestResponseTest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+import java.util.Vector;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * Exercises a JMS request/response exchange over AMQP: a requestor sends a
+  * request carrying a temporary reply destination and a responder answers on
+  * that destination. The test class itself acts as the responder's listener.
+  */
+ public class JMSClientRequestResponseTest extends AmqpTestSupport implements MessageListener {
+
+     private static final Logger LOG = LoggerFactory.getLogger(JMSClientRequestResponseTest.class);
+
+     @Rule public TestName name = new TestName();
+
+     private Connection requestorConnection;
+     private Destination requestDestination;
+     private Session requestorSession;
+
+     private Connection responderConnection;
+     private MessageProducer responseProducer;
+     private Session responderSession;
+     private Destination replyDestination;
+
+     // Filled from the responder side, which may run on a client thread.
+     private final List<JMSException> failures = new Vector<JMSException>();
+     private boolean dynamicallyCreateProducer;
+     private final boolean useAsyncConsumer = true;
+     private Thread syncThread;
+
+     @Override
+     @After
+     public void tearDown() throws Exception {
+         closeQuietly(requestorConnection);
+         closeQuietly(responderConnection);
+
+         if (syncThread != null) {
+             syncThread.join(5000);
+         }
+
+         super.tearDown();
+     }
+
+     /**
+      * Best-effort close used during tear down: errors are logged at debug level
+      * and never propagated so the remaining clean up still runs.
+      */
+     private void closeQuietly(Connection connection) {
+         if (connection == null) {
+             return;
+         }
+
+         try {
+             connection.close();
+         } catch (Exception e) {
+             LOG.debug("Ignoring error while closing connection during tear down", e);
+         }
+     }
+
+     /**
+      * Creates the responder (consuming from a queue or topic named after the
+      * running test method) and the requestor (owning a temporary reply
+      * destination of the matching kind), then starts both connections.
+      *
+      * @param topic
+      *        true to use a topic / temporary topic, false for queue / temporary queue.
+      */
+     private void doSetupConnections(boolean topic) throws Exception {
+         responderConnection = createConnection(name.getMethodName() + "-responder");
+         responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         if (topic) {
+             requestDestination = responderSession.createTopic(name.getMethodName());
+         } else {
+             requestDestination = responderSession.createQueue(name.getMethodName());
+         }
+         // Created without a destination; the target is supplied on each send.
+         responseProducer = responderSession.createProducer(null);
+
+         final MessageConsumer requestConsumer = responderSession.createConsumer(requestDestination);
+         if (useAsyncConsumer) {
+             requestConsumer.setMessageListener(this);
+         } else {
+             syncThread = new Thread(new Runnable() {
+                 @Override
+                 public void run() {
+                     syncConsumeLoop(requestConsumer);
+                 }
+             });
+             syncThread.start();
+         }
+         responderConnection.start();
+
+         requestorConnection = createConnection(name.getMethodName() + "-requestor");
+         requestorSession = requestorConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         if (topic) {
+             replyDestination = requestorSession.createTemporaryTopic();
+         } else {
+             replyDestination = requestorSession.createTemporaryQueue();
+         }
+         requestorConnection.start();
+     }
+
+     @Test(timeout=60000)
+     public void testRequestResponseToTempQueue() throws Exception {
+         doSetupConnections(false);
+         doTestRequestResponse();
+     }
+
+     @Test(timeout=60000)
+     public void testRequestResponseToTempTopic() throws Exception {
+         doSetupConnections(true);
+         doTestRequestResponse();
+     }
+
+     /**
+      * Sends one request, waits up to ten seconds for the reply and checks that
+      * its text starts with "response" and that the responder recorded no failures.
+      */
+     private void doTestRequestResponse() throws Exception {
+
+         MessageProducer requestProducer = requestorSession.createProducer(requestDestination);
+         MessageConsumer replyConsumer = requestorSession.createConsumer(replyDestination);
+
+         TextMessage requestMessage = requestorSession.createTextMessage("SomeRequest");
+         requestMessage.setJMSReplyTo(replyDestination);
+         requestProducer.send(requestMessage);
+
+         LOG.info("Sent request to destination: {}", requestDestination.toString());
+
+         Message msg = replyConsumer.receive(10000);
+
+         if (msg instanceof TextMessage) {
+             TextMessage replyMessage = (TextMessage)msg;
+             LOG.info("Received reply.");
+             LOG.info(replyMessage.toString());
+             assertTrue("Wrong message content", replyMessage.getText().startsWith("response"));
+         } else {
+             fail("Should have received a reply by now");
+         }
+         replyConsumer.close();
+
+         assertEquals("Should not have had any failures: " + failures, 0, failures.size());
+     }
+
+     private Connection createConnection(String clientId) throws JMSException {
+         return JMSClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", clientId);
+     }
+
+     /**
+      * Synchronous alternative to the message listener: waits up to five seconds
+      * for a single request and hands it to {@link #onMessage(Message)}.
+      */
+     protected void syncConsumeLoop(MessageConsumer requestConsumer) {
+         try {
+             Message message = requestConsumer.receive(5000);
+             if (message != null) {
+                 onMessage(message);
+             } else {
+                 LOG.error("No message received");
+             }
+         } catch (JMSException e) {
+             onException(e);
+         }
+     }
+
+     /**
+      * Answers a request on its JMSReplyTo destination with a text message whose
+      * correlation ID is the request's message ID. JMS errors are recorded via
+      * {@link #onException(JMSException)} instead of being thrown.
+      */
+     @Override
+     public void onMessage(Message message) {
+         try {
+             TextMessage requestMessage = (TextMessage)message;
+
+             LOG.info("Received request.");
+             LOG.info(requestMessage.toString());
+
+             // Named replyTo so it does not shadow the requestor's replyDestination field.
+             Destination replyTo = requestMessage.getJMSReplyTo();
+             if (replyTo instanceof Topic) {
+                 LOG.info("Reply destination is: {}", ((Topic)replyTo).getTopicName());
+             } else {
+                 LOG.info("Reply destination is: {}", ((Queue)replyTo).getQueueName());
+             }
+
+             TextMessage replyMessage = responderSession.createTextMessage("response for: " + requestMessage.getText());
+             replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
+
+             if (dynamicallyCreateProducer) {
+                 // A producer created per reply is closed again so it does not leak.
+                 MessageProducer dedicatedProducer = responderSession.createProducer(replyTo);
+                 try {
+                     dedicatedProducer.send(replyMessage);
+                 } finally {
+                     dedicatedProducer.close();
+                 }
+             } else {
+                 responseProducer.send(replyTo, replyMessage);
+             }
+
+             LOG.info("Sent reply.");
+             LOG.info(replyMessage.toString());
+         } catch (JMSException e) {
+             onException(e);
+         }
+     }
+
+     /**
+      * Logs the failure and records it so the test can assert that none occurred.
+      */
+     protected void onException(JMSException e) {
+         LOG.error("Caught exception during request/response exchange", e);
+         failures.add(e);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
new file mode 100644
index 0000000..064e132
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * Tests AMQP JMS client authentication against a broker loaded from the
+  * simple-auth-amqp-broker.xml configuration: invalid credentials must be
+  * rejected with an unauthorized-access error and valid ones must allow a
+  * message to be sent and received.
+  */
+ public class JMSClientSimpleAuthTest {
+
+     private static final Logger LOG = LoggerFactory.getLogger(JMSClientSimpleAuthTest.class);
+
+     private static final String SIMPLE_AUTH_AMQP_BROKER_XML =
+         "org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml";
+     private BrokerService brokerService;
+     private URI amqpURI;
+
+     @Before
+     public void setUp() throws Exception {
+         startBroker();
+     }
+
+     @After
+     public void stopBroker() throws Exception {
+         if (brokerService != null) {
+             brokerService.stop();
+             // Make sure the broker is fully down before the next test starts one.
+             brokerService.waitUntilStopped();
+             brokerService = null;
+         }
+     }
+
+     @Test(timeout = 10000)
+     public void testNoUserOrPassword() throws Exception {
+         doTestLoginRejected("", "",
+             "Error{condition=unauthorized-access,description=User name [null] or password is invalid.}");
+     }
+
+     @Test(timeout = 10000)
+     public void testUnknownUser() throws Exception {
+         doTestLoginRejected("nosuchuser", "blah",
+             "Error{condition=unauthorized-access,description=User name [nosuchuser] or password is invalid.}");
+     }
+
+     @Test(timeout = 10000)
+     public void testKnownUserWrongPassword() throws Exception {
+         doTestLoginRejected("user", "wrongPassword",
+             "Error{condition=unauthorized-access,description=User name [user] or password is invalid.}");
+     }
+
+     @Test(timeout = 30000)
+     public void testSendReceive() throws Exception {
+         Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
+         try {
+             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             Queue queue = session.createQueue("txQueue");
+             MessageProducer p = session.createProducer(queue);
+             TextMessage message = session.createTextMessage();
+             String messageText = "hello sent at " + new java.util.Date().toString();
+             message.setText(messageText);
+             p.send(message);
+
+             // Get the message we just sent
+             MessageConsumer consumer = session.createConsumer(queue);
+             connection.start();
+             Message msg = consumer.receive(5000);
+             assertNotNull(msg);
+             assertTrue(msg instanceof TextMessage);
+             TextMessage textMessage = (TextMessage) msg;
+             assertEquals(messageText, textMessage.getText());
+         } finally {
+             // Release the connection even when an assertion above fails.
+             connection.close();
+         }
+     }
+
+     /**
+      * Attempts to connect and create a session with the given credentials and
+      * expects a JMSException whose linked exception is a
+      * {@link ConnectionClosedException} carrying the expected remote error.
+      *
+      * @param username
+      *        user name to connect with.
+      * @param password
+      *        password to connect with.
+      * @param expectedRemoteError
+      *        expected string form of the remote error reported by the broker.
+      */
+     private void doTestLoginRejected(String username, String password, String expectedRemoteError) throws Exception {
+         Connection connection = null;
+         try {
+             connection = JMSClientContext.INSTANCE.createConnection(amqpURI, username, password);
+             connection.start();
+             Thread.sleep(500);
+             connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             fail("Expected JMSException");
+         } catch (JMSException e) {
+             Exception linkedException = e.getLinkedException();
+             // instanceof is false for null, so no separate null check is needed.
+             if (linkedException instanceof ConnectionClosedException) {
+                 ConnectionClosedException cce = (ConnectionClosedException) linkedException;
+                 assertEquals(expectedRemoteError, cce.getRemoteError().toString());
+             } else {
+                 LOG.error("Unexpected Exception", e);
+                 fail("Unexpected exception: " + e.getMessage());
+             }
+         } finally {
+             closeQuietly(connection);
+         }
+     }
+
+     /**
+      * Best-effort close for a connection the broker has most likely already
+      * dropped; close errors are logged at debug level only.
+      */
+     private void closeQuietly(Connection connection) {
+         if (connection == null) {
+             return;
+         }
+
+         try {
+             connection.close();
+         } catch (Exception e) {
+             LOG.debug("Ignoring error while closing rejected connection", e);
+         }
+     }
+
+     protected BrokerService createBroker() throws Exception {
+         return createBroker(SIMPLE_AUTH_AMQP_BROKER_XML);
+     }
+
+     protected BrokerService createBroker(String uri) throws Exception {
+         LOG.debug(">>>>> Loading broker configuration from the classpath with URI: {}", uri);
+         return BrokerFactory.createBroker(new URI("xbean:" + uri));
+     }
+
+     public void startBroker() throws Exception {
+         brokerService = createBroker();
+         brokerService.start();
+         // Wait for the broker before asking the connector for its URI.
+         brokerService.waitUntilStarted();
+         amqpURI = brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI();
+     }
+ }
+
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 250053a..0cdb895 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -896,7 +896,7 @@ public class JMSClientTest extends JMSClientTestSupport {
LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);
TopicConnection subscriberConnection =
- JmsClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
+ JMSClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
subscriberConnection.setClientID(durableClientId);
TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = subscriberSession.createTopic(getDestinationName());
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
index 2d7b34f..62c70cd 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
@@ -101,7 +101,7 @@ public class JMSConcurrentConsumersTest extends AmqpTestSupport {
public void doTestSendWithMultipleConsumers(URI remoteURI) throws Exception {
- Connection connection = JmsClientContext.INSTANCE.createConnection(remoteURI, "admin", "password", false);
+ Connection connection = JMSClientContext.INSTANCE.createConnection(remoteURI, "admin", "password", false);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String destinationName = "AMQ4920Test" + System.currentTimeMillis();
Destination destination = session.createTopic(destinationName);
@@ -170,7 +170,7 @@ public class JMSConcurrentConsumersTest extends AmqpTestSupport {
LOG.debug(consumerName + " starting");
Connection connection = null;
try {
- connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
+ connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
index 0abdad2..ef6eaba 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
@@ -77,7 +77,7 @@ public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
String payload = createLargeString(expectedSize);
assertEquals(expectedSize, payload.getBytes().length);
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
+ Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
long startTime = System.currentTimeMillis();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(testName.getMethodName());
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSMessageGroupsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSMessageGroupsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSMessageGroupsTest.java
new file mode 100644
index 0000000..8b1e786
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSMessageGroupsTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSMessageGroupsTest extends JMSClientTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageGroupsTest.class);
+
+ private static final int ITERATIONS = 10;
+ private static final int MESSAGE_COUNT = 10;
+ private static final int MESSAGE_SIZE = 200 * 1024;
+ private static final int RECEIVE_TIMEOUT = 3000;
+ private static final String JMSX_GROUP_ID = "JmsGroupsTest";
+
+ @Test(timeout = 60000)
+ public void testGroupSeqIsNeverLost() throws Exception {
+ AtomicInteger sequenceCounter = new AtomicInteger();
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ connection = createConnection();
+ {
+ sendMessagesToBroker(MESSAGE_COUNT, sequenceCounter);
+ readMessagesOnBroker(MESSAGE_COUNT);
+ }
+ connection.close();
+ }
+ }
+
+ protected void readMessagesOnBroker(int count) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int i = 0; i < MESSAGE_COUNT; ++i) {
+ Message message = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull(message);
+ LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName());
+ String gid = message.getStringProperty("JMSXGroupID");
+ String seq = message.getStringProperty("JMSXGroupSeq");
+ LOG.debug("Message assigned JMSXGroupID := {}", gid);
+ LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
+ }
+
+ consumer.close();
+ }
+
+ protected void sendMessagesToBroker(int count, AtomicInteger sequence) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+ MessageProducer producer = session.createProducer(queue);
+
+ byte[] buffer = new byte[MESSAGE_SIZE];
+ for (count = 0; count < MESSAGE_SIZE; count++) {
+ String s = String.valueOf(count % 10);
+ Character c = s.charAt(0);
+ int value = c.charValue();
+ buffer[count] = (byte) value;
+ }
+
+ LOG.debug("Sending {} messages to destination: {}", MESSAGE_COUNT, queue);
+ for (int i = 1; i <= MESSAGE_COUNT; i++) {
+ BytesMessage message = session.createBytesMessage();
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+ message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
+ message.setIntProperty("JMSXGroupSeq", sequence.incrementAndGet());
+ message.writeBytes(buffer);
+ producer.send(message);
+ }
+
+ producer.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
index d9debb5..f02d3a9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSParallelConnectTest.java
@@ -59,7 +59,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
public void run() {
try {
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password");
+ Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "admin", "password");
connection.start();
connection.close();
} catch (JMSException e) {
@@ -83,7 +83,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
public void run() {
try {
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password");
+ Connection connection = JMSClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password");
connection.start();
connection.close();
} catch (JMSException e) {
@@ -107,7 +107,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
public void run() {
try {
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password");
+ Connection connection = JMSClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password");
connection.start();
connection.close();
} catch (JMSException e) {
@@ -131,7 +131,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
public void run() {
try {
- Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password");
+ Connection connection = JMSClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password");
connection.start();
connection.close();
} catch (JMSException e) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java
deleted file mode 100644
index dd7b699..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.TopicConnection;
-
-import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Context used for AMQP JMS Clients to create connection instances.
- */
-public class JmsClientContext {
-
- private static final Logger LOG = LoggerFactory.getLogger(JmsClientContext.class);
-
- public static final JmsClientContext INSTANCE = new JmsClientContext();
-
- //----- Plain JMS Connection Create methods ------------------------------//
-
- public Connection createConnection(URI remoteURI) throws JMSException {
- return createConnection(remoteURI, null, null, true);
- }
-
- public Connection createConnection(URI remoteURI, String username, String password) throws JMSException {
- return createConnection(remoteURI, username, password, null, true);
- }
-
- public Connection createConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
- return createConnection(remoteURI, username, password, null, syncPublish);
- }
-
- public Connection createConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
- return createConnection(remoteURI, username, password, clientId, true);
- }
-
- public Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
- ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
-
- return factory.createConnection();
- }
-
- //----- JMS TopicConnection Create methods -------------------------------//
-
- public TopicConnection createTopicConnection(URI remoteURI) throws JMSException {
- return createTopicConnection(remoteURI, null, null, true);
- }
-
- public TopicConnection createTopicConnection(URI remoteURI, String username, String password) throws JMSException {
- return createTopicConnection(remoteURI, username, password, null, true);
- }
-
- public TopicConnection createTopicConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
- return createTopicConnection(remoteURI, username, password, null, syncPublish);
- }
-
- public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
- return createTopicConnection(remoteURI, username, password, clientId, true);
- }
-
- public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
- ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
-
- return factory.createTopicConnection();
- }
-
- //----- JMS QueueConnection Create methods -------------------------------//
-
- public QueueConnection createQueueConnection(URI remoteURI) throws JMSException {
- return createQueueConnection(remoteURI, null, null, true);
- }
-
- public QueueConnection createQueueConnection(URI remoteURI, String username, String password) throws JMSException {
- return createQueueConnection(remoteURI, username, password, null, true);
- }
-
- public QueueConnection createQueueConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
- return createQueueConnection(remoteURI, username, password, null, syncPublish);
- }
-
- public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
- return createQueueConnection(remoteURI, username, password, clientId, true);
- }
-
- public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
- ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
-
- return factory.createQueueConnection();
- }
-
- //------ Internal Implementation bits ------------------------------------//
-
- private ConnectionFactoryImpl createConnectionFactory(
- URI remoteURI, String username, String password, String clientId, boolean syncPublish) {
-
- boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl");
-
- LOG.debug("In createConnectionFactory using port {} ssl? {}", remoteURI.getPort(), useSSL);
-
- ConnectionFactoryImpl factory =
- new ConnectionFactoryImpl(remoteURI.getHost(), remoteURI.getPort(), username, password, clientId, useSSL);
-
- if (useSSL) {
- factory.setKeyStorePath(System.getProperty("javax.net.ssl.trustStore"));
- factory.setKeyStorePassword("password");
- factory.setTrustStorePath(System.getProperty("javax.net.ssl.keyStore"));
- factory.setTrustStorePassword("password");
- }
-
- factory.setTopicPrefix("topic://");
- factory.setQueuePrefix("queue://");
- factory.setSyncPublish(syncPublish);
-
- return factory;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java
deleted file mode 100644
index 1446c10..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-import java.util.Vector;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsClientRequestResponseTest extends AmqpTestSupport implements MessageListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(JmsClientRequestResponseTest.class);
-
- @Rule public TestName name = new TestName();
-
- private Connection requestorConnection;
- private Destination requestDestination;
- private Session requestorSession;
-
- private Connection responderConnection;
- private MessageProducer responseProducer;
- private Session responderSession;
- private Destination replyDestination;
-
- private final List<JMSException> failures = new Vector<JMSException>();
- private boolean dynamicallyCreateProducer;
- private final boolean useAsyncConsumer = true;
- private Thread syncThread;
-
- @Override
- @After
- public void tearDown() throws Exception {
- if (requestorConnection != null) {
- try {
- requestorConnection.close();
- } catch (Exception e) {}
- }
- if (responderConnection != null) {
- try {
- responderConnection.close();
- } catch (Exception e) {}
- }
-
- if (syncThread != null) {
- syncThread.join(5000);
- }
-
- super.tearDown();
- }
-
- private void doSetupConnections(boolean topic) throws Exception {
- responderConnection = createConnection(name.getMethodName() + "-responder");
- responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- if (topic) {
- requestDestination = responderSession.createTopic(name.getMethodName());
- } else {
- requestDestination = responderSession.createQueue(name.getMethodName());
- }
- responseProducer = responderSession.createProducer(null);
-
- final MessageConsumer requestConsumer = responderSession.createConsumer(requestDestination);
- if (useAsyncConsumer) {
- requestConsumer.setMessageListener(this);
- } else {
- syncThread = new Thread(new Runnable() {
- @Override
- public void run() {
- syncConsumeLoop(requestConsumer);
- }
- });
- syncThread.start();
- }
- responderConnection.start();
-
- requestorConnection = createConnection(name.getMethodName() + "-requestor");
- requestorSession = requestorConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- if (topic) {
- replyDestination = requestorSession.createTemporaryTopic();
- } else {
- replyDestination = requestorSession.createTemporaryQueue();
- }
- requestorConnection.start();
- }
-
- @Test(timeout=60000)
- public void testRequestResponseToTempQueue() throws Exception {
- doSetupConnections(false);
- doTestRequestResponse();
- }
-
- @Test(timeout=60000)
- public void testRequestResponseToTempTopic() throws Exception {
- doSetupConnections(true);
- doTestRequestResponse();
- }
-
- private void doTestRequestResponse() throws Exception {
-
- MessageProducer requestProducer = requestorSession.createProducer(requestDestination);
- MessageConsumer replyConsumer = requestorSession.createConsumer(replyDestination);
-
- TextMessage requestMessage = requestorSession.createTextMessage("SomeRequest");
- requestMessage.setJMSReplyTo(replyDestination);
- requestProducer.send(requestMessage);
-
- LOG.info("Sent request to destination: {}", requestDestination.toString());
-
- Message msg = replyConsumer.receive(10000);
-
- if (msg instanceof TextMessage) {
- TextMessage replyMessage = (TextMessage)msg;
- LOG.info("Received reply.");
- LOG.info(replyMessage.toString());
- assertTrue("Wrong message content", replyMessage.getText().startsWith("response"));
- } else {
- fail("Should have received a reply by now");
- }
- replyConsumer.close();
-
- assertEquals("Should not have had any failures: " + failures, 0, failures.size());
- }
-
- private Connection createConnection(String clientId) throws JMSException {
- return JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", clientId);
- }
-
- protected void syncConsumeLoop(MessageConsumer requestConsumer) {
- try {
- Message message = requestConsumer.receive(5000);
- if (message != null) {
- onMessage(message);
- } else {
- LOG.error("No message received");
- }
- } catch (JMSException e) {
- onException(e);
- }
- }
-
- @Override
- public void onMessage(Message message) {
- try {
- TextMessage requestMessage = (TextMessage)message;
-
- LOG.info("Received request.");
- LOG.info(requestMessage.toString());
-
- Destination replyDestination = requestMessage.getJMSReplyTo();
- if (replyDestination instanceof Topic) {
- LOG.info("Reply destination is: {}", ((Topic)replyDestination).getTopicName());
- } else {
- LOG.info("Reply destination is: {}", ((Queue)replyDestination).getQueueName());
- }
-
- TextMessage replyMessage = responderSession.createTextMessage("response for: " + requestMessage.getText());
- replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
-
- if (dynamicallyCreateProducer) {
- responseProducer = responderSession.createProducer(replyDestination);
- responseProducer.send(replyMessage);
- } else {
- responseProducer.send(replyDestination, replyMessage);
- }
-
- LOG.info("Sent reply.");
- LOG.info(replyMessage.toString());
- } catch (JMSException e) {
- onException(e);
- }
- }
-
- protected void onException(JMSException e) {
- LOG.info("Caught: " + e);
- e.printStackTrace();
- failures.add(e);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsMessageGroupsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsMessageGroupsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsMessageGroupsTest.java
deleted file mode 100644
index b559ca7..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsMessageGroupsTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import static org.junit.Assert.assertNotNull;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsMessageGroupsTest extends JMSClientTestSupport {
-
- protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageGroupsTest.class);
-
- private static final int ITERATIONS = 10;
- private static final int MESSAGE_COUNT = 10;
- private static final int MESSAGE_SIZE = 200 * 1024;
- private static final int RECEIVE_TIMEOUT = 3000;
- private static final String JMSX_GROUP_ID = "JmsGroupsTest";
-
- @Test(timeout = 60000)
- public void testGroupSeqIsNeverLost() throws Exception {
- AtomicInteger sequenceCounter = new AtomicInteger();
-
- for (int i = 0; i < ITERATIONS; ++i) {
- connection = createConnection();
- {
- sendMessagesToBroker(MESSAGE_COUNT, sequenceCounter);
- readMessagesOnBroker(MESSAGE_COUNT);
- }
- connection.close();
- }
- }
-
- protected void readMessagesOnBroker(int count) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(getDestinationName());
- MessageConsumer consumer = session.createConsumer(queue);
-
- for (int i = 0; i < MESSAGE_COUNT; ++i) {
- Message message = consumer.receive(RECEIVE_TIMEOUT);
- assertNotNull(message);
- LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName());
- String gid = message.getStringProperty("JMSXGroupID");
- String seq = message.getStringProperty("JMSXGroupSeq");
- LOG.debug("Message assigned JMSXGroupID := {}", gid);
- LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
- }
-
- consumer.close();
- }
-
- protected void sendMessagesToBroker(int count, AtomicInteger sequence) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(getDestinationName());
- MessageProducer producer = session.createProducer(queue);
-
- byte[] buffer = new byte[MESSAGE_SIZE];
- for (count = 0; count < MESSAGE_SIZE; count++) {
- String s = String.valueOf(count % 10);
- Character c = s.charAt(0);
- int value = c.charValue();
- buffer[count] = (byte) value;
- }
-
- LOG.debug("Sending {} messages to destination: {}", MESSAGE_COUNT, queue);
- for (int i = 1; i <= MESSAGE_COUNT; i++) {
- BytesMessage message = session.createBytesMessage();
- message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
- message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
- message.setIntProperty("JMSXGroupSeq", sequence.incrementAndGet());
- message.writeBytes(buffer);
- producer.send(message);
- }
-
- producer.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/240278db/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java
deleted file mode 100644
index c9b8c23..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
-import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SimpleAMQPAuthTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(SimpleAMQPAuthTest.class);
-
- private final String SIMPLE_AUTH_AMQP_BROKER_XML =
- "org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml";
- private BrokerService brokerService;
- private int port;
-
- @Before
- public void setUp() throws Exception {
- startBroker();
- }
-
- @After
- public void stopBroker() throws Exception {
- if (brokerService != null) {
- brokerService.stop();
- brokerService = null;
- }
- }
-
- @Test(timeout = 10000)
- public void testNoUserOrPassword() throws Exception {
- try {
- ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "", "");
- factory.setQueuePrefix("queue://");
- factory.setTopicPrefix("topic://");
-
- Connection connection = factory.createConnection();
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- LOG.error("Unexpected exception ", exception);
- exception.printStackTrace();
- }
- });
- connection.start();
- Thread.sleep(500);
- connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- fail("Expected JMSException");
- } catch (JMSException e) {
- Exception linkedException = e.getLinkedException();
- if (linkedException != null && linkedException instanceof ConnectionClosedException) {
- ConnectionClosedException cce = (ConnectionClosedException) linkedException;
- assertEquals("Error{condition=unauthorized-access,description=User name [null] or password is invalid.}", cce.getRemoteError().toString());
- } else {
- LOG.error("Unexpected Exception", e);
- fail("Unexpected exception: " + e.getMessage());
- }
- }
- }
-
- @Test(timeout = 10000)
- public void testUnknownUser() throws Exception {
- try {
- ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
- factory.setQueuePrefix("queue://");
- factory.setTopicPrefix("topic://");
-
- Connection connection = factory.createConnection("nosuchuser", "blah");
- connection.start();
- Thread.sleep(500);
- connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- fail("Expected JMSException");
- } catch (JMSException e) {
- Exception linkedException = e.getLinkedException();
- if (linkedException != null && linkedException instanceof ConnectionClosedException) {
- ConnectionClosedException cce = (ConnectionClosedException) linkedException;
- assertEquals("Error{condition=unauthorized-access,description=User name [nosuchuser] or password is invalid.}", cce.getRemoteError().toString());
- } else {
- LOG.error("Unexpected Exception", e);
- fail("Unexpected exception: " + e.getMessage());
- }
- }
- }
-
- @Test(timeout = 10000)
- public void testKnownUserWrongPassword() throws Exception {
- try {
- ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
- factory.setQueuePrefix("queue://");
- factory.setTopicPrefix("topic://");
-
- Connection connection = factory.createConnection("user", "wrongPassword");
- connection.start();
- Thread.sleep(500);
- connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- fail("Expected JMSException");
- } catch (JMSException e) {
- Exception linkedException = e.getLinkedException();
- if (linkedException != null && linkedException instanceof ConnectionClosedException) {
- ConnectionClosedException cce = (ConnectionClosedException) linkedException;
- assertEquals("Error{condition=unauthorized-access,description=User name [user] or password is invalid.}", cce.getRemoteError().toString());
- } else {
- LOG.error("Unexpected Exception", e);
- fail("Unexpected exception: " + e.getMessage());
- }
- }
- }
-
- @Test(timeout = 30000)
- public void testSendReceive() throws Exception {
- ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
- factory.setQueuePrefix("queue://");
- factory.setTopicPrefix("topic://");
-
- Connection connection = factory.createConnection("user", "userPassword");
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue("txQueue");
- MessageProducer p = session.createProducer(queue);
- TextMessage message = null;
- message = session.createTextMessage();
- String messageText = "hello sent at " + new java.util.Date().toString();
- message.setText(messageText);
- p.send(message);
-
- // Get the message we just sent
- MessageConsumer consumer = session.createConsumer(queue);
- connection.start();
- Message msg = consumer.receive(5000);
- assertNotNull(msg);
- assertTrue(msg instanceof TextMessage);
- TextMessage textMessage = (TextMessage) msg;
- assertEquals(messageText, textMessage.getText());
- connection.close();
- }
-
- protected BrokerService createBroker() throws Exception {
- return createBroker(SIMPLE_AUTH_AMQP_BROKER_XML);
- }
-
- protected BrokerService createBroker(String uri) throws Exception {
- LOG.debug(">>>>> Loading broker configuration from the classpath with URI: {}", uri);
- return BrokerFactory.createBroker(new URI("xbean:" + uri));
- }
-
- public void startBroker() throws Exception {
- brokerService = createBroker();
- brokerService.start();
- port = brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI().getPort();
- brokerService.waitUntilStarted();
- }
-}
-
[4/7] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5617
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5617
Refactor as a general test case that covers the scenario across all the
transport types.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8f0bf606
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8f0bf606
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8f0bf606
Branch: refs/heads/master
Commit: 8f0bf6060a8f93a5bda57e8f152139dbb9aad8d2
Parents: 4fa420b
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 26 15:06:16 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 26 15:06:16 2015 -0500
----------------------------------------------------------------------
.../activemq/transport/amqp/AMQ4920Test.java | 161 --------------
.../amqp/JMSConcurrentConsumersTest.java | 213 +++++++++++++++++++
2 files changed, 213 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/8f0bf606/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
deleted file mode 100644
index 94123af..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import static org.junit.Assert.assertEquals;
-
-import java.net.URI;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4920Test extends AmqpTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4920Test.class);
- private static final Integer ITERATIONS = 500;
- private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are
- // required to reproduce
- // the original issue
- public static final String TEXT_MESSAGE = "TextMessage: ";
- private final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
- private final CountDownLatch initLatch = new CountDownLatch(CONSUMER_COUNT);
-
- @Test(timeout = 60000)
- public void testSendWithMultipleConsumers() throws Exception {
- Connection connection =
- JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", false);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- String destinationName = "AMQ4920Test" + System.currentTimeMillis();
- Destination destination = session.createTopic(destinationName);
- connection.start();
-
- ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 0; i < CONSUMER_COUNT; i++) {
- AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(initLatch, destinationName, amqpURI, "Consumer-" + i, latch, ITERATIONS);
- executor.submit(consumerTask);
- }
- connection.start();
-
- // Make sure at least Topic consumers are subscribed before the first send.
- initLatch.await();
-
- LOG.debug("At start latch is " + latch.getCount());
- sendMessages(connection, destination, ITERATIONS, 10);
- LOG.debug("After send latch is " + latch.getCount());
-
- latch.await(15, TimeUnit.SECONDS);
- LOG.debug("After await latch is " + latch.getCount());
- assertEquals(0, latch.getCount());
-
- executor.shutdown();
- }
-
- public void sendMessages(Connection connection, Destination destination, int count, int sleepInterval) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
-
- for (int i = 0; i < count; i++) {
- TextMessage message = session.createTextMessage();
- message.setText(TEXT_MESSAGE + i);
- LOG.trace("Sending message [" + i + "]");
- producer.send(message);
- if (sleepInterval > 0) {
- Thread.sleep(sleepInterval);
- }
- }
-
- session.close();
- }
-}
-
-class AMQ4930ConsumerTask implements Callable<Boolean> {
- protected static final Logger LOG = LoggerFactory.getLogger(AMQ4930ConsumerTask.class);
- private final String destinationName;
- private final String consumerName;
- private final CountDownLatch messagesReceived;
- private final URI amqpURI;
- private final int expectedMessageCount;
- private final CountDownLatch started;
-
- public AMQ4930ConsumerTask(CountDownLatch started, String destinationName, URI amqpURI, String consumerName, CountDownLatch latch, int expectedMessageCount) {
- this.started = started;
- this.destinationName = destinationName;
- this.amqpURI = amqpURI;
- this.consumerName = consumerName;
- this.messagesReceived = latch;
- this.expectedMessageCount = expectedMessageCount;
- }
-
- @Override
- public Boolean call() throws Exception {
- LOG.debug(consumerName + " starting");
- Connection connection = null;
- try {
- connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic(destinationName);
- MessageConsumer consumer = session.createConsumer(destination);
- connection.start();
-
- started.countDown();
-
- int receivedCount = 0;
- while (receivedCount < expectedMessageCount) {
- Message message = consumer.receive(2000);
- if (message == null) {
- LOG.error("consumer {} got null message on iteration {}", consumerName, receivedCount);
- return false;
- }
- if (!(message instanceof TextMessage)) {
- LOG.error("consumer {} expected text message on iteration {} but got {}", consumerName, receivedCount, message.getClass().getCanonicalName());
- return false;
- }
- TextMessage tm = (TextMessage) message;
- if (!tm.getText().equals(AMQ4920Test.TEXT_MESSAGE + receivedCount)) {
- LOG.error("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
- return false;
- }
- LOG.trace("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
- messagesReceived.countDown();
- receivedCount++;
- }
- } catch (Exception e) {
- LOG.error("UnexpectedException in " + consumerName, e);
- } finally {
- try {
- connection.close();
- } catch (JMSException ignoreMe) {
- }
- }
-
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/8f0bf606/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
new file mode 100644
index 0000000..2d7b34f
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSConcurrentConsumersTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSConcurrentConsumersTest extends AmqpTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(JMSConcurrentConsumersTest.class);
+ private static final Integer ITERATIONS = 400;
+ private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are
+ // required to reproduce
+ // the original issue
+ public static final String TEXT_MESSAGE = "TextMessage: ";
+
+ private CountDownLatch latch;
+ private CountDownLatch initLatch;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
+ initLatch = new CountDownLatch(CONSUMER_COUNT);
+ }
+
+ @Override
+ protected boolean isUseTcpConnector() {
+ return true;
+ }
+
+ @Override
+ protected boolean isUseSslConnector() {
+ return true;
+ }
+
+ @Override
+ protected boolean isUseNioConnector() {
+ return true;
+ }
+
+ @Override
+ protected boolean isUseNioPlusSslConnector() {
+ return true;
+ }
+
+ @Test(timeout = 60000)
+ public void testSendWithMultipleConsumersTCP() throws Exception {
+ doTestSendWithMultipleConsumers(amqpURI);
+ }
+
+ @Test(timeout = 60000)
+ public void testSendWithMultipleConsumersNIO() throws Exception {
+ doTestSendWithMultipleConsumers(amqpNioURI);
+ }
+
+ @Test(timeout = 60000)
+ public void testSendWithMultipleConsumersSSL() throws Exception {
+ doTestSendWithMultipleConsumers(amqpSslURI);
+ }
+
+ @Test(timeout = 60000)
+ public void testSendWithMultipleConsumersNIOPlusSSL() throws Exception {
+ doTestSendWithMultipleConsumers(amqpNioPlusSslURI);
+ }
+
+ public void doTestSendWithMultipleConsumers(URI remoteURI) throws Exception {
+
+ Connection connection = JmsClientContext.INSTANCE.createConnection(remoteURI, "admin", "password", false);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String destinationName = "AMQ4920Test" + System.currentTimeMillis();
+ Destination destination = session.createTopic(destinationName);
+ connection.start();
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ for (int i = 0; i < CONSUMER_COUNT; i++) {
+ ConsumerTask consumerTask = new ConsumerTask(initLatch, destinationName, remoteURI, "Consumer-" + i, latch, ITERATIONS);
+ executor.submit(consumerTask);
+ }
+ connection.start();
+
+ // Make sure at least Topic consumers are subscribed before the first send.
+ initLatch.await();
+
+ LOG.debug("At start latch is " + latch.getCount());
+ sendMessages(connection, destination, ITERATIONS, 10);
+ LOG.debug("After send latch is " + latch.getCount());
+
+ latch.await(15, TimeUnit.SECONDS);
+ LOG.debug("After await latch is " + latch.getCount());
+ assertEquals(0, latch.getCount());
+
+ executor.shutdown();
+ }
+
+ public void sendMessages(Connection connection, Destination destination, int count, int sleepInterval) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+
+ for (int i = 0; i < count; i++) {
+ TextMessage message = session.createTextMessage();
+ message.setText(TEXT_MESSAGE + i);
+ LOG.trace("Sending message [" + i + "]");
+ producer.send(message);
+ if (sleepInterval > 0) {
+ Thread.sleep(sleepInterval);
+ }
+ }
+
+ session.close();
+ }
+
+ static class ConsumerTask implements Callable<Boolean> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(ConsumerTask.class);
+
+ private final String destinationName;
+ private final String consumerName;
+ private final CountDownLatch messagesReceived;
+ private final URI amqpURI;
+ private final int expectedMessageCount;
+ private final CountDownLatch started;
+
+ public ConsumerTask(CountDownLatch started, String destinationName, URI amqpURI, String consumerName, CountDownLatch latch, int expectedMessageCount) {
+ this.started = started;
+ this.destinationName = destinationName;
+ this.amqpURI = amqpURI;
+ this.consumerName = consumerName;
+ this.messagesReceived = latch;
+ this.expectedMessageCount = expectedMessageCount;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ LOG.debug(consumerName + " starting");
+ Connection connection = null;
+ try {
+ connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createTopic(destinationName);
+ MessageConsumer consumer = session.createConsumer(destination);
+ connection.start();
+
+ started.countDown();
+
+ int receivedCount = 0;
+ while (receivedCount < expectedMessageCount) {
+ Message message = consumer.receive(2000);
+ if (message == null) {
+ LOG.error("consumer {} got null message on iteration {}", consumerName, receivedCount);
+ return false;
+ }
+ if (!(message instanceof TextMessage)) {
+ LOG.error("consumer {} expected text message on iteration {} but got {}", consumerName, receivedCount, message.getClass().getCanonicalName());
+ return false;
+ }
+ TextMessage tm = (TextMessage) message;
+ if (!tm.getText().equals(TEXT_MESSAGE + receivedCount)) {
+ LOG.error("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
+ return false;
+ }
+ LOG.trace("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
+ messagesReceived.countDown();
+ receivedCount++;
+ }
+ } catch (Exception e) {
+ LOG.error("UnexpectedException in " + consumerName, e);
+ } finally {
+ try {
+ connection.close();
+ } catch (JMSException ignoreMe) {
+ }
+ }
+
+ return true;
+ }
+ }
+}