You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:27 UTC

[03/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
new file mode 100644
index 0000000..aa97a60
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the behaviour of a Session created in CLIENT_ACKNOWLEDGE mode: messages must
+ * remain on the broker Queue until one of them is acknowledged, and messages that were
+ * delivered but not acknowledged must be delivered again after the Session is closed
+ * or recovered.
+ */
+public class JmsClientAckTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsClientAckTest.class);
+
+    private Connection connection;
+
+    /**
+     * Closes the Connection opened by the test (if one was opened) and then always runs
+     * the parent tear down, even when closing the Connection fails.
+     *
+     * @throws Exception if closing the Connection or the parent tear down fails.
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        try {
+            // A test can fail before the connection is assigned, guard against the NPE.
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Waits for the Queue behind the given view to report a size of zero and fails the
+     * calling test if that does not happen before the Wait helper gives up.
+     *
+     * @param proxy the management view of the Queue that is expected to drain.
+     *
+     * @throws Exception if the Queue size cannot be read.
+     */
+    private void assertQueueDrained(final QueueViewMBean proxy) throws Exception {
+        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == 0;
+            }
+        }));
+    }
+
+    /**
+     * A single message that is received and acknowledged is removed from the Queue.
+     */
+    @Test(timeout = 60000)
+    public void testAckedMessageAreConsumed() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // The connection is closed in tearDown().
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * Acknowledging only the last of three received messages removes all three from
+     * the Queue.
+     */
+    @Test(timeout = 60000)
+    public void testLastMessageAcked() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+        producer.send(session.createTextMessage("Hello2"));
+        producer.send(session.createTextMessage("Hello3"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(3, proxy.getQueueSize());
+
+        // Consume all three messages but only acknowledge the last one.
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * A message received but not acknowledged stays on the Queue when the Session is
+     * closed and can be received and acknowledged from a new Session.
+     */
+    @Test(timeout = 60000)
+    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...but don't ack it.
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        session.close();
+
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...and this time we ack it.
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(2000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * A message acknowledged from inside a MessageListener is removed from the Queue.
+     */
+    @Test(timeout = 60000)
+    public void testAckedMessageAreConsumedByAsync() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    message.acknowledge();
+                } catch (JMSException e) {
+                    LOG.warn("Unexpected exception on acknowledge: {}", e.getMessage());
+                }
+            }
+        });
+
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * A message handed to a MessageListener that never acknowledges it stays on the
+     * Queue when the Session is closed and can be consumed from a new Session.
+     */
+    @Test(timeout = 60000)
+    public void testUnAckedAsyncMessageAreNotConsumedOnSessionClose() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                // Don't ack the message.
+            }
+        });
+
+        session.close();
+        assertEquals(1, proxy.getQueueSize());
+
+        // Now we consume and ack the Message.
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(2000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * With three consumers on one Session sharing a listener, acknowledging the most
+     * recently delivered message empties the Queue of all delivered messages.
+     */
+    @Test(timeout=90000)
+    public void testAckMarksAllConsumerMessageAsConsumed() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+
+        final int MSG_COUNT = 30;
+        final AtomicReference<Message> lastMessage = new AtomicReference<Message>();
+        final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+
+        // Shared by all three consumers: remembers the latest delivery, never acks.
+        MessageListener myListener = new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                lastMessage.set(message);
+                done.countDown();
+            }
+        };
+
+        MessageConsumer consumer1 = session.createConsumer(queue);
+        consumer1.setMessageListener(myListener);
+        MessageConsumer consumer2 = session.createConsumer(queue);
+        consumer2.setMessageListener(myListener);
+        MessageConsumer consumer3 = session.createConsumer(queue);
+        consumer3.setMessageListener(myListener);
+
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            producer.send(session.createTextMessage("Hello: " + i));
+        }
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(MSG_COUNT, proxy.getQueueSize());
+
+        assertTrue("Failed to consume all messages.", done.await(20, TimeUnit.SECONDS));
+        assertNotNull(lastMessage.get());
+        assertEquals(MSG_COUNT, proxy.getInFlightCount());
+
+        lastMessage.get().acknowledge();
+
+        assertQueueDrained(proxy);
+    }
+
+    /**
+     * Sends three messages, receives all of them and acknowledges through the second
+     * one; then sends a fourth, receives it, recovers the Session and expects the
+     * fourth message to arrive again marked as redelivered.
+     */
+    @Test(timeout=60000)
+    public void testUnackedAreRecovered() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = consumerSession.createQueue(name.getMethodName());
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        // Each message gets its own body (previously every setText() hit sent1).
+        TextMessage sent1 = producerSession.createTextMessage();
+        sent1.setText("msg1");
+        producer.send(sent1);
+        TextMessage sent2 = producerSession.createTextMessage();
+        sent2.setText("msg2");
+        producer.send(sent2);
+        TextMessage sent3 = producerSession.createTextMessage();
+        sent3.setText("msg3");
+        producer.send(sent3);
+
+        // Fail with a clear assertion rather than an NPE if a delivery goes missing.
+        assertNotNull(consumer.receive(5000));
+        Message rec2 = consumer.receive(5000);
+        assertNotNull(rec2);
+        assertNotNull(consumer.receive(5000));
+        rec2.acknowledge();
+
+        TextMessage sent4 = producerSession.createTextMessage();
+        sent4.setText("msg4");
+        producer.send(sent4);
+
+        Message rec4 = consumer.receive(5000);
+        assertNotNull(rec4);
+        assertTrue(rec4.equals(sent4));
+        consumerSession.recover();
+        rec4 = consumer.receive(5000);
+        assertNotNull(rec4);
+        assertTrue(rec4.equals(sent4));
+        assertTrue(rec4.getJMSRedelivered());
+        rec4.acknowledge();
+    }
+
+    /**
+     * A listener that calls recover() on every delivery must see the same message
+     * redelivered at least six times.
+     */
+    @Test(timeout=60000)
+    public void testRecoverRedelivery() throws Exception {
+        final CountDownLatch redelivery = new CountDownLatch(6);
+        connection = createAmqpConnection();
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    LOG.info("Got message: {}", message.getJMSMessageID());
+                    if (message.getJMSRedelivered()) {
+                        LOG.info("It's a redelivery.");
+                        redelivery.countDown();
+                    }
+                    LOG.info("calling recover() on the session to force redelivery.");
+                    session.recover();
+                } catch (JMSException e) {
+                    LOG.warn("Unexpected exception while recovering the session", e);
+                }
+            }
+        });
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("test"));
+
+        assertTrue("Did not get 6 redeliveries", redelivery.await(20, TimeUnit.SECONDS));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
new file mode 100644
index 0000000..018d6bc
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the order in which prefetched messages of differing JMS priority are handed
+ * to the application, both with and without client side priority support.
+ */
+public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport {
+
+    /** Number of messages sent by each test, one for each priority value 0 through 9. */
+    private static final int MSG_COUNT = 10;
+
+    private Connection connection;
+
+    /**
+     * Runs the parent set up and then creates (but does not start) the Connection so
+     * that a test can still configure it before use.
+     *
+     * @throws Exception if the parent set up or Connection creation fails.
+     */
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        connection = createAmqpConnection();
+    }
+
+    /**
+     * Closes the Connection created in {@link #setUp()} and then always runs the
+     * parent tear down.
+     *
+     * @throws Exception if closing the Connection or the parent tear down fails.
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        try {
+            // setUp() may have failed before the connection was created.
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * With the Connection left at its defaults the prefetched messages are expected
+     * highest priority first.
+     */
+    @Test(timeout = 60000)
+    public void testPrefetchedMessageArePriorityOrdered() throws Exception {
+        MessageConsumer consumer = sendAndAwaitPrefetch();
+
+        for (int expected = MSG_COUNT - 1; expected >= 0; expected--) {
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals(expected, message.getJMSPriority());
+        }
+    }
+
+    /**
+     * With priority support switched off on the Connection the prefetched messages
+     * are expected in the order they were sent.
+     */
+    @Test(timeout = 60000)
+    public void testPrefetchedMessageAreNotPriorityOrdered() throws Exception {
+        // We are assuming that Broker side priority support is not enabled in the create
+        // broker method in AmqpTestSupport.  If that changes then this test will sometimes
+        // fail.
+        ((JmsConnection) connection).setMessagePrioritySupported(false);
+
+        MessageConsumer consumer = sendAndAwaitPrefetch();
+
+        for (int expected = 0; expected < MSG_COUNT; expected++) {
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals(expected, message.getJMSPriority());
+        }
+    }
+
+    /**
+     * Starts the Connection, creates a consumer on the test Queue, sends one message
+     * for each priority from 0 up to MSG_COUNT - 1 in ascending order and waits until
+     * the broker reports all of them as in flight.
+     *
+     * @return the consumer the messages were dispatched to.
+     *
+     * @throws Exception if a JMS or management operation fails.
+     */
+    private MessageConsumer sendAndAwaitPrefetch() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        for (int priority = 0; priority < MSG_COUNT; priority++) {
+            producer.setPriority(priority);
+            producer.send(session.createTextMessage());
+        }
+
+        // Wait for all sent to be dispatched, and fail early if that never happens
+        // instead of silently carrying on with a partially filled prefetch buffer.
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertTrue("Not all messages were dispatched to the consumer.", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getInFlightCount() == MSG_COUNT;
+            }
+        }));
+
+        // We need to make sure that all messages are in the prefetch buffer.
+        TimeUnit.SECONDS.sleep(4);
+
+        return consumer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java
new file mode 100644
index 0000000..dbbee26
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+
+/**
+ * Test the case where JMS resources are created from inside a MessageListener's
+ * onMessage callback and used to forward the message being delivered.
+ */
+public class JmsCreateResourcesInOnMessageTest extends AmqpTestSupport {
+
+    /**
+     * Sends one message to the test Queue, installs a listener that creates a producer
+     * inside onMessage and forwards the message to a second Queue, then checks that the
+     * source Queue empties and the forward Queue holds exactly one message.
+     */
+    @Test(timeout = 60000)
+    public void testCreateProducerInOnMessage() throws Exception {
+        Connection connection = createAmqpConnection();
+        try {
+            connection.start();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            assertNotNull(session);
+            Queue queue = session.createQueue(name.getMethodName());
+            MessageConsumer consumer = session.createConsumer(queue);
+            final Queue forwardQ = session.createQueue(name.getMethodName() + "-forwarded");
+            MessageProducer producer = session.createProducer(queue);
+            producer.send(session.createTextMessage("TEST-MESSAGE"));
+            producer.close();
+
+            final QueueViewMBean proxy = getProxyToQueue(queue.getQueueName());
+            assertEquals(1, proxy.getQueueSize());
+
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        LOG.debug("Received async message: {}", message);
+                        MessageProducer forwarder = session.createProducer(forwardQ);
+                        forwarder.send(message);
+                        LOG.debug("forwarded async message: {}", message);
+                    } catch (Throwable e) {
+                        LOG.debug("Caught exception while forwarding message", e);
+                        // Keep the original failure attached as the cause.
+                        throw new RuntimeException(e.getMessage(), e);
+                    }
+                }
+            });
+
+            assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return proxy.getQueueSize() == 0;
+                }
+            }));
+
+            final QueueViewMBean proxy2 = getProxyToQueue(forwardQ.getQueueName());
+            assertTrue("Message was not forwarded.", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return proxy2.getQueueSize() == 1;
+                }
+            }));
+        } finally {
+            // Release the connection even when one of the assertions above fails.
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
new file mode 100644
index 0000000..5994184
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Durable Topic Subscriber functionality: creating a durable subscription, taking
+ * it offline and bringing it back, and receiving the messages that were published while
+ * it was offline.  The broker is run with persistence enabled for these tests.
+ */
+public class JmsDurableSubscriberTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsDurableSubscriberTest.class);
+
+    /**
+     * @return true, these tests ask for a persistent broker.
+     */
+    @Override
+    public boolean isPersistent() {
+        return true;
+    }
+
+    /**
+     * Creating a durable subscriber registers exactly one active durable subscription
+     * with the broker.
+     */
+    @Test(timeout = 60000)
+    public void testCreateDuableSubscriber() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber");
+
+        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+    }
+
+    /**
+     * Closing a durable subscriber moves the subscription to the broker's inactive
+     * list, and re-creating it under the same name makes it active again.
+     */
+    @Test(timeout = 60000)
+    public void testDurableGoesOfflineAndReturns() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        final String subscriptionName = name.getMethodName() + "-subscriber";
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriptionName);
+
+        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        subscriber.close();
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        subscriber = session.createDurableSubscriber(topic, subscriptionName);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+    }
+
+    /**
+     * Messages published while the durable subscription is offline are delivered once
+     * the subscription is brought back online.
+     */
+    @Test(timeout = 60000)
+    public void testOfflineSubscriberGetsItsMessages() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        final int MSG_COUNT = 5;
+        final String subscriptionName = name.getMethodName() + "-subscriber";
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriptionName);
+
+        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        subscriber.close();
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        MessageProducer producer = session.createProducer(topic);
+        for (int i = 0; i < MSG_COUNT; i++) {
+            producer.send(session.createTextMessage("Message: " + i));
+        }
+        producer.close();
+
+        LOG.info("Bringing offline subscription back online.");
+        subscriber = session.createDurableSubscriber(topic, subscriptionName);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        final CountDownLatch messages = new CountDownLatch(MSG_COUNT);
+        subscriber.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                LOG.info("Consumer got a message: {}", message);
+                messages.countDown();
+            }
+        });
+
+        // Await first so the failure message reflects the state after the wait; the
+        // latch count is what is still outstanding, not what has been received.
+        boolean allReceived = messages.await(30, TimeUnit.SECONDS);
+        assertTrue("Only received " + (MSG_COUNT - messages.getCount()) + " of " + MSG_COUNT + " messages", allReceived);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java
new file mode 100644
index 0000000..8fbcbad
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Checks how a MessageConsumer behaves once it has already been closed.
+ */
+public class JmsMessageConsumerClosedTest extends AmqpTestSupport {
+
+    protected MessageConsumer consumer; // consumer under test, assigned by setUp()
+
+    protected MessageConsumer createConsumer() throws Exception { // overridden by JmsMessageConsumerFailedTest
+        connection = createAmqpConnection();
+        final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue testQueue = amqpSession.createQueue(name.getMethodName());
+        final MessageConsumer closedConsumer = amqpSession.createConsumer(testQueue);
+        closedConsumer.close();
+        return closedConsumer;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        this.consumer = createConsumer();
+    }
+
+    @Test(expected=JMSException.class, timeout=30000)
+    public void testGetMessageSelectorFails() throws JMSException {
+        this.consumer.getMessageSelector();
+    }
+
+    @Test(expected=JMSException.class, timeout=30000)
+    public void testGetMessageListenerFails() throws JMSException {
+        this.consumer.getMessageListener();
+    }
+
+    @Test(expected=JMSException.class, timeout=30000)
+    public void testSetMessageListenerFails() throws JMSException {
+        this.consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message unused) {
+            }
+        });
+    }
+
+    @Test(expected=JMSException.class, timeout=30000)
+    public void testRreceiveFails() throws JMSException {
+        this.consumer.receive();
+    }
+
+    @Test(expected=JMSException.class, timeout=30000)
+    public void testRreceiveTimedFails() throws JMSException {
+        this.consumer.receive(11);
+    }
+
+    @Test(expected=JMSException.class, timeout=30000)
+    public void testRreceiveNoWaitFails() throws JMSException {
+        this.consumer.receiveNoWait();
+    }
+
+    @Test(timeout=30000)
+    public void testClose() throws JMSException {
+        this.consumer.close(); // unlike the tests above, no exception is expected from close()
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java
new file mode 100644
index 0000000..85b4b37
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.Wait;
+
+/**
+ * Re-runs the inherited closed-consumer contract tests against a consumer whose connection has failed.
+ */
+public class JmsMessageConsumerFailedTest extends JmsMessageConsumerClosedTest {
+
+    @Override
+    protected MessageConsumer createConsumer() throws Exception {
+        connection = createAmqpConnection();
+        final CountDownLatch failureReported = new CountDownLatch(1);
+        final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue testQueue = amqpSession.createQueue(name.getMethodName());
+        final MessageConsumer orphaned = amqpSession.createConsumer(testQueue);
+        connection.setExceptionListener(new ExceptionListener() {
+
+            @Override
+            public void onException(JMSException cause) {
+                failureReported.countDown();
+            }
+        });
+        connection.start();
+        stopPrimaryBroker(); // the consumer is still open at this point; it is never closed explicitly
+        assertTrue(failureReported.await(10, TimeUnit.SECONDS));
+        final JmsConnection failedConnection = (JmsConnection) connection;
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !failedConnection.isConnected();
+            }
+        }));
+        return orphaned;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
new file mode 100644
index 0000000..609b46a
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.qpid.jms.JmsMessageAvailableListener;
+import org.apache.qpid.jms.JmsMessageConsumer;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for basic JMS MessageConsumer functionality.
+ */
+public class JmsMessageConsumerTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumerTest.class);
+
+    @Override
+    public boolean isPersistent() { // overrides an inherited setting; this test class always answers true
+        return true; // NOTE(review): presumably makes the test broker persistent - testMessagesAreAckedAMQProducer asserts brokerService.isPersistent(); confirm in AmqpTestSupport
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateMessageConsumer() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(amqpSession);
+        final String queueName = name.getMethodName();
+        amqpSession.createConsumer(amqpSession.createQueue(queueName));
+        // Nothing was sent, so the broker-side view of the queue should report no messages.
+        final QueueViewMBean queueView = getProxyToQueue(queueName);
+        assertEquals(0, queueView.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
+    public void testSyncConsumeFromQueue() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(amqpSession);
+        final Queue target = amqpSession.createQueue(name.getMethodName());
+        final MessageConsumer receiver = amqpSession.createConsumer(target);
+
+        sendToAmqQueue(1);
+
+        final QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
+        assertEquals(1, queueView.getQueueSize());
+
+        assertNotNull("Failed to receive any message.", receiver.receive(2000));
+        // Poll the broker view; presumably the size only drops once the acknowledgement has been processed.
+        final Wait.Condition queueDrained = new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getQueueSize() == 0;
+            }
+        };
+        assertTrue("Queued message not consumed.", Wait.waitFor(queueDrained));
+    }
+
+    @Test(timeout = 60000)
+    public void testSyncConsumeFromTopic() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(amqpSession);
+        final Topic target = amqpSession.createTopic(name.getMethodName());
+        final MessageConsumer subscriber = amqpSession.createConsumer(target);
+
+        sendToAmqTopic(1);
+
+        final TopicViewMBean topicView = getProxyToTopic(name.getMethodName());
+        // Disabled check kept for reference: assertEquals(1, proxy.getQueueSize()); NOTE(review): reason not recorded.
+
+        final Message received = subscriber.receive(2000);
+        assertNotNull("Failed to receive any message.", received);
+        assertTrue("Published message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return topicView.getQueueSize() == 0;
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testMessageAvailableConsumer() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        final int expectedCount = 10;
+        final AtomicInteger notifications = new AtomicInteger();
+
+        final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(amqpSession);
+        final Queue target = amqpSession.createQueue(name.getMethodName());
+        final MessageConsumer receiver = amqpSession.createConsumer(target);
+        ((JmsMessageConsumer) receiver).setAvailableListener(new JmsMessageAvailableListener() {
+
+            @Override
+            public void onMessageAvailable(MessageConsumer source) {
+                notifications.incrementAndGet();
+            }
+        });
+
+        sendToAmqQueue(expectedCount);
+
+        final QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
+        assertEquals(expectedCount, queueView.getQueueSize());
+
+        assertTrue("Listener not notified of correct number of messages.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return notifications.get() == expectedCount;
+            }
+        }));
+
+        // One notification was counted per message, so none of these non-blocking receives may come back empty.
+        for (int remaining = expectedCount; remaining > 0; --remaining) {
+            assertNotNull(receiver.receiveNoWait());
+        }
+
+        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getQueueSize() == 0;
+            }
+        }));
+    }
+
+    /**
+     * Checks that a consumer blocked inside receive(timeout) wakes up once a
+     * message is dispatched to it.  A long poll is used on purpose: the point is
+     * that a single blocked receive eventually returns a Message, not the short
+     * poll and retry case.  A helper thread sends the message ten seconds after
+     * it is started, while the test thread waits in receive(60000).
+     *
+     * @throws Exception if the connection, session or consumer cannot be set up
+     */
+    @Test(timeout=60000)
+    public void testConsumerReceiveBeforeMessageDispatched() throws Exception {
+        final Connection connection = createAmqpConnection();
+        this.connection = connection; // also kept in the inherited field; NOTE(review): presumably so the base class can close it - confirm
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue queue = session.createQueue(name.getMethodName());
+
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.SECONDS.sleep(10); // the sleep is what lets the test thread reach receive() before the send
+                    MessageProducer producer = session.createProducer(queue); // NOTE(review): same Session as the blocked receive(), used from a second thread - confirm this is intended
+                    producer.send(session.createTextMessage("Hello"));
+                } catch (Exception e) {
+                    LOG.warn("Caught during message send: {}", e.getMessage()); // only logged here; the test can then only fail by not receiving a message
+                }
+            }
+        };
+        t.start();
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(60000); // receive timeout equals the @Test timeout above
+        assertNotNull(msg);
+    }
+
+    @Test(timeout=60000)
+    public void testAsynchronousMessageConsumption() throws Exception {
+        final int expected = 4;
+        final Connection amqpConnection = createAmqpConnection();
+        this.connection = amqpConnection;
+        final AtomicInteger delivered = new AtomicInteger(0);
+        final CountDownLatch allDelivered = new CountDownLatch(1);
+
+        amqpConnection.start();
+        final Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue target = amqpSession.createQueue(name.getMethodName());
+        final MessageConsumer receiver = amqpSession.createConsumer(target);
+
+        receiver.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message m) {
+                LOG.debug("Async consumer got Message: {}", m);
+                delivered.incrementAndGet();
+                if (delivered.get() == expected) {
+                    allDelivered.countDown();
+                }
+            }
+        });
+
+        sendToAmqQueue(expected);
+        assertTrue(allDelivered.await(1000, TimeUnit.MILLISECONDS));
+        TimeUnit.SECONDS.sleep(1); // presumably a grace period so that any extra delivery would show up in the final count
+        assertEquals(expected, delivered.get());
+    }
+
+    @Test(timeout=60000)
+    public void testSyncReceiveFailsWhenListenerSet() throws Exception {
+        final int expected = 4;
+        final Connection amqpConnection = createAmqpConnection();
+        this.connection = amqpConnection;
+        final AtomicInteger delivered = new AtomicInteger(0);
+        final CountDownLatch allDelivered = new CountDownLatch(1);
+
+        amqpConnection.start();
+        final Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue target = amqpSession.createQueue(name.getMethodName());
+        final MessageConsumer receiver = amqpSession.createConsumer(target);
+
+        receiver.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message m) {
+                LOG.debug("Async consumer got Message: {}", m);
+                delivered.incrementAndGet();
+                if (delivered.get() == expected) {
+                    allDelivered.countDown();
+                }
+            }
+        });
+
+        try {
+            receiver.receive();
+            fail("Should have thrown an exception.");
+        } catch (JMSException rejected) { // the exception is the outcome under test: blocking receive with a listener set
+        }
+
+        try {
+            receiver.receive(1000);
+            fail("Should have thrown an exception.");
+        } catch (JMSException rejected) { // same expectation for the timed receive
+        }
+
+        try {
+            receiver.receiveNoWait();
+            fail("Should have thrown an exception.");
+        } catch (JMSException rejected) { // same expectation for the non-blocking receive
+        }
+    }
+
+    @Test(timeout=60000)
+    public void testSetMessageListenerAfterStartAndSend() throws Exception {
+        final int expected = 4;
+        final Connection amqpConnection = createAmqpConnection();
+        this.connection = amqpConnection;
+        final AtomicInteger delivered = new AtomicInteger(0);
+        final CountDownLatch allDelivered = new CountDownLatch(1);
+
+        amqpConnection.start();
+        final Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue target = amqpSession.createQueue(name.getMethodName());
+        final MessageConsumer receiver = amqpSession.createConsumer(target);
+        sendToAmqQueue(expected); // the messages are sent before any listener exists
+
+        receiver.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message m) {
+                LOG.debug("Async consumer got Message: {}", m);
+                delivered.incrementAndGet();
+                if (delivered.get() == expected) {
+                    allDelivered.countDown();
+                }
+            }
+        });
+
+        assertTrue(allDelivered.await(1000, TimeUnit.MILLISECONDS));
+        TimeUnit.SECONDS.sleep(1); // presumably a grace period so that any extra delivery would show up in the final count
+        assertEquals(expected, delivered.get());
+    }
+
+    @Test(timeout=60000)
+    public void testNoReceivedMessagesWhenConnectionNotStarted() throws Exception {
+        connection = createAmqpConnection(); // start() is deliberately never called in this test
+        final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue target = amqpSession.createQueue(name.getMethodName());
+        final MessageConsumer idleReceiver = amqpSession.createConsumer(target);
+        sendToAmqQueue(3);
+        assertNull(idleReceiver.receive(2000));
+    }
+
+    @Test(timeout = 60000)
+    public void testMessagesAreAckedAMQProducer() throws Exception {
+        int messagesSent = 3;
+        assertTrue(brokerService.isPersistent()); // precondition: the restart checks below assume a persistent broker
+        // Produce through createActiveMQConnection(); the messages are read back later via readAllMessages().
+        connection = createActiveMQConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer p = session.createProducer(queue);
+        for (int i=0; i < messagesSent; i++) {
+            TextMessage message = session.createTextMessage();
+            String messageText = "Hello " + i + " sent at " + new java.util.Date().toString();
+            message.setText(messageText);
+            LOG.debug(">>>> Sent [{}]", messageText);
+            p.send(message);
+        }
+        connection.close(); // do not hold the producer connection open across the restarts, matching testMessagesAreAckedAMQPProducer
+
+        // After the first restart we should get all messages sent above
+        restartPrimaryBroker();
+        int messagesReceived = readAllMessages();
+        assertEquals(messagesSent, messagesReceived);
+
+        // This time there should be no messages on this queue
+        restartPrimaryBroker();
+        messagesReceived = readAllMessages();
+        assertEquals(0, messagesReceived);
+    }
+
+    @Test(timeout = 60000)
+    public void testMessagesAreAckedAMQPProducer() throws Exception {
+        final int messagesSent = 3;
+
+        connection = createAmqpConnection();
+        final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue target = amqpSession.createQueue(name.getMethodName());
+        final MessageProducer sender = amqpSession.createProducer(target);
+        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
+        // Persistent delivery is requested because the messages are expected back after a broker restart.
+        for (int index = 0; index < messagesSent; index++) {
+            final TextMessage message = amqpSession.createTextMessage();
+            final String messageText = "Hello " + index + " sent at " + new java.util.Date().toString();
+            message.setText(messageText);
+            LOG.debug(">>>> Sent [{}]", messageText);
+            sender.send(message);
+        }
+
+        connection.close();
+
+        // First restart: every message sent above must still be readable.
+        restartPrimaryBroker();
+        int messagesReceived = readAllMessages();
+        assertEquals(messagesSent, messagesReceived);
+
+        // Second restart: the previous read consumed them, so the queue must now be empty.
+        restartPrimaryBroker();
+        messagesReceived = readAllMessages();
+        assertEquals(0, messagesReceived);
+    }
+
+    private int readAllMessages() throws Exception { // convenience overload: reads without a message selector
+        return readAllMessages(null); // a null selector takes the plain createConsumer(queue) path in the overload below
+    }
+
+    private int readAllMessages(String selector) throws Exception { // drains the test-named queue; returns how many TextMessages were read
+        Connection connection = createAmqpConnection();
+        try {
+            connection.start(); // inside the try so that a failing start() still reaches connection.close()
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(name.getMethodName());
+            int messagesReceived = 0;
+            MessageConsumer consumer;
+
+            if (selector == null) {
+                consumer = session.createConsumer(queue);
+            } else {
+                consumer = session.createConsumer(queue, selector);
+            }
+
+            // Keep reading until a five second receive comes back empty.
+            Message msg = consumer.receive(5000);
+            while (msg != null) {
+                assertTrue(msg instanceof TextMessage);
+                TextMessage textMessage = (TextMessage) msg;
+                LOG.debug(">>>> Received [{}]", textMessage.getText());
+                messagesReceived++;
+                msg = consumer.receive(5000);
+            }
+
+            consumer.close();
+            return messagesReceived;
+        } finally {
+            connection.close();
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testSelectors() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue target = amqpSession.createQueue(name.getMethodName());
+        final MessageProducer sender = amqpSession.createProducer(target);
+
+        final TextMessage lowPriority = amqpSession.createTextMessage();
+        lowPriority.setText("hello");
+        sender.send(lowPriority, DeliveryMode.PERSISTENT, 5, 0);
+
+        final TextMessage highPriority = amqpSession.createTextMessage();
+        highPriority.setText("hello + 9");
+        sender.send(highPriority, DeliveryMode.PERSISTENT, 9, 0);
+
+        final QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
+        assertEquals(2, queueView.getQueueSize());
+
+        final MessageConsumer selective = amqpSession.createConsumer(target, "JMSPriority > 8");
+        final Message matched = selective.receive(5000);
+        assertNotNull(matched);
+        assertTrue(matched instanceof TextMessage);
+        assertEquals("hello + 9", ((TextMessage) matched).getText());
+        assertNull(selective.receive(1000)); // the priority 5 message does not satisfy the selector
+    }
+
+    @Test(expected=JMSSecurityException.class, timeout=90000)
+    public void testConsumerNotAuthorized() throws Exception {
+        connection = createAmqpConnection("guest", "password"); // NOTE(review): presumably a user without rights on USERS.* - confirm in the broker test setup
+        connection.start();
+        final Session guestSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue restricted = guestSession.createQueue("USERS." + name.getMethodName());
+        guestSession.createConsumer(restricted);
+    }
+
+    @Test(expected=InvalidSelectorException.class, timeout=90000)
+    public void testInvalidSelector() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue target = amqpSession.createQueue(name.getMethodName());
+        amqpSession.createConsumer(target, "3+5"); // arithmetic, not a boolean condition, so it is expected to be rejected
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java
new file mode 100644
index 0000000..395edfe
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsMessageGroupTest extends AmqpTestSupport { // message-group (JMSXGroupID) delivery test, currently disabled via @Ignore
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsMessageGroupTest.class);
+    /** Sends four messages of one JMSXGroupID group, reads three on consumer 1, then expects the fourth on consumer 2. */
+    @Ignore  // TODO - FIXME
+    @Test(timeout = 60000)
+    public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer1 = session.createConsumer(queue);
+        MessageProducer producer = session.createProducer(queue);
+
+        // Send the messages.
+        for (int i = 0; i < 4; i++) {
+            TextMessage message = session.createTextMessage("message " + i);
+            message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+            message.setIntProperty("JMSXGroupSeq", i + 1);
+            LOG.info("sending message: {}", message);
+            producer.send(message);
+        }
+
+        // All the messages should have been sent down connection 1; just get
+        // the first 3.
+        for (int i = 0; i < 3; i++) {
+            TextMessage m1 = (TextMessage) consumer1.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+            assertEquals(i + 1, m1.getIntProperty("JMSXGroupSeq")); // JUnit order: expected first, then actual
+        }
+
+        // Setup a second connection
+        Connection connection1 = createAmqpConnection();
+        connection1.start();
+        Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createConsumer(queue);
+
+        // Close the first consumer.
+        consumer1.close();
+
+        // The last message should now go to the second consumer.
+        for (int i = 0; i < 1; i++) {
+            TextMessage m1 = (TextMessage) consumer2.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+            assertEquals(4 + i, m1.getIntProperty("JMSXGroupSeq")); // JUnit order: expected first, then actual
+        }
+
+        // assert that there are no other messages left for the consumer 2
+        Message m = consumer2.receive(100);
+        assertNull("consumer 2 has some messages left", m);
+        connection1.close(); // NOTE(review): not reached if an assertion above fails, so the second connection is then left open
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
new file mode 100644
index 0000000..dde6451
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Enumeration;
+
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Basic Queue Browser implementation.
+ */
+public class JmsQueueBrowserTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowserTest.class);
+
+    @Test(timeout = 60000)
+    public void testCreateQueueBrowser() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        session.createConsumer(queue).close();
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test(timeout = 60000)
+    public void testNoMessagesBrowserHasNoElements() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        session.createConsumer(queue).close();
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        Enumeration enumeration = browser.getEnumeration();
+        assertFalse(enumeration.hasMoreElements());
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test(timeout = 60000)
+    public void testBrowseAllInQueue() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        sendToAmqQueue(5);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(5, proxy.getQueueSize());
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+        Enumeration enumeration = browser.getEnumeration();
+        int count = 0;
+        while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+        }
+        assertFalse(enumeration.hasMoreElements());
+        assertEquals(5, count);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test(timeout = 90000)
+    public void testBrowseAllInQueueSmallPrefetch() throws Exception {
+        connection = createAmqpConnection();
+        ((JmsConnection) connection).getPrefetchPolicy().setQueueBrowserPrefetch(10);
+        connection.start();
+
+        final int MSG_COUNT = 30;
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        sendToAmqQueue(MSG_COUNT);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(MSG_COUNT, proxy.getQueueSize());
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+        Enumeration enumeration = browser.getEnumeration();
+        int count = 0;
+        while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+        }
+        assertFalse(enumeration.hasMoreElements());
+        assertEquals(MSG_COUNT, count);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
new file mode 100644
index 0000000..eef250b
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests consumers on a connection whose prefetch policy has been set to zero.
+ */
+public class JmsZeroPrefetchTest extends AmqpTestSupport {
+
+    /** Registering a {@link MessageListener} on a zero prefetch consumer must throw. */
+    @Test(timeout=60000, expected=JMSException.class)
+    public void testCannotUseMessageListener() throws Exception {
+        connection = createAmqpConnection();
+        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+            }
+        });
+    }
+
+    /** A zero prefetch consumer receives the one queued message and then gets nothing more. */
+    @Test(timeout = 60000)
+    public void testPullConsumerWorks() throws Exception {
+        connection = createAmqpConnection();
+        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello World!"));
+
+        // Now receive the message that was just sent.
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message answer = consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        // The only message is consumed: further receive calls must return, and return null.
+        answer = consumer.receive(1);
+        assertNull("Should have not received a message!", answer);
+        answer = consumer.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+
+    /** Two zero prefetch consumers on one queue each receive one of the two messages sent. */
+    @Ignore // ActiveMQ doesn't honor link credit.
+    @Test(timeout = 60000)
+    public void testTwoConsumers() throws Exception {
+        connection = createAmqpConnection();
+        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Msg1"));
+        producer.send(session.createTextMessage("Msg2"));
+
+        // One receive per consumer; assertEquals takes (message, expected, actual).
+        MessageConsumer consumer1 = session.createConsumer(queue);
+        MessageConsumer consumer2 = session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer1.receive(5000);
+        assertNotNull(answer);
+        assertEquals("Should have received a message!", "Msg1", answer.getText());
+        answer = (TextMessage)consumer2.receive(5000);
+        assertNotNull(answer);
+        assertEquals("Should have received a message!", "Msg2", answer.getText());
+
+        answer = (TextMessage)consumer2.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java
new file mode 100644
index 0000000..1eccaf2
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.destinations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test functionality of Temporary Queues.
+ */
+public class JmsTemporaryQueueTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsTemporaryQueueTest.class);
+
+    @Test(timeout = 60000)
+    public void testCreateTemporaryQueue() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        TemporaryQueue queue = session.createTemporaryQueue();
+        session.createConsumer(queue);
+
+        assertEquals(1, brokerService.getAdminView().getTemporaryQueues().length);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java
new file mode 100644
index 0000000..95e3303
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.destinations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Session;
+import javax.jms.TemporaryTopic;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test functionality of Temporary Topics
+ */
+public class JmsTemporaryTopicTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsTemporaryTopicTest.class);
+
+    // Temp Topics not yet supported on the Broker.
+    @Ignore
+    @Test(timeout = 60000)
+    public void testCreateTemporaryTopic() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        TemporaryTopic topic = session.createTemporaryTopic();
+        session.createConsumer(topic);
+
+        assertEquals(1, brokerService.getAdminView().getTemporaryTopics().length);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
new file mode 100644
index 0000000..77cf6ce
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.discovery;
+
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that a broker speaking AMQP is found through multicast discovery and that the
+ * connection listener sees interrupted and restored events as brokers stop and start.
+ */
+public class JmsAmqpDiscoveryTest extends AmqpTestSupport implements JmsConnectionListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsAmqpDiscoveryTest.class);
+
+    private CountDownLatch interrupted;
+    private CountDownLatch restored;
+    private JmsConnection jmsConnection;
+
+    /** Runs the parent set up, then creates fresh single count latches for the listener events. */
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        interrupted = new CountDownLatch(1);
+        restored = new CountDownLatch(1);
+    }
+
+    /** A started discovery connection eventually reports that it is connected. */
+    @Test(timeout=60000)
+    public void testRunningBrokerIsDiscovered() throws Exception {
+        connection = createConnection();
+        connection.start();
+
+        assertConnectionEstablished();
+    }
+
+    /** Stopping the primary broker must fire the interrupted event within 30 seconds. */
+    @Test(timeout=60000)
+    public void testConnectionFailsWhenBrokerGoesDown() throws Exception {
+        connection = createConnection();
+        connection.start();
+
+        assertConnectionEstablished();
+        LOG.info("Connection established, stopping broker.");
+        stopPrimaryBroker();
+
+        assertTrue("Interrupted event never fired", interrupted.await(30, TimeUnit.SECONDS));
+    }
+
+    /** Stopping and restarting the primary broker must fire interrupted, then restored. */
+    @Test(timeout=60000)
+    public void testConnectionRestoresAfterBrokerRestarted() throws Exception {
+        connection = createConnection();
+        connection.start();
+
+        assertConnectionEstablished();
+        stopPrimaryBroker();
+        assertTrue(interrupted.await(20, TimeUnit.SECONDS));
+        startPrimaryBroker();
+        assertTrue(restored.await(20, TimeUnit.SECONDS));
+    }
+
+    /** With a new broker started, stopping the primary must fire interrupted and restored. */
+    @Test(timeout=60000)
+    public void testDiscoversAndReconnectsToSecondaryBroker() throws Exception {
+        connection = createConnection();
+        connection.start();
+
+        assertConnectionEstablished();
+        startNewBroker();
+        stopPrimaryBroker();
+
+        assertTrue(interrupted.await(20, TimeUnit.SECONDS));
+        assertTrue(restored.await(20, TimeUnit.SECONDS));
+    }
+
+    /** Overrides the parent's flag so that it reports AMQP discovery as enabled. */
+    @Override
+    protected boolean isAmqpDiscovery() {
+        return true;
+    }
+
+    /**
+     * Creates a connection from the multicast discovery URI, registers this test as its
+     * listener and remembers it in the {@code connection} and {@code jmsConnection} fields.
+     *
+     * @return the connection that was created.
+     * @throws Exception if creating the connection or registering the listener fails.
+     */
+    protected Connection createConnection() throws Exception {
+        String discoveryUri = "discovery:(multicast://default)?maxReconnectDelay=500";
+        connection = new JmsConnectionFactory(discoveryUri).createConnection();
+        jmsConnection = (JmsConnection) connection;
+        jmsConnection.addConnectionListener(this);
+        return connection;
+    }
+
+    /** Fails the test unless {@code Wait.waitFor} sees the connection report connected. */
+    private void assertConnectionEstablished() throws Exception {
+        Wait.Condition connected = new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return jmsConnection.isConnected();
+            }
+        };
+        assertTrue("connection never connected.", Wait.waitFor(connected));
+    }
+
+    /** Logs the message of the reported failure; no latch is touched. */
+    @Override
+    public void onConnectionFailure(Throwable error) {
+        LOG.info("Connection reported failover: {}", error.getMessage());
+    }
+
+    /** Logs the remote URI that was lost and counts down the {@code interrupted} latch. */
+    @Override
+    public void onConnectionInterrupted(URI remoteURI) {
+        LOG.info("Connection reports interrupted. Lost connection to -> {}", remoteURI);
+        interrupted.countDown();
+    }
+
+    /** Logs the remote URI now connected to and counts down the {@code restored} latch. */
+    @Override
+    public void onConnectionRestored(URI remoteURI) {
+        LOG.info("Connection reports restored.  Connected to -> {}", remoteURI);
+        restored.countDown();
+    }
+
+    /** No-op: this listener does nothing with inbound messages. */
+    @Override
+    public void onMessage(JmsInboundMessageDispatch envelope) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java
new file mode 100644
index 0000000..3d53957
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.discovery;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.provider.DefaultProviderListener;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.discovery.DiscoveryProviderFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests creating and starting the discovery {@link Provider} while a broker is running.
+ */
+public class JmsDiscoveryProviderTest {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsDiscoveryProviderTest.class);
+
+    @Rule public TestName name = new TestName();
+
+    private BrokerService broker;
+
+    /** Starts the broker built by {@link #createBroker()} and waits until it has started. */
+    @Before
+    public void setup() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /** Stops the broker, if one was created, and waits until it has stopped. */
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+
+    /** A multicast discovery provider that has a listener can be started and then closed. */
+    @Test(timeout=30000)
+    public void testCreateDiscvoeryProvider() throws Exception {
+        Provider provider = DiscoveryProviderFactory.createAsync(new URI("discovery:multicast://default"));
+        assertNotNull(provider);
+        provider.setProviderListener(new DefaultProviderListener());
+        provider.start();
+        provider.close();
+    }
+
+    /** Starting without a listener must throw; the finally block still closes the provider. */
+    @Test(timeout=30000, expected=IllegalStateException.class)
+    public void testStartFailsWithNoListener() throws Exception {
+        Provider provider = DiscoveryProviderFactory.createAsync(new URI("discovery:multicast://default"));
+        assertNotNull(provider);
+        try {
+            provider.start();
+        } finally {
+            provider.close();
+        }
+    }
+
+    /** Creating a provider for the unknown agent scheme must throw; close() runs only if not. */
+    @Test(timeout=30000, expected=IOException.class)
+    public void testCreateFailsWithUnknownAgent() throws Exception {
+        DiscoveryProviderFactory.createAsync(new URI("discovery:unknown://default")).close();
+    }
+
+    /** Builds, without starting it, a broker whose AMQP connector uses multicast discovery. */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setBrokerName("localhost");
+        brokerService.setPersistent(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setUseJmx(false);
+
+        TransportConnector connector = brokerService.addConnector("amqp://0.0.0.0:0");
+        connector.setName("amqp");
+        connector.setDiscoveryUri(new URI("multicast://default"));
+        return brokerService;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org