You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:27 UTC
[03/27] Initial drop of donated AMQP Client Code.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
new file mode 100644
index 0000000..aa97a60
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that Session CLIENT_ACKNOWLEDGE works as expected.
+ */
+public class JmsClientAckTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsClientAckTest.class);
+
+    /** Connection opened by the running test; released in {@link #tearDown()}. */
+    private Connection connection;
+
+    /**
+     * Closes the connection opened by the test, if one was created, and then
+     * runs the inherited tear down.
+     *
+     * The inherited tear down is invoked from a finally block so that it is
+     * never skipped, even when the test failed before creating a connection
+     * or when closing the connection throws.
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Sends one message, receives and acknowledges it, and verifies that the
+     * broker queue becomes empty.
+     */
+    @Test(timeout = 60000)
+    public void testAckedMessageAreConsumed() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // The connection itself is closed by tearDown().
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * Receives three messages, acknowledges only the last one, and verifies
+     * that the broker queue becomes empty.
+     */
+    @Test(timeout = 60000)
+    public void testLastMessageAcked() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+        producer.send(session.createTextMessage("Hello2"));
+        producer.send(session.createTextMessage("Hello3"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(3, proxy.getQueueSize());
+
+        // Consume all three messages, acknowledging only the final one.
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * Receives a message without acknowledging it, closes the session, and
+     * verifies the message is still on the queue and can be consumed and
+     * acknowledged from a new session.
+     */
+    @Test(timeout = 60000)
+    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...but don't ack it.
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        session.close();
+
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...and this time we ack it.
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(2000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * Acknowledges a message from inside a MessageListener and verifies that
+     * the broker queue becomes empty.
+     */
+    @Test(timeout = 60000)
+    public void testAckedMessageAreConsumedByAsync() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    message.acknowledge();
+                } catch (JMSException e) {
+                    LOG.warn("Unexpected exception on acknowledge: {}", e.getMessage());
+                }
+            }
+        });
+
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * Installs a listener that never acknowledges, closes the session, and
+     * verifies the message is still on the queue and can be consumed and
+     * acknowledged from a new session.
+     */
+    @Test(timeout = 60000)
+    public void testUnAckedAsyncMessageAreNotConsumedOnSessionClose() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                // Don't ack the message.
+            }
+        });
+
+        session.close();
+        assertEquals(1, proxy.getQueueSize());
+
+        // Now we consume and ack the Message.
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(2000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * Delivers messages to three listeners on the same session, acknowledges
+     * only the last message seen, and verifies that the broker queue becomes
+     * empty.
+     */
+    @Test(timeout=90000)
+    public void testAckMarksAllConsumerMessageAsConsumed() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+
+        final int MSG_COUNT = 30;
+        final AtomicReference<Message> lastMessage = new AtomicReference<Message>();
+        final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+
+        // Shared by all three consumers: remembers the latest message, never acks.
+        MessageListener myListener = new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                lastMessage.set(message);
+                done.countDown();
+            }
+        };
+
+        MessageConsumer consumer1 = session.createConsumer(queue);
+        consumer1.setMessageListener(myListener);
+        MessageConsumer consumer2 = session.createConsumer(queue);
+        consumer2.setMessageListener(myListener);
+        MessageConsumer consumer3 = session.createConsumer(queue);
+        consumer3.setMessageListener(myListener);
+
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            producer.send(session.createTextMessage("Hello: " + i));
+        }
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(MSG_COUNT, proxy.getQueueSize());
+
+        assertTrue("Failed to consume all messages.", done.await(20, TimeUnit.SECONDS));
+        assertNotNull(lastMessage.get());
+        assertEquals(MSG_COUNT, proxy.getInFlightCount());
+
+        lastMessage.get().acknowledge();
+
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * Acknowledges the first three messages, then receives a fourth, calls
+     * Session.recover() and verifies the fourth message is delivered again
+     * and flagged as redelivered.
+     */
+    @Test(timeout=60000)
+    public void testUnackedAreRecovered() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = consumerSession.createQueue(name.getMethodName());
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        // Each message carries its own body; previously the text was always
+        // set on the first message, so the second and third were sent empty.
+        TextMessage sent1 = producerSession.createTextMessage();
+        sent1.setText("msg1");
+        producer.send(sent1);
+        TextMessage sent2 = producerSession.createTextMessage();
+        sent2.setText("msg2");
+        producer.send(sent2);
+        TextMessage sent3 = producerSession.createTextMessage();
+        sent3.setText("msg3");
+        producer.send(sent3);
+
+        // Fail with a clear assertion instead of a NullPointerException when
+        // a receive times out.
+        assertNotNull(consumer.receive(5000));
+        Message rec2 = consumer.receive(5000);
+        assertNotNull(rec2);
+        assertNotNull(consumer.receive(5000));
+        rec2.acknowledge();
+
+        TextMessage sent4 = producerSession.createTextMessage();
+        sent4.setText("msg4");
+        producer.send(sent4);
+
+        Message rec4 = consumer.receive(5000);
+        assertNotNull(rec4);
+        assertTrue(rec4.equals(sent4));
+        consumerSession.recover();
+        rec4 = consumer.receive(5000);
+        assertNotNull(rec4);
+        assertTrue(rec4.equals(sent4));
+        assertTrue(rec4.getJMSRedelivered());
+        rec4.acknowledge();
+    }
+
+    /**
+     * Calls Session.recover() from inside the listener on every delivery and
+     * verifies that at least six deliveries arrive flagged as redelivered.
+     */
+    @Test(timeout=60000)
+    public void testRecoverRedelivery() throws Exception {
+        final CountDownLatch redelivery = new CountDownLatch(6);
+        connection = createAmqpConnection();
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    LOG.info("Got message: {}", message.getJMSMessageID());
+                    if (message.getJMSRedelivered()) {
+                        LOG.info("It's a redelivery.");
+                        redelivery.countDown();
+                    }
+                    LOG.info("calling recover() on the session to force redelivery.");
+                    session.recover();
+                } catch (JMSException e) {
+                    // Route the failure through the test log rather than stderr.
+                    LOG.warn("Unexpected exception while forcing redelivery: {}", e.getMessage(), e);
+                }
+            }
+        });
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("test"));
+
+        assertTrue("Did not receive 6 redeliveries", redelivery.await(20, TimeUnit.SECONDS));
+    }
+
+    /**
+     * Waits until the broker reports the given queue as empty and fails the
+     * test if that does not happen within the default wait period of
+     * {@link Wait#waitFor(Wait.Condition)}.
+     *
+     * @param proxy management view of the queue to watch
+     * @throws Exception if the wait or the queue size lookup fails
+     */
+    private void assertQueueDrained(final QueueViewMBean proxy) throws Exception {
+        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == 0;
+            }
+        }));
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
new file mode 100644
index 0000000..018d6bc
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for Message priority ordering.
+ */
+public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport {
+
+    /** Number of messages each test sends, one per priority value 0..9. */
+    private static final int MSG_COUNT = 10;
+
+    /** Connection created for each test in {@link #setUp()}. */
+    private Connection connection;
+
+    /**
+     * Runs the inherited set up and then creates (but does not start) the
+     * connection used by the test.
+     */
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        connection = createAmqpConnection();
+    }
+
+    /**
+     * Closes the connection created in {@link #setUp()} and then runs the
+     * inherited tear down.
+     *
+     * This class keeps its own private connection field and nothing else in
+     * the class closes it, so it is closed here. The inherited tear down
+     * runs from a finally block so it is never skipped.
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Sends messages with ascending priority and verifies they are received
+     * in descending priority order once all have been dispatched.
+     */
+    @Test(timeout = 60000)
+    public void testPrefetchedMessageArePriorityOrdered() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendOneMessagePerPriority(session, producer);
+        awaitAllDispatched();
+
+        for (int i = MSG_COUNT - 1; i >= 0; i--) {
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals(i, message.getJMSPriority());
+        }
+    }
+
+    /**
+     * Disables priority support on the connection, sends messages with
+     * ascending priority and verifies they are received in send order.
+     */
+    @Test(timeout = 60000)
+    public void testPrefetchedMessageAreNotPriorityOrdered() throws Exception {
+        // We are assuming that Broker side priority support is not enabled in the create
+        // broker method in AmqpTestSupport. If that changes then this test will sometimes
+        // fail.
+        ((JmsConnection) connection).setMessagePrioritySupported(false);
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendOneMessagePerPriority(session, producer);
+        awaitAllDispatched();
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals(i, message.getJMSPriority());
+        }
+    }
+
+    /**
+     * Sends {@link #MSG_COUNT} empty text messages, the i-th with priority i.
+     *
+     * @param session  session used to create the messages
+     * @param producer producer whose priority is changed before each send
+     * @throws Exception if creating or sending a message fails
+     */
+    private void sendOneMessagePerPriority(Session session, MessageProducer producer) throws Exception {
+        for (int priority = 0; priority < MSG_COUNT; priority++) {
+            Message message = session.createTextMessage();
+            producer.setPriority(priority);
+            producer.send(message);
+        }
+    }
+
+    /**
+     * Waits until the broker reports all sent messages as in flight for the
+     * test queue, then pauses so they can reach the consumer's prefetch buffer.
+     *
+     * The wait result is asserted so that a dispatch problem is reported
+     * directly instead of surfacing later as a priority mismatch.
+     *
+     * @throws Exception if the wait is interrupted or the lookup fails
+     */
+    private void awaitAllDispatched() throws Exception {
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertTrue("Not all messages were dispatched.", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getInFlightCount() == MSG_COUNT;
+            }
+        }));
+
+        // We need to make sure that all messages are in the prefetch buffer.
+        TimeUnit.SECONDS.sleep(4);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java
new file mode 100644
index 0000000..dbbee26
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+
+/**
+ * Test the case where messages are sent and consumers are created in onMessage.
+ */
+public class JmsCreateResourcesInOnMessageTest extends AmqpTestSupport {
+
+    /**
+     * Sends one message to a queue, forwards it to a second queue using a
+     * producer created inside onMessage, and verifies the source queue is
+     * emptied and the forward queue holds exactly one message.
+     */
+    @Test(timeout = 60000)
+    public void testCreateProducerInOnMessage() throws Exception {
+        Connection connection = createAmqpConnection();
+        // Close the connection on every exit path, not only when all assertions pass.
+        try {
+            connection.start();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            assertNotNull(session);
+            Queue queue = session.createQueue(name.getMethodName());
+            MessageConsumer consumer = session.createConsumer(queue);
+            final Queue forwardQ = session.createQueue(name.getMethodName() + "-forwarded");
+            MessageProducer producer = session.createProducer(queue);
+            producer.send(session.createTextMessage("TEST-MESSAGE"));
+            producer.close();
+
+            final QueueViewMBean proxy = getProxyToQueue(queue.getQueueName());
+            assertEquals(1, proxy.getQueueSize());
+
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        LOG.debug("Received async message: {}", message);
+                        MessageProducer producer = session.createProducer(forwardQ);
+                        producer.send(message);
+                        LOG.debug("forwarded async message: {}", message);
+                    } catch (Throwable e) {
+                        // Fill the placeholder with the message and pass the
+                        // throwable last so the stack trace is logged too.
+                        LOG.debug("Caught exception: {}", e.getMessage(), e);
+                        // Keep the original failure as the cause.
+                        throw new RuntimeException(e.getMessage(), e);
+                    }
+                }
+            });
+
+            assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return proxy.getQueueSize() == 0;
+                }
+            }));
+
+            final QueueViewMBean proxy2 = getProxyToQueue(forwardQ.getQueueName());
+            assertTrue("Message was not forwarded.", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return proxy2.getQueueSize() == 1;
+                }
+            }));
+        } finally {
+            connection.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
new file mode 100644
index 0000000..5994184
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Durable Topic Subscriber functionality.
+ */
+public class JmsDurableSubscriberTest extends AmqpTestSupport {
+
+    // Bound to this class so log output is attributed to the right test.
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsDurableSubscriberTest.class);
+
+    /**
+     * Reports that this test wants persistence enabled.
+     *
+     * @return always true
+     */
+    @Override
+    public boolean isPersistent() {
+        return true;
+    }
+
+    /**
+     * Creates a durable subscriber and verifies the broker lists exactly one
+     * durable topic subscriber.
+     */
+    @Test(timeout = 60000)
+    public void testCreateDuableSubscriber() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber");
+
+        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+    }
+
+    /**
+     * Closes a durable subscriber and re-creates it, verifying the broker
+     * moves the subscription between its active and inactive lists.
+     */
+    @Test(timeout = 60000)
+    public void testDurableGoesOfflineAndReturns() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        final String subscriptionName = name.getMethodName() + "-subscriber";
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriptionName);
+
+        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        subscriber.close();
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        subscriber = session.createDurableSubscriber(topic, subscriptionName);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+    }
+
+    /**
+     * Publishes messages while the durable subscriber is closed, re-creates
+     * the subscriber and verifies every message is then delivered to it.
+     */
+    @Test(timeout = 60000)
+    public void testOfflineSubscriberGetsItsMessages() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        final int MSG_COUNT = 5;
+        final String subscriptionName = name.getMethodName() + "-subscriber";
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriptionName);
+
+        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        subscriber.close();
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        MessageProducer producer = session.createProducer(topic);
+        for (int i = 0; i < MSG_COUNT; i++) {
+            producer.send(session.createTextMessage("Message: " + i));
+        }
+        producer.close();
+
+        LOG.info("Bringing offline subscription back online.");
+        subscriber = session.createDurableSubscriber(topic, subscriptionName);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        final CountDownLatch messages = new CountDownLatch(MSG_COUNT);
+        subscriber.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                LOG.info("Consumer got a message: {}", message);
+                messages.countDown();
+            }
+        });
+
+        // Wait first, then build the failure text, so the reported count is
+        // the one observed after the wait rather than before it.
+        boolean allReceived = messages.await(30, TimeUnit.SECONDS);
+        assertTrue("Messages still outstanding: " + messages.getCount(), allReceived);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java
new file mode 100644
index 0000000..8fbcbad
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Tests MessageConsumer method contracts after the MessageConsumer is closed.
+ */
+public class JmsMessageConsumerClosedTest extends AmqpTestSupport {
+
+    /** Consumer under test; assigned in {@link #setUp()} from {@link #createConsumer()}. */
+    protected MessageConsumer consumer;
+
+    /**
+     * Builds the consumer the tests operate on: a queue consumer, named after
+     * the running test method, that has already been closed.
+     *
+     * @return the closed consumer
+     * @throws Exception if the connection, session or consumer cannot be created
+     */
+    protected MessageConsumer createConsumer() throws Exception {
+        connection = createAmqpConnection();
+        Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue testQueue = amqpSession.createQueue(name.getMethodName());
+        MessageConsumer closedConsumer = amqpSession.createConsumer(testQueue);
+        closedConsumer.close();
+        return closedConsumer;
+    }
+
+    /**
+     * Runs the inherited set up, then prepares the consumer under test.
+     */
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        this.consumer = createConsumer();
+    }
+
+    /** Expects getMessageSelector() to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetMessageSelectorFails() throws JMSException {
+        consumer.getMessageSelector();
+    }
+
+    /** Expects getMessageListener() to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetMessageListenerFails() throws JMSException {
+        consumer.getMessageListener();
+    }
+
+    /** Expects setMessageListener() to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSetMessageListenerFails() throws JMSException {
+        MessageListener ignoringListener = new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+            }
+        };
+        consumer.setMessageListener(ignoringListener);
+    }
+
+    /** Expects the blocking receive() to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testRreceiveFails() throws JMSException {
+        consumer.receive();
+    }
+
+    /** Expects the timed receive(long) to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testRreceiveTimedFails() throws JMSException {
+        consumer.receive(11);
+    }
+
+    /** Expects receiveNoWait() to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testRreceiveNoWaitFails() throws JMSException {
+        consumer.receiveNoWait();
+    }
+
+    /** Expects a further close() call to complete without throwing. */
+    @Test(timeout=30000)
+    public void testClose() throws JMSException {
+        consumer.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java
new file mode 100644
index 0000000..85b4b37
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.Wait;
+
+/**
+ * Tests MessageConsumer method contracts after the MessageConsumer connection fails.
+ */
+public class JmsMessageConsumerFailedTest extends JmsMessageConsumerClosedTest {
+
+ /**
+  * Creates a queue consumer and then removes the broker from underneath it.
+  * <p>
+  * The consumer is created on a fresh AMQP connection, the primary broker is
+  * stopped, and the method then waits until the connection's exception listener
+  * has fired and the connection reports that it is no longer connected.
+  *
+  * @return the consumer created before the broker was stopped
+  * @throws Exception if any step of the setup throws
+  */
+ @Override
+ protected MessageConsumer createConsumer() throws Exception {
+     final CountDownLatch failureSignalled = new CountDownLatch(1);
+     ExceptionListener failureListener = new ExceptionListener() {
+         @Override
+         public void onException(JMSException exception) {
+             failureSignalled.countDown();
+         }
+     };
+
+     connection = createAmqpConnection();
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue destination = session.createQueue(name.getMethodName());
+     MessageConsumer orphanedConsumer = session.createConsumer(destination);
+     connection.setExceptionListener(failureListener);
+     connection.start();
+
+     // Take the broker away and wait for the client to notice.
+     stopPrimaryBroker();
+     assertTrue(failureSignalled.await(10, TimeUnit.SECONDS));
+
+     final JmsConnection jmsConnection = (JmsConnection) connection;
+     Wait.Condition disconnected = new Wait.Condition() {
+         @Override
+         public boolean isSatisified() throws Exception {
+             return !jmsConnection.isConnected();
+         }
+     };
+     assertTrue(Wait.waitFor(disconnected));
+
+     return orphanedConsumer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
new file mode 100644
index 0000000..609b46a
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.qpid.jms.JmsMessageAvailableListener;
+import org.apache.qpid.jms.JmsMessageConsumer;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for basic JMS MessageConsumer functionality.
+ */
+public class JmsMessageConsumerTest extends AmqpTestSupport {
+
+ /** SLF4J logger named after this test class. */
+ protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumerTest.class);
+
+ /**
+ * Always returns {@code true}.
+ * NOTE(review): presumably read by AmqpTestSupport to enable broker persistence,
+ * which the restart-based tests in this class would need - confirm against the parent class.
+ */
+ @Override
+ public boolean isPersistent() {
+ return true;
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateMessageConsumer() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ session.createConsumer(queue);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize());
+ }
+
+ /**
+  * Sends one message to the test queue, receives it synchronously and then
+  * waits for the queue view to report that the queue is empty.
+  */
+ @Test(timeout = 60000)
+ public void testSyncConsumeFromQueue() throws Exception {
+     connection = createAmqpConnection();
+     connection.start();
+
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     assertNotNull(session);
+     Queue source = session.createQueue(name.getMethodName());
+     MessageConsumer receiver = session.createConsumer(source);
+
+     sendToAmqQueue(1);
+
+     final QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
+     assertEquals(1, queueView.getQueueSize());
+
+     Message received = receiver.receive(2000);
+     assertNotNull("Failed to receive any message.", received);
+
+     Wait.Condition queueDrained = new Wait.Condition() {
+         @Override
+         public boolean isSatisified() throws Exception {
+             return queueView.getQueueSize() == 0;
+         }
+     };
+     assertTrue("Queued message not consumed.", Wait.waitFor(queueDrained));
+ }
+
+ /**
+  * Publishes one message to the test topic, receives it synchronously and then
+  * waits for the topic view to report a size of zero.
+  */
+ @Test(timeout = 60000)
+ public void testSyncConsumeFromTopic() throws Exception {
+     connection = createAmqpConnection();
+     connection.start();
+
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     assertNotNull(session);
+     Topic source = session.createTopic(name.getMethodName());
+     MessageConsumer subscriber = session.createConsumer(source);
+
+     sendToAmqTopic(1);
+
+     // Unlike the queue variant, no size is asserted before the receive.
+     final TopicViewMBean topicView = getProxyToTopic(name.getMethodName());
+
+     Message received = subscriber.receive(2000);
+     assertNotNull("Failed to receive any message.", received);
+
+     Wait.Condition topicDrained = new Wait.Condition() {
+         @Override
+         public boolean isSatisified() throws Exception {
+             return topicView.getQueueSize() == 0;
+         }
+     };
+     assertTrue("Published message not consumed.", Wait.waitFor(topicDrained));
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageAvailableConsumer() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ final int MSG_COUNT = 10;
+ final AtomicInteger available = new AtomicInteger();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(queue);
+ ((JmsMessageConsumer) consumer).setAvailableListener(new JmsMessageAvailableListener() {
+
+ @Override
+ public void onMessageAvailable(MessageConsumer consumer) {
+ available.incrementAndGet();
+ }
+ });
+
+ sendToAmqQueue(MSG_COUNT);
+
+ final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(MSG_COUNT, proxy.getQueueSize());
+
+ assertTrue("Listener not notified of correct number of messages.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return available.get() == MSG_COUNT;
+ }
+ }));
+
+ // All should be immediately ready for consume.
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ assertNotNull(consumer.receiveNoWait());
+ }
+
+ assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return proxy.getQueueSize() == 0;
+ }
+ }));
+ }
+
+ /**
+  * Checks that a consumer blocked inside {@code receive(timeout)} wakes up once a
+  * message is dispatched to it. A background thread sends a single message after a
+  * ten second delay while the test thread is already waiting in a long timed
+  * receive; the short-poll-and-retry case is deliberately not what is tested here.
+  *
+  * @throws Exception if the connection, session or consumer cannot be set up
+  */
+ @Test(timeout=60000)
+ public void testConsumerReceiveBeforeMessageDispatched() throws Exception {
+     final Connection connection = createAmqpConnection();
+     this.connection = connection;
+     connection.start();
+
+     final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     final Queue queue = session.createQueue(name.getMethodName());
+
+     // NOTE(review): the sender thread shares the session with the receiving thread - confirm this is intended.
+     Thread delayedSender = new Thread() {
+         @Override
+         public void run() {
+             try {
+                 TimeUnit.SECONDS.sleep(10);
+                 MessageProducer producer = session.createProducer(queue);
+                 producer.send(session.createTextMessage("Hello"));
+             } catch (Exception e) {
+                 // The trailing throwable makes SLF4J log the stack trace as well as the message.
+                 LOG.warn("Caught during message send: {}", e.getMessage(), e);
+             }
+         }
+     };
+     delayedSender.start();
+
+     MessageConsumer consumer = session.createConsumer(queue);
+     Message msg = consumer.receive(60000);
+     assertNotNull(msg);
+ }
+
+ /**
+  * Installs a message listener, sends four messages to the test queue and checks
+  * that the listener is invoked exactly four times: the latch confirms the fourth
+  * delivery, and the count is re-checked after a one second pause to catch extras.
+  */
+ @Test(timeout=60000)
+ public void testAsynchronousMessageConsumption() throws Exception {
+     final int msgCount = 4;
+     final Connection connection = createAmqpConnection();
+     final AtomicInteger counter = new AtomicInteger(0);
+     final CountDownLatch done = new CountDownLatch(1);
+     this.connection = connection;
+
+     connection.start();
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue destination = session.createQueue(name.getMethodName());
+     MessageConsumer consumer = session.createConsumer(destination);
+
+     consumer.setMessageListener(new MessageListener() {
+         @Override
+         public void onMessage(Message m) {
+             LOG.debug("Async consumer got Message: {}", m);
+             // Compare the value returned by the atomic increment rather than re-reading the counter.
+             if (counter.incrementAndGet() == msgCount) {
+                 done.countDown();
+             }
+         }
+     });
+
+     sendToAmqQueue(msgCount);
+     assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+     // Give any unexpected extra delivery a chance to show up before the final count check.
+     TimeUnit.SECONDS.sleep(1);
+     assertEquals(msgCount, counter.get());
+ }
+
+ /**
+  * Once a message listener is installed on a consumer, each of the synchronous
+  * receive variants is expected to throw a JMSException; the test fails if any
+  * of the three calls returns normally.
+  */
+ @Test(timeout=60000)
+ public void testSyncReceiveFailsWhenListenerSet() throws Exception {
+     final Connection connection = createAmqpConnection();
+     this.connection = connection;
+
+     connection.start();
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue destination = session.createQueue(name.getMethodName());
+     MessageConsumer consumer = session.createConsumer(destination);
+
+     // No messages are sent in this test; the listener only has to be present.
+     consumer.setMessageListener(new MessageListener() {
+         @Override
+         public void onMessage(Message m) {
+             LOG.debug("Async consumer got Message: {}", m);
+         }
+     });
+
+     try {
+         consumer.receive();
+         fail("Should have thrown an exception.");
+     } catch (JMSException expected) {
+         // Expected: blocking receive is rejected while a listener is set.
+     }
+
+     try {
+         consumer.receive(1000);
+         fail("Should have thrown an exception.");
+     } catch (JMSException expected) {
+         // Expected: timed receive is rejected while a listener is set.
+     }
+
+     try {
+         consumer.receiveNoWait();
+         fail("Should have thrown an exception.");
+     } catch (JMSException expected) {
+         // Expected: non-blocking receive is rejected while a listener is set.
+     }
+ }
+
+ /**
+  * Sends four messages to the test queue before any listener exists, then installs
+  * a listener on an already started connection and checks that it is invoked
+  * exactly four times.
+  */
+ @Test(timeout=60000)
+ public void testSetMessageListenerAfterStartAndSend() throws Exception {
+     final int msgCount = 4;
+     final Connection connection = createAmqpConnection();
+     final AtomicInteger counter = new AtomicInteger(0);
+     final CountDownLatch done = new CountDownLatch(1);
+     this.connection = connection;
+
+     connection.start();
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue destination = session.createQueue(name.getMethodName());
+     MessageConsumer consumer = session.createConsumer(destination);
+     sendToAmqQueue(msgCount);
+
+     consumer.setMessageListener(new MessageListener() {
+         @Override
+         public void onMessage(Message m) {
+             LOG.debug("Async consumer got Message: {}", m);
+             // Compare the value returned by the atomic increment rather than re-reading the counter.
+             if (counter.incrementAndGet() == msgCount) {
+                 done.countDown();
+             }
+         }
+     });
+
+     assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+     // Give any unexpected extra delivery a chance to show up before the final count check.
+     TimeUnit.SECONDS.sleep(1);
+     assertEquals(msgCount, counter.get());
+ }
+
+ /**
+  * With the connection never started, a consumer must not hand out any of the
+  * three messages sent to its queue: a two second receive has to return null.
+  */
+ @Test(timeout=60000)
+ public void testNoReceivedMessagesWhenConnectionNotStarted() throws Exception {
+     // Deliberately no connection.start() here.
+     connection = createAmqpConnection();
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue source = session.createQueue(name.getMethodName());
+     MessageConsumer idleConsumer = session.createConsumer(source);
+
+     sendToAmqQueue(3);
+
+     Message unexpected = idleConsumer.receive(2000);
+     assertNull(unexpected);
+ }
+
+ /**
+  * Sends three text messages over an ActiveMQ connection, restarts the broker and
+  * reads them back with {@code readAllMessages()}; after a second restart the
+  * queue must yield nothing.
+  */
+ @Test(timeout = 60000)
+ public void testMessagesAreAckedAMQProducer() throws Exception {
+     final int messagesSent = 3;
+     assertTrue(brokerService.isPersistent());
+
+     connection = createActiveMQConnection();
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue queue = session.createQueue(name.getMethodName());
+     MessageProducer producer = session.createProducer(queue);
+
+     int index = 0;
+     while (index < messagesSent) {
+         TextMessage outgoing = session.createTextMessage();
+         String messageText = "Hello " + index + " sent at " + new java.util.Date();
+         outgoing.setText(messageText);
+         LOG.debug(">>>> Sent [{}]", messageText);
+         producer.send(outgoing);
+         index++;
+     }
+
+     // First restart: everything sent above should still be readable.
+     restartPrimaryBroker();
+     int firstPass = readAllMessages();
+     assertEquals(messagesSent, firstPass);
+
+     // Second restart: the queue should now be empty.
+     restartPrimaryBroker();
+     int secondPass = readAllMessages();
+     assertEquals(0, secondPass);
+ }
+
+ /**
+  * Sends three persistent text messages over an AMQP connection, closes that
+  * connection, restarts the broker and reads the messages back with
+  * {@code readAllMessages()}; after a second restart the queue must yield nothing.
+  */
+ @Test(timeout = 60000)
+ public void testMessagesAreAckedAMQPProducer() throws Exception {
+     final int messagesSent = 3;
+
+     connection = createAmqpConnection();
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue queue = session.createQueue(name.getMethodName());
+     MessageProducer producer = session.createProducer(queue);
+     producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+     int index = 0;
+     while (index < messagesSent) {
+         TextMessage outgoing = session.createTextMessage();
+         String messageText = "Hello " + index + " sent at " + new java.util.Date();
+         outgoing.setText(messageText);
+         LOG.debug(">>>> Sent [{}]", messageText);
+         producer.send(outgoing);
+         index++;
+     }
+
+     connection.close();
+
+     // First restart: everything sent above should still be readable.
+     restartPrimaryBroker();
+     int firstPass = readAllMessages();
+     assertEquals(messagesSent, firstPass);
+
+     // Second restart: the queue should now be empty.
+     restartPrimaryBroker();
+     int secondPass = readAllMessages();
+     assertEquals(0, secondPass);
+ }
+
+ /**
+ * Reads every message from the test queue without a selector by delegating to
+ * {@code readAllMessages(null)}.
+ *
+ * @return the number of messages received
+ */
+ private int readAllMessages() throws Exception {
+ return readAllMessages(null);
+ }
+
+ /**
+  * Opens a dedicated AMQP connection and consumes from the queue named after the
+  * current test method until a five second receive returns null. Every message
+  * must be a {@link TextMessage}. The connection is always closed on exit.
+  *
+  * @param selector message selector for the consumer, or {@code null} for none
+  * @return the number of messages received
+  * @throws Exception if connecting or consuming fails
+  */
+ private int readAllMessages(String selector) throws Exception {
+     Connection readerConnection = createAmqpConnection();
+     readerConnection.start();
+     try {
+         Session session = readerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(name.getMethodName());
+
+         MessageConsumer consumer = (selector == null)
+             ? session.createConsumer(queue)
+             : session.createConsumer(queue, selector);
+
+         int messagesReceived = 0;
+         for (Message msg = consumer.receive(5000); msg != null; msg = consumer.receive(5000)) {
+             assertNotNull(msg);
+             assertTrue(msg instanceof TextMessage);
+             LOG.debug(">>>> Received [{}]", ((TextMessage) msg).getText());
+             messagesReceived++;
+         }
+
+         consumer.close();
+         return messagesReceived;
+     } finally {
+         readerConnection.close();
+     }
+ }
+
+ /**
+  * Sends one message at priority 5 and one at priority 9, then consumes with the
+  * selector {@code JMSPriority > 8}: only the priority 9 message may be delivered
+  * and a further one second receive must return null.
+  */
+ @Test(timeout=30000)
+ public void testSelectors() throws Exception{
+     connection = createAmqpConnection();
+     connection.start();
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue queue = session.createQueue(name.getMethodName());
+     MessageProducer producer = session.createProducer(queue);
+
+     TextMessage lowPriority = session.createTextMessage();
+     lowPriority.setText("hello");
+     producer.send(lowPriority, DeliveryMode.PERSISTENT, 5, 0);
+
+     TextMessage highPriority = session.createTextMessage();
+     highPriority.setText("hello + 9");
+     producer.send(highPriority, DeliveryMode.PERSISTENT, 9, 0);
+
+     QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
+     assertEquals(2, queueView.getQueueSize());
+
+     MessageConsumer selective = session.createConsumer(queue, "JMSPriority > 8");
+     Message matched = selective.receive(5000);
+     assertNotNull(matched);
+     assertTrue(matched instanceof TextMessage);
+     String body = ((TextMessage) matched).getText();
+     assertEquals("hello + 9", body);
+
+     Message leftover = selective.receive(1000);
+     assertNull(leftover);
+ }
+
+ /**
+  * Connects with the credentials {@code guest}/{@code password} and creates a
+  * consumer on a queue under the {@code USERS.} prefix; the test passes only if a
+  * JMSSecurityException is thrown.
+  */
+ @Test(timeout=90000, expected=JMSSecurityException.class)
+ public void testConsumerNotAuthorized() throws Exception{
+     connection = createAmqpConnection("guest", "password");
+     connection.start();
+
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     String restrictedName = "USERS." + name.getMethodName();
+     Queue restricted = session.createQueue(restrictedName);
+     session.createConsumer(restricted);
+ }
+
+ /**
+  * Creates a consumer with the selector {@code 3+5}; the test passes only if an
+  * InvalidSelectorException is thrown.
+  */
+ @Test(timeout=90000, expected=InvalidSelectorException.class)
+ public void testInvalidSelector() throws Exception{
+     connection = createAmqpConnection();
+     connection.start();
+
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue target = session.createQueue(name.getMethodName());
+     final String badSelector = "3+5";
+     session.createConsumer(target, badSelector);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java
new file mode 100644
index 0000000..395edfe
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsMessageGroupTest extends AmqpTestSupport {
+
+ /** SLF4J logger named after this test class. */
+ private static final Logger LOG = LoggerFactory.getLogger(JmsMessageGroupTest.class);
+
+ /**
+  * Sends four messages that share one {@code JMSXGroupID}, reads the first three
+  * on an initial consumer, then closes that consumer and expects the fourth (and
+  * nothing else) to arrive on a consumer created from a second connection.
+  * Currently ignored.
+  */
+ @Ignore // TODO - FIXME
+ @Test(timeout = 60000)
+ public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
+     connection = createAmqpConnection();
+     connection.start();
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue queue = session.createQueue(name.getMethodName());
+     MessageConsumer consumer1 = session.createConsumer(queue);
+     MessageProducer producer = session.createProducer(queue);
+
+     // Send the messages, all in the same group with sequence numbers 1..4.
+     for (int i = 0; i < 4; i++) {
+         TextMessage message = session.createTextMessage("message " + i);
+         message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+         message.setIntProperty("JMSXGroupSeq", i + 1);
+         LOG.info("sending message: {}", message);
+         producer.send(message);
+     }
+
+     // All the messages should have been sent down the first connection;
+     // take just the first 3.
+     for (int i = 0; i < 3; i++) {
+         TextMessage m1 = (TextMessage) consumer1.receive(500);
+         assertNotNull("m1 is null for index: " + i, m1);
+         assertEquals(i + 1, m1.getIntProperty("JMSXGroupSeq"));
+     }
+
+     // Set up a second connection; close it even when an assertion below fails.
+     Connection connection1 = createAmqpConnection();
+     try {
+         connection1.start();
+         Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer2 = session2.createConsumer(queue);
+
+         // Close the first consumer.
+         consumer1.close();
+
+         // The last message should now go to the second consumer.
+         TextMessage m1 = (TextMessage) consumer2.receive(500);
+         assertNotNull("m1 is null for index: 0", m1);
+         assertEquals(4, m1.getIntProperty("JMSXGroupSeq"));
+
+         // Assert that there are no other messages left for consumer 2.
+         Message m = consumer2.receive(100);
+         assertNull("consumer 2 has some messages left", m);
+     } finally {
+         connection1.close();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
new file mode 100644
index 0000000..dde6451
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Enumeration;
+
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Basic Queue Browser implementation.
+ */
+public class JmsQueueBrowserTest extends AmqpTestSupport {
+
+ /** SLF4J logger named after this test class. */
+ protected static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowserTest.class);
+
+ @Test(timeout = 60000)
+ public void testCreateQueueBrowser() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ session.createConsumer(queue).close();
+
+ QueueBrowser browser = session.createBrowser(queue);
+ assertNotNull(browser);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test(timeout = 60000)
+ public void testNoMessagesBrowserHasNoElements() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ session.createConsumer(queue).close();
+
+ QueueBrowser browser = session.createBrowser(queue);
+ assertNotNull(browser);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize());
+
+ Enumeration enumeration = browser.getEnumeration();
+ assertFalse(enumeration.hasMoreElements());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test(timeout = 60000)
+ public void testBrowseAllInQueue() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ sendToAmqQueue(5);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(5, proxy.getQueueSize());
+
+ QueueBrowser browser = session.createBrowser(queue);
+ assertNotNull(browser);
+ Enumeration enumeration = browser.getEnumeration();
+ int count = 0;
+ while (enumeration.hasMoreElements()) {
+ Message msg = (Message) enumeration.nextElement();
+ assertNotNull(msg);
+ LOG.debug("Recv: {}", msg);
+ count++;
+ }
+ assertFalse(enumeration.hasMoreElements());
+ assertEquals(5, count);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test(timeout = 90000)
+ public void testBrowseAllInQueueSmallPrefetch() throws Exception {
+ connection = createAmqpConnection();
+ ((JmsConnection) connection).getPrefetchPolicy().setQueueBrowserPrefetch(10);
+ connection.start();
+
+ final int MSG_COUNT = 30;
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ sendToAmqQueue(MSG_COUNT);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(MSG_COUNT, proxy.getQueueSize());
+
+ QueueBrowser browser = session.createBrowser(queue);
+ assertNotNull(browser);
+ Enumeration enumeration = browser.getEnumeration();
+ int count = 0;
+ while (enumeration.hasMoreElements()) {
+ Message msg = (Message) enumeration.nextElement();
+ assertNotNull(msg);
+ LOG.debug("Recv: {}", msg);
+ count++;
+ }
+ assertFalse(enumeration.hasMoreElements());
+ assertEquals(MSG_COUNT, count);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
new file mode 100644
index 0000000..eef250b
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class JmsZeroPrefetchTest extends AmqpTestSupport {
+
+ /**
+  * With all prefetch values set to zero, installing a message listener on a
+  * consumer is expected to fail: the test passes only if a JMSException is thrown.
+  */
+ @Test(timeout=60000, expected=JMSException.class)
+ public void testCannotUseMessageListener() throws Exception {
+     connection = createAmqpConnection();
+     JmsConnection jmsConnection = (JmsConnection) connection;
+     jmsConnection.getPrefetchPolicy().setAll(0);
+     connection.start();
+
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue source = session.createQueue(name.getMethodName());
+     MessageConsumer pullOnly = session.createConsumer(source);
+
+     pullOnly.setMessageListener(new MessageListener() {
+         @Override
+         public void onMessage(Message message) {
+             // Intentionally empty: the listener only exists to be handed to the consumer.
+         }
+     });
+ }
+
+ /**
+  * With all prefetch values set to zero, a consumer must still be able to pull
+  * the single message that was sent, and subsequent receive calls must return
+  * null rather than hang.
+  */
+ @Test(timeout = 60000)
+ public void testPullConsumerWorks() throws Exception {
+     connection = createAmqpConnection();
+     JmsConnection jmsConnection = (JmsConnection) connection;
+     jmsConnection.getPrefetchPolicy().setAll(0);
+     connection.start();
+
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue queue = session.createQueue(name.getMethodName());
+     MessageProducer sender = session.createProducer(queue);
+     sender.send(session.createTextMessage("Hello World!"));
+
+     // Pull the one message that was sent.
+     MessageConsumer puller = session.createConsumer(queue);
+     Message first = puller.receive(5000);
+     assertNotNull("Should have received a message!", first);
+
+     // The queue is now empty: both calls must return, and return null.
+     Message afterTimedReceive = puller.receive(1);
+     assertNull("Should have not received a message!", afterTimedReceive);
+     Message afterNoWait = puller.receiveNoWait();
+     assertNull("Should have not received a message!", afterNoWait);
+ }
+
+ /**
+  * With all prefetch values set to zero, two consumers on the same queue should
+  * each be able to pull one of the two messages sent, in order, leaving nothing
+  * behind. Currently ignored.
+  */
+ @Ignore // ActiveMQ doesn't honor link credit.
+ @Test(timeout = 60000)
+ public void testTwoConsumers() throws Exception {
+     connection = createAmqpConnection();
+     ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+     connection.start();
+
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue queue = session.createQueue(name.getMethodName());
+
+     MessageProducer producer = session.createProducer(queue);
+     producer.send(session.createTextMessage("Msg1"));
+     producer.send(session.createTextMessage("Msg2"));
+
+     // Now receive one message on each consumer.
+     MessageConsumer consumer1 = session.createConsumer(queue);
+     MessageConsumer consumer2 = session.createConsumer(queue);
+     TextMessage answer = (TextMessage)consumer1.receive(5000);
+     assertNotNull(answer);
+     // JUnit's order is (message, expected, actual); keeps failure output the right way round.
+     assertEquals("Should have received a message!", "Msg1", answer.getText());
+     answer = (TextMessage)consumer2.receive(5000);
+     assertNotNull(answer);
+     assertEquals("Should have received a message!", "Msg2", answer.getText());
+
+     answer = (TextMessage)consumer2.receiveNoWait();
+     assertNull("Should have not received a message!", answer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java
new file mode 100644
index 0000000..1eccaf2
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.destinations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test functionality of Temporary Queues.
+ */
+public class JmsTemporaryQueueTest extends AmqpTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(JmsTemporaryQueueTest.class);
+
+ @Test(timeout = 60000)
+ public void testCreateTemporaryQueue() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ TemporaryQueue queue = session.createTemporaryQueue();
+ session.createConsumer(queue);
+
+ assertEquals(1, brokerService.getAdminView().getTemporaryQueues().length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java
new file mode 100644
index 0000000..95e3303
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.destinations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Session;
+import javax.jms.TemporaryTopic;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test functionality of Temporary Topics
+ */
+public class JmsTemporaryTopicTest extends AmqpTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(JmsTemporaryTopicTest.class);
+
+ // Temp Topics not yet supported on the Broker.
+ @Ignore
+ @Test(timeout = 60000)
+ public void testCreateTemporaryTopic() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ TemporaryTopic topic = session.createTemporaryTopic();
+ session.createConsumer(topic);
+
+ assertEquals(1, brokerService.getAdminView().getTemporaryTopics().length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
new file mode 100644
index 0000000..77cf6ce
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.discovery;
+
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that a Broker using AMQP can be discovered and JMS operations can be performed.
+ */
+public class JmsAmqpDiscoveryTest extends AmqpTestSupport implements JmsConnectionListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsAmqpDiscoveryTest.class);
+
+ private CountDownLatch interrupted;
+ private CountDownLatch restored;
+ private JmsConnection jmsConnection;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ interrupted = new CountDownLatch(1);
+ restored = new CountDownLatch(1);
+ }
+
+ @Test(timeout=60000)
+ public void testRunningBrokerIsDiscovered() throws Exception {
+ connection = createConnection();
+ connection.start();
+
+ assertTrue("connection never connected.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return jmsConnection.isConnected();
+ }
+ }));
+ }
+
+ @Test(timeout=60000)
+ public void testConnectionFailsWhenBrokerGoesDown() throws Exception {
+ connection = createConnection();
+ connection.start();
+
+ assertTrue("connection never connected.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return jmsConnection.isConnected();
+ }
+ }));
+
+ LOG.info("Connection established, stopping broker.");
+ stopPrimaryBroker();
+
+ assertTrue("Interrupted event never fired", interrupted.await(30, TimeUnit.SECONDS));
+ }
+
+ @Test(timeout=60000)
+ public void testConnectionRestoresAfterBrokerRestarted() throws Exception {
+ connection = createConnection();
+ connection.start();
+
+ assertTrue("connection never connected.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return jmsConnection.isConnected();
+ }
+ }));
+
+ stopPrimaryBroker();
+ assertTrue(interrupted.await(20, TimeUnit.SECONDS));
+ startPrimaryBroker();
+ assertTrue(restored.await(20, TimeUnit.SECONDS));
+ }
+
+ @Test(timeout=60000)
+ public void testDiscoversAndReconnectsToSecondaryBroker() throws Exception {
+
+ connection = createConnection();
+ connection.start();
+
+ assertTrue("connection never connected.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return jmsConnection.isConnected();
+ }
+ }));
+
+ startNewBroker();
+ stopPrimaryBroker();
+
+ assertTrue(interrupted.await(20, TimeUnit.SECONDS));
+ assertTrue(restored.await(20, TimeUnit.SECONDS));
+ }
+
+ @Override
+ protected boolean isAmqpDiscovery() {
+ return true;
+ }
+
+ protected Connection createConnection() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(
+ "discovery:(multicast://default)?maxReconnectDelay=500");
+ connection = factory.createConnection();
+ jmsConnection = (JmsConnection) connection;
+ jmsConnection.addConnectionListener(this);
+ return connection;
+ }
+
+ @Override
+ public void onConnectionFailure(Throwable error) {
+ LOG.info("Connection reported failover: {}", error.getMessage());
+ }
+
+ @Override
+ public void onConnectionInterrupted(URI remoteURI) {
+ LOG.info("Connection reports interrupted. Lost connection to -> {}", remoteURI);
+ interrupted.countDown();
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection reports restored. Connected to -> {}", remoteURI);
+ restored.countDown();
+ }
+
+ @Override
+ public void onMessage(JmsInboundMessageDispatch envelope) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java
new file mode 100644
index 0000000..3d53957
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.discovery;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.provider.DefaultProviderListener;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.discovery.DiscoveryProviderFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic discovery of remote brokers
+ */
+public class JmsDiscoveryProviderTest {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(JmsDiscoveryProviderTest.class);
+
+ @Rule public TestName name = new TestName();
+
+ private BrokerService broker;
+
+ @Before
+ public void setup() throws Exception {
+ broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ broker = null;
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testCreateDiscvoeryProvider() throws Exception {
+ URI discoveryUri = new URI("discovery:multicast://default");
+ Provider provider = DiscoveryProviderFactory.createAsync(discoveryUri);
+ assertNotNull(provider);
+
+ DefaultProviderListener listener = new DefaultProviderListener();
+ provider.setProviderListener(listener);
+ provider.start();
+ provider.close();
+ }
+
+ @Test(timeout=30000, expected=IllegalStateException.class)
+ public void testStartFailsWithNoListener() throws Exception {
+ URI discoveryUri = new URI("discovery:multicast://default");
+ Provider provider =
+ DiscoveryProviderFactory.createAsync(discoveryUri);
+ assertNotNull(provider);
+ provider.start();
+ provider.close();
+ }
+
+ @Test(timeout=30000, expected=IOException.class)
+ public void testCreateFailsWithUnknownAgent() throws Exception {
+ URI discoveryUri = new URI("discovery:unknown://default");
+ Provider provider = DiscoveryProviderFactory.createAsync(discoveryUri);
+ provider.close();
+ }
+
+ protected BrokerService createBroker() throws Exception {
+
+ BrokerService brokerService = new BrokerService();
+ brokerService.setBrokerName("localhost");
+ brokerService.setPersistent(false);
+ brokerService.setAdvisorySupport(false);
+ brokerService.setUseJmx(false);
+
+ TransportConnector connector = brokerService.addConnector("amqp://0.0.0.0:0");
+ connector.setName("amqp");
+ connector.setDiscoveryUri(new URI("multicast://default"));
+
+ return brokerService;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org