You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:28 UTC

[04/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java
new file mode 100644
index 0000000..f4b54a6
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.Vector;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests that messages sent and received don't lose data and have expected
+ * JMS Message property values.
+ */
+public class JmsMessageIntegrityTest extends AmqpTestSupport {
+
+    private Connection connection;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        connection = createAmqpConnection();
+    }
+
+    @Test
+    public void testTextMessage() throws Exception {
+        connection.start();
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination queue = jmsSession.createQueue(name.getMethodName());
+        final MessageConsumer receiver = jmsSession.createConsumer(queue);
+        final MessageProducer sender = jmsSession.createProducer(queue);
+
+        // Publish a single text message.
+        final TextMessage outbound = jmsSession.createTextMessage();
+        outbound.setText("Hi");
+        sender.send(outbound);
+
+        // It must arrive within the receive timeout with its text unchanged.
+        final TextMessage inbound = (TextMessage) receiver.receive(1000);
+        assertNotNull(inbound);
+        assertEquals("Hi", inbound.getText());
+
+        // No further message may be pending on the queue.
+        assertNull(receiver.receiveNoWait());
+    }
+
+    @Test
+    public void testBytesMessageLength() throws Exception {
+        connection.start();
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination queue = jmsSession.createQueue(name.getMethodName());
+        final MessageConsumer receiver = jmsSession.createConsumer(queue);
+        final MessageProducer sender = jmsSession.createProducer(queue);
+
+        // Write the ints 1 through 4; at four bytes each the body
+        // is expected to be 16 bytes long on arrival.
+        final BytesMessage outbound = jmsSession.createBytesMessage();
+        for (int value = 1; value <= 4; value++) {
+            outbound.writeInt(value);
+        }
+        sender.send(outbound);
+
+        // The received copy must report the full body length.
+        final BytesMessage inbound = (BytesMessage) receiver.receive(1000);
+        assertNotNull(inbound);
+        assertEquals(16, inbound.getBodyLength());
+
+        // No further message may be pending on the queue.
+        assertNull(receiver.receiveNoWait());
+    }
+
+    @Test
+    public void testObjectMessage() throws Exception {
+        connection.start();
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination queue = jmsSession.createQueue(name.getMethodName());
+        final MessageConsumer receiver = jmsSession.createConsumer(queue);
+        final MessageProducer sender = jmsSession.createProducer(queue);
+
+        // A random UUID is the object payload; equality is checked after the round trip.
+        final UUID expected = UUID.randomUUID();
+
+        final ObjectMessage outbound = jmsSession.createObjectMessage();
+        outbound.setObject(expected);
+        sender.send(outbound);
+
+        final ObjectMessage inbound = (ObjectMessage) receiver.receive(1000);
+        assertNotNull(inbound);
+        assertEquals(expected, inbound.getObject());
+
+        // No further message may be pending on the queue.
+        assertNull(receiver.receiveNoWait());
+    }
+
+    @Test
+    public void testBytesMessage() throws Exception {
+        connection.start();
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination queue = jmsSession.createQueue(name.getMethodName());
+        final MessageConsumer receiver = jmsSession.createConsumer(queue);
+        final MessageProducer sender = jmsSession.createProducer(queue);
+
+        // The body consists of exactly one boolean.
+        final BytesMessage outbound = jmsSession.createBytesMessage();
+        outbound.writeBoolean(true);
+        sender.send(outbound);
+
+        final BytesMessage inbound = (BytesMessage) receiver.receive(1000);
+        assertNotNull(inbound);
+        assertTrue(inbound.readBoolean());
+
+        // The only value has been consumed, so another read must report end of body.
+        try {
+            inbound.readByte();
+            fail("Expected exception not thrown.");
+        } catch (MessageEOFException expected) {
+            // expected: nothing is left to read
+        }
+        assertNull(receiver.receiveNoWait());
+    }
+
+    @Test
+    public void testStreamMessage() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        {
+            StreamMessage message = session.createStreamMessage();
+            message.writeString("This is a test to see how it works.");
+            producer.send(message);
+        }
+        {
+            StreamMessage message = (StreamMessage)consumer.receive(1000);
+            assertNotNull(message);
+
+            // Reading the String entry as a byte is an invalid conversion: it should throw
+            try {
+                message.readByte();
+                fail("Should have received NumberFormatException");
+            } catch (NumberFormatException e) {
+            }
+
+            assertEquals("This is a test to see how it works.", message.readString());
+
+            // The only entry has now been read, so a further read should hit the end of the stream.
+            try {
+                message.readByte();
+                fail("Should have received MessageEOFException");
+            } catch (MessageEOFException e) {
+            }
+        }
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Test
+    public void testMapMessage() throws Exception {
+        connection.start();
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination queue = jmsSession.createQueue(name.getMethodName());
+        final MessageConsumer receiver = jmsSession.createConsumer(queue);
+        final MessageProducer sender = jmsSession.createProducer(queue);
+
+        // Send a map carrying a single boolean entry.
+        final MapMessage outbound = jmsSession.createMapMessage();
+        outbound.setBoolean("boolKey", true);
+        sender.send(outbound);
+
+        final MapMessage inbound = (MapMessage) receiver.receive(1000);
+        assertNotNull(inbound);
+        assertTrue(inbound.getBoolean("boolKey"));
+
+        // No further message may be pending on the queue.
+        assertNull(receiver.receiveNoWait());
+    }
+
+    static class ForeignMessage implements TextMessage {
+
+        public int deliveryMode;
+
+        private String messageId;
+        private long timestamp;
+        private String correlationId;
+        private Destination replyTo;
+        private Destination destination;
+        private boolean redelivered;
+        private String type;
+        private long expiration;
+        private int priority;
+        private String text;
+        private final HashMap<String, Object> props = new HashMap<String, Object>();
+
+        @Override
+        public String getJMSMessageID() throws JMSException {
+            return messageId;
+        }
+
+        @Override
+        public void setJMSMessageID(String arg0) throws JMSException {
+            messageId = arg0;
+        }
+
+        @Override
+        public long getJMSTimestamp() throws JMSException {
+            return timestamp;
+        }
+
+        @Override
+        public void setJMSTimestamp(long arg0) throws JMSException {
+            timestamp = arg0;
+        }
+
+        @Override
+        public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+            return null;
+        }
+
+        @Override
+        public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException {
+        }
+
+        @Override
+        public void setJMSCorrelationID(String arg0) throws JMSException {
+            correlationId = arg0;
+        }
+
+        @Override
+        public String getJMSCorrelationID() throws JMSException {
+            return correlationId;
+        }
+
+        @Override
+        public Destination getJMSReplyTo() throws JMSException {
+            return replyTo;
+        }
+
+        @Override
+        public void setJMSReplyTo(Destination arg0) throws JMSException {
+            replyTo = arg0;
+        }
+
+        @Override
+        public Destination getJMSDestination() throws JMSException {
+            return destination;
+        }
+
+        @Override
+        public void setJMSDestination(Destination arg0) throws JMSException {
+            destination = arg0;
+        }
+
+        @Override
+        public int getJMSDeliveryMode() throws JMSException {
+            return deliveryMode;
+        }
+
+        @Override
+        public void setJMSDeliveryMode(int arg0) throws JMSException {
+            deliveryMode = arg0;
+        }
+
+        @Override
+        public boolean getJMSRedelivered() throws JMSException {
+            return redelivered;
+        }
+
+        @Override
+        public void setJMSRedelivered(boolean arg0) throws JMSException {
+            redelivered = arg0;
+        }
+
+        @Override
+        public String getJMSType() throws JMSException {
+            return type;
+        }
+
+        @Override
+        public void setJMSType(String arg0) throws JMSException {
+            type = arg0;
+        }
+
+        @Override
+        public long getJMSExpiration() throws JMSException {
+            return expiration;
+        }
+
+        @Override
+        public void setJMSExpiration(long arg0) throws JMSException {
+            expiration = arg0;
+        }
+
+        @Override
+        public int getJMSPriority() throws JMSException {
+            return priority;
+        }
+
+        @Override
+        public void setJMSPriority(int arg0) throws JMSException {
+            priority = arg0;
+        }
+
+        @Override
+        public void clearProperties() throws JMSException {
+        }
+
+        @Override
+        public boolean propertyExists(String arg0) throws JMSException {
+            return props.containsKey(arg0); // true for names stored via setStringProperty/setObjectProperty
+        }
+
+        @Override
+        public boolean getBooleanProperty(String arg0) throws JMSException {
+            return false;
+        }
+
+        @Override
+        public byte getByteProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public short getShortProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public int getIntProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public long getLongProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public float getFloatProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public double getDoubleProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public String getStringProperty(String arg0) throws JMSException {
+            return (String)props.get(arg0);
+        }
+
+        @Override
+        public Object getObjectProperty(String arg0) throws JMSException {
+            return props.get(arg0);
+        }
+
+        @Override
+        public Enumeration<?> getPropertyNames() throws JMSException {
+            return new Vector<String>(props.keySet()).elements();
+        }
+
+        @Override
+        public void setBooleanProperty(String arg0, boolean arg1) throws JMSException {
+        }
+
+        @Override
+        public void setByteProperty(String arg0, byte arg1) throws JMSException {
+        }
+
+        @Override
+        public void setShortProperty(String arg0, short arg1) throws JMSException {
+        }
+
+        @Override
+        public void setIntProperty(String arg0, int arg1) throws JMSException {
+        }
+
+        @Override
+        public void setLongProperty(String arg0, long arg1) throws JMSException {
+        }
+
+        @Override
+        public void setFloatProperty(String arg0, float arg1) throws JMSException {
+        }
+
+        @Override
+        public void setDoubleProperty(String arg0, double arg1) throws JMSException {
+        }
+
+        @Override
+        public void setStringProperty(String arg0, String arg1) throws JMSException {
+            props.put(arg0, arg1);
+        }
+
+        @Override
+        public void setObjectProperty(String arg0, Object arg1) throws JMSException {
+            props.put(arg0, arg1);
+        }
+
+        @Override
+        public void acknowledge() throws JMSException {
+        }
+
+        @Override
+        public void clearBody() throws JMSException {
+        }
+
+        @Override
+        public void setText(String arg0) throws JMSException {
+            text = arg0;
+        }
+
+        @Override
+        public String getText() throws JMSException {
+            return text;
+        }
+    }
+
+    // TODO - implement proper handling of foreign JMS Message and Destination types.
+    @Ignore("ActiveMQ is dropping messages as expired with current proton lib")
+    @Test
+    public void testForeignMessage() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        {
+            ForeignMessage message = new ForeignMessage();
+            message.text = "Hello";
+            message.setStringProperty("test", "value");
+            long timeToLive = 10000L;
+            long start = System.currentTimeMillis();
+            producer.send(message, javax.jms.DeliveryMode.NON_PERSISTENT, 7, timeToLive);
+            long end = System.currentTimeMillis();
+
+            // Validate jms spec 1.1 section 3.4.11 table 3.1: sending must set JMSDestination,
+            // JMSDeliveryMode, JMSExpiration, JMSPriority, JMSMessageID, and JMSTimestamp.
+            // The delivery mode is a DeliveryMode constant, not a Session acknowledge mode.
+
+            assertNotNull(message.getJMSDestination());
+            assertEquals(javax.jms.DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
+            assertTrue(start + timeToLive <= message.getJMSExpiration());
+            assertTrue(end + timeToLive >= message.getJMSExpiration());
+            assertEquals(7, message.getJMSPriority());
+            assertNotNull(message.getJMSMessageID());
+            assertTrue(start <= message.getJMSTimestamp());
+            assertTrue(end >= message.getJMSTimestamp());
+        }
+        {
+            TextMessage message = (TextMessage)consumer.receive(10000);
+            assertNotNull(message);
+            assertEquals("Hello", message.getText());
+            assertEquals("value", message.getStringProperty("test"));
+        }
+
+        assertNull(consumer.receiveNoWait());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java
new file mode 100644
index 0000000..36253c6
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.QueueConnection;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test basic QueueConnection creation etc.
+ */
+public class JmsQueueConnectionTest extends AmqpTestSupport {
+
+    @Test
+    public void testCreateQueueConnection() throws JMSException {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        QueueConnection connection = factory.createQueueConnection();
+        assertNotNull(connection);
+        connection.close();
+    }
+
+    @Test(timeout=30000)
+    public void testCreateConnectionAsSystemAdmin() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        factory.setUsername("system");
+        factory.setPassword("manager");
+        QueueConnection connection = factory.createQueueConnection();
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+
+    @Test(timeout=30000, expected = JMSSecurityException.class)
+    public void testCreateConnectionAsUnknwonUser() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        factory.setUsername("unknown");
+        factory.setPassword("unknown");
+        QueueConnection connection = factory.createQueueConnection();
+        assertNotNull(connection);
+        connection.start(); // NOTE(review): presumably where the JMSSecurityException surfaces - confirm
+        connection.close(); // skipped whenever an earlier line throws, so the connection may stay open
+    }
+
+    @Test(timeout=30000)
+    public void testCreateConnectionCallSystemAdmin() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        QueueConnection connection = factory.createQueueConnection("system", "manager");
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+
+    @Test(timeout=30000, expected = JMSSecurityException.class)
+    public void testCreateConnectionCallUnknwonUser() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        QueueConnection connection = factory.createQueueConnection("unknown", "unknown");
+        assertNotNull(connection);
+        connection.start(); // NOTE(review): presumably where the JMSSecurityException surfaces - confirm
+        connection.close(); // skipped whenever an earlier line throws, so the connection may stay open
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java
new file mode 100644
index 0000000..0e61d39
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.net.URI;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test that we can connect to a broker over SSL.
+ */
+public class JmsSSLConnectionTest {
+
+    private BrokerService brokerService;
+
+    public static final String PASSWORD = "password";
+    public static final String KEYSTORE = "src/test/resources/keystore";
+    public static final String KEYSTORE_TYPE = "jks";
+
+    private URI connectionURI;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", KEYSTORE); // NOTE(review): JVM-wide, never reset in tearDown()
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", KEYSTORE); // the same file serves as key store and trust store
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseJmx(true);
+
+        TransportConnector connector = brokerService.addConnector("amqp+ssl://localhost:0"); // NOTE(review): port 0 presumably means "any free port" - confirm
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        connectionURI = connector.getPublishableConnectURI(); // host and port are reused by getConnectionURI()
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    public String getConnectionURI() throws Exception {
+        return "amqps://" + connectionURI.getHost() + ":" + connectionURI.getPort();
+    }
+
+    @Test(timeout=30000)
+    public void testCreateConnection() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getConnectionURI());
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+        assertNotNull(connection);
+        connection.close();
+    }
+
+    @Test(timeout=30000)
+    public void testCreateConnectionAndStart() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getConnectionURI());
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java
new file mode 100644
index 0000000..efb3df3
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Validates all Session contracts following a close() call.
+ */
+public class JmsSessionClosedTest extends AmqpTestSupport {
+
+    protected Connection connection;
+
+    protected Session createSession() throws Exception {
+        connection = createAmqpConnection(); // stored in the field so tearDown() can close it
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.close(); // callers receive a session that is already closed
+        return session;
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        try { if (connection != null) { connection.close(); } } // null when createSession() never assigned it
+        finally { super.tearDown(); } // base-class cleanup must run even if close() fails
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateMessageFails() throws Exception {
+        Session session = createSession();
+        session.createMessage();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateTextMessageFails() throws Exception {
+        Session session = createSession();
+        session.createTextMessage();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateTextMessageWithTextFails() throws Exception {
+        Session session = createSession();
+        session.createTextMessage("TEST");
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateMapMessageFails() throws Exception {
+        Session session = createSession();
+        session.createMapMessage();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateStreamMessageFails() throws Exception {
+        Session session = createSession();
+        session.createStreamMessage();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateBytesMessageFails() throws Exception {
+        Session session = createSession();
+        session.createBytesMessage();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateObjectMessageFails() throws Exception {
+        Session session = createSession();
+        session.createObjectMessage();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateObjectMessageWithObjectFails() throws Exception {
+        Session session = createSession();
+        session.createObjectMessage("TEST");
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetTransactedFails() throws Exception {
+        Session session = createSession();
+        session.getTransacted();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetAcknowledgeModeFails() throws Exception {
+        Session session = createSession();
+        session.getAcknowledgeMode();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCommitFails() throws Exception {
+        Session session = createSession();
+        session.commit();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testRollbackFails() throws Exception {
+        Session session = createSession();
+        session.rollback();
+    }
+
+    @Test(timeout=30000)
+    public void testCloseDoesNotFail() throws Exception {
+        Session session = createSession();
+        session.close();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testRecoverFails() throws Exception {
+        Session session = createSession();
+        session.recover();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetMessageListenerFails() throws Exception {
+        Session session = createSession();
+        session.getMessageListener();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSetMessageListenerFails() throws Exception {
+        Session session = createSession();
+        MessageListener listener = new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+            }
+        };
+        session.setMessageListener(listener);
+    }
+
+    @Test(timeout=30000, expected=RuntimeException.class)
+    public void testRunFails() throws Exception {
+        Session session = createSession();
+        session.run(); // expected to fail with a RuntimeException rather than a JMSException
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateProducerFails() throws Exception {
+        Session session = createSession();
+        Destination destination = session.createQueue("test");
+        session.createProducer(destination);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateConsumerDestinatioFails() throws Exception {
+        Session session = createSession();
+        Destination destination = session.createQueue("test");
+        session.createConsumer(destination);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateConsumerDestinatioSelectorFails() throws Exception {
+        Session session = createSession();
+        Destination destination = session.createQueue("test");
+        session.createConsumer(destination, "a = b");
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateConsumerDestinatioSelectorBooleanFails() throws Exception {
+        Session session = createSession();
+        Destination destination = session.createQueue("test");
+        session.createConsumer(destination, "a = b", true);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateQueueFails() throws Exception {
+        Session session = createSession();
+        session.createQueue("TEST");
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateTopicFails() throws Exception {
+        Session session = createSession();
+        session.createTopic("TEST");
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateTemporaryQueueFails() throws Exception {
+        Session session = createSession();
+        session.createTemporaryQueue();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateTemporaryTopicFails() throws Exception {
+        Session session = createSession();
+        session.createTemporaryTopic(); // exercise the topic variant on the closed session
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateDurableSubscriberFails() throws Exception {
+        Session session = createSession();
+        Topic destination = session.createTopic("TEST");
+        session.createDurableSubscriber(destination, "test");
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateDurableSubscriberSelectorBooleanFails() throws Exception {
+        Session session = createSession();
+        Topic destination = session.createTopic("TEST");
+        session.createDurableSubscriber(destination, "test", "a = b", false);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateQueueBrowserFails() throws Exception {
+        Session session = createSession();
+        Queue destination = session.createQueue("test");
+        session.createBrowser(destination);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateQueueBrowserWithSelectorFails() throws Exception {
+        Session session = createSession();
+        Queue destination = session.createQueue("test");
+        session.createBrowser(destination, "a = b");
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testUnsubscribeFails() throws Exception {
+        Session session = createSession();
+        session.unsubscribe("test");
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java
new file mode 100644
index 0000000..5e6846c
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.Wait;
+
+
+/**
+ * Session contract tests run against a session whose underlying connection
+ * has been lost. The class defines no tests of its own; it only replaces the
+ * session factory method used by the tests inherited from JmsSessionClosedTest.
+ */
+public class JmsSessionFailedTest extends JmsSessionClosedTest {
+
+    /**
+     * Creates a session on a fresh AMQP connection, stops the primary broker,
+     * and returns the session only after the failure has been observed.
+     *
+     * Two conditions are awaited: the connection's ExceptionListener must fire
+     * within 20 seconds, and the JmsConnection must then report that it is no
+     * longer connected.
+     *
+     * @return the session that was created before the broker was stopped
+     * @throws Exception if the connection cannot be set up or a wait fails
+     */
+    @Override
+    protected Session createSession() throws Exception {
+        final CountDownLatch failureSignalled = new CountDownLatch(1);
+        final ExceptionListener failureListener = new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                failureSignalled.countDown();
+            }
+        };
+
+        connection = createAmqpConnection();
+        Session failedSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.setExceptionListener(failureListener);
+        connection.start();
+
+        // Take the broker away and wait for the client to be told about it.
+        stopPrimaryBroker();
+        assertTrue(failureSignalled.await(20, TimeUnit.SECONDS));
+
+        final JmsConnection failedConnection = (JmsConnection) connection;
+        final Wait.Condition disconnected = new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !failedConnection.isConnected();
+            }
+        };
+        assertTrue(Wait.waitFor(disconnected));
+
+        return failedSession;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
new file mode 100644
index 0000000..6c3ca35
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Exercises basic Session operations: creating a session, creating a
+ * producer and a consumer from it, and closing it more than once.
+ */
+public class JmsSessionTest extends AmqpTestSupport {
+
+    /** A non-transacted, auto-acknowledge session can be created on a started connection and closed. */
+    @Test(timeout = 60000)
+    public void testCreateSession() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+        connection.start();
+
+        final Session created = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(created);
+        created.close();
+    }
+
+    /** A producer for "test.queue" can be created and closed; the connection is not explicitly started here. */
+    @Test(timeout = 30000)
+    public void testSessionCreateProducer() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+
+        final Session owner = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(owner);
+
+        final MessageProducer created = owner.createProducer(owner.createQueue("test.queue"));
+        created.close();
+        owner.close();
+    }
+
+    /** A consumer for "test.queue" can be created and closed; the connection is not explicitly started here. */
+    @Test(timeout = 30000)
+    public void testSessionCreateConsumer() throws Exception {
+        connection = createAmqpConnection();
+        assertNotNull(connection);
+
+        final Session owner = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(owner);
+
+        final Queue source = owner.createQueue("test.queue");
+        final MessageConsumer created = owner.createConsumer(source);
+        created.close();
+        owner.close();
+    }
+
+    /** Closing an already closed session must not throw. */
+    @Test(timeout = 30000)
+    public void testSessionDoubleCloseWithoutException() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        final Session closedTwice = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        closedTwice.close();
+        closedTwice.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java
new file mode 100644
index 0000000..e423139
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.TopicConnection;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Tests creation of TopicConnection instances through JmsConnectionFactory,
+ * both with credentials set on the factory and with credentials passed to
+ * createTopicConnection, for a known user and for an unknown one.
+ */
+public class JmsTopicConnectionTest extends AmqpTestSupport {
+
+    /**
+     * A TopicConnection can be created without credentials and closed again.
+     * The timeout matches the other tests in this class so a stuck connection
+     * attempt cannot block the run indefinitely.
+     */
+    @Test(timeout=30000)
+    public void testCreateQueueConnection() throws JMSException {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        TopicConnection connection = factory.createTopicConnection();
+        assertNotNull(connection);
+        connection.close();
+    }
+
+    /** Credentials "system"/"manager" set on the factory allow the connection to be created and started. */
+    @Test(timeout=30000)
+    public void testCreateConnectionAsSystemAdmin() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        factory.setUsername("system");
+        factory.setPassword("manager");
+        TopicConnection connection = factory.createTopicConnection();
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+
+    /** Same as above, with the credentials passed to createTopicConnection instead of the factory. */
+    @Test(timeout=30000)
+    public void testCreateConnectionCallSystemAdmin() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        TopicConnection connection = factory.createTopicConnection("system", "manager");
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+
+    /**
+     * Credentials "unknown"/"unknown" set on the factory must lead to a
+     * JMSSecurityException somewhere between creating and closing the connection.
+     */
+    @Test(timeout=30000, expected = JMSSecurityException.class)
+    public void testCreateConnectionAsUnknwonUser() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        factory.setUsername("unknown");
+        factory.setPassword("unknown");
+        TopicConnection connection = factory.createTopicConnection();
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+
+    /** Same as above, with the unknown credentials passed to createTopicConnection. */
+    @Test(timeout=30000, expected = JMSSecurityException.class)
+    public void testCreateConnectionCallUnknownUser() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        TopicConnection connection = factory.createTopicConnection("unknown", "unknown");
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java
new file mode 100644
index 0000000..90d7358
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.bench;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Benchmark that measures how long it takes to consume batches of messages
+ * from a queue over AMQP, once with blocking receive calls and once with an
+ * asynchronous MessageListener.
+ *
+ * The class carries the Ignore annotation, so JUnit skips it unless the
+ * annotation is removed.
+ */
+@Ignore
+public class ConsumeFromAMQPTest extends AmqpTestSupport {
+
+    /** Number of messages produced and consumed in each run. */
+    private static final int MSG_COUNT = 50 * 1000;
+
+    /** Number of timed runs that are averaged after the warm-up run. */
+    private static final int NUM_RUNS = 10;
+
+    // The overrides below change configuration hooks of AmqpTestSupport; what
+    // each hook controls is defined by that base class, not here.
+
+    @Override
+    protected boolean isForceAsyncSends() {
+        return true;
+    }
+
+    @Override
+    protected boolean isAlwaysSyncSend() {
+        return false;
+    }
+
+    @Override
+    protected String getAmqpTransformer() {
+        return "raw";
+    }
+
+    @Override
+    protected boolean isMessagePrioritySupported() {
+        return false;
+    }
+
+    @Override
+    protected boolean isSendAcksAsync() {
+        return true;
+    }
+
+    /** Options handed to the base class for the AMQP connection URI. */
+    @Override
+    public String getAmqpConnectionURIOptions() {
+        return "provider.presettleProducers=true&provider.presettleConsumers=true";
+    }
+
+    /**
+     * Sends one non-persistent text message, checks that the broker reports a
+     * queue size of 1, receives the message and checks that the size is 0.
+     * Presumably meant to be run under a profiler, going by its name.
+     */
+    @Test
+    public void oneConsumedForProfile() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        TextMessage message = session.createTextMessage();
+        message.setText("hello");
+        producer.send(message);
+        producer.close();
+
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+        assertEquals("Queue should have a message", 1, queueView.getQueueSize());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message received = consumer.receive(7000);
+        assertNotNull(received);
+        consumer.close();
+
+        assertEquals("Queue should have no messages", 0, queueView.getQueueSize());
+    }
+
+    /**
+     * Times NUM_RUNS rounds of consuming MSG_COUNT messages with blocking
+     * receive calls, after one untimed warm-up round, and logs each round and
+     * the average.
+     */
+    @Test
+    public void testConsumeRateFromQueue() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+
+        // Warm Up the broker.
+        produceMessages(queue, MSG_COUNT);
+        consumerMessages(queue, MSG_COUNT);
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+        queueView.purge();
+
+        List<Long> consumeTimes = new ArrayList<Long>();
+        long cumulative = 0;
+
+        for (int i = 0; i < NUM_RUNS; ++i) {
+            produceMessages(queue, MSG_COUNT);
+            long result = consumerMessages(queue, MSG_COUNT);
+            consumeTimes.add(result);
+            cumulative += result;
+            LOG.info("Time to consume {} queue messages: {} ms", MSG_COUNT, result);
+            queueView.purge();
+        }
+
+        long smoothed = cumulative / NUM_RUNS;
+        LOG.info("Smoothed consume time for {} messages: {} ms", MSG_COUNT, smoothed);
+    }
+
+    /**
+     * Same measurement as testConsumeRateFromQueue, but the messages are
+     * delivered to a MessageListener instead of being pulled with receive.
+     */
+    @Test
+    public void testConsumeRateFromQueueAsync() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+
+        // Warm Up the broker.
+        produceMessages(queue, MSG_COUNT);
+        consumerMessagesAsync(queue, MSG_COUNT);
+
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+        List<Long> consumeTimes = new ArrayList<Long>();
+        long cumulative = 0;
+
+        for (int i = 0; i < NUM_RUNS; ++i) {
+            produceMessages(queue, MSG_COUNT);
+            long result = consumerMessagesAsync(queue, MSG_COUNT);
+            consumeTimes.add(result);
+            cumulative += result;
+            LOG.info("Time to consume {} queue messages: {} ms", MSG_COUNT, result);
+            queueView.purge();
+        }
+
+        long smoothed = cumulative / NUM_RUNS;
+        LOG.info("Smoothed consume time for {} messages: {} ms", MSG_COUNT, smoothed);
+    }
+
+    /**
+     * Receives msgCount messages from the destination with blocking receive
+     * calls (7 second timeout each) on a session of its own.
+     *
+     * @param destination the destination to consume from
+     * @param msgCount    the number of messages that must arrive
+     * @return elapsed wall-clock time of the receive loop in milliseconds
+     * @throws Exception if a JMS call fails; an AssertionError is raised when
+     *                   a receive call times out
+     */
+    protected long consumerMessages(Destination destination, int msgCount) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        long startTime = System.currentTimeMillis();
+        for (int i = 0; i < msgCount; ++i) {
+            Message message = consumer.receive(7000);
+            assertNotNull("Failed to receive message " + i, message);
+        }
+        long result = (System.currentTimeMillis() - startTime);
+
+        consumer.close();
+        // This helper runs once per round; release its session instead of piling them up.
+        session.close();
+        return result;
+    }
+
+    /**
+     * Receives msgCount messages from the destination through a
+     * MessageListener on a session of its own, waiting at most 60 seconds.
+     *
+     * @param destination the destination to consume from
+     * @param msgCount    the number of messages that must arrive
+     * @return elapsed wall-clock time in milliseconds, measured from just
+     *         before the listener is installed until the last message arrived
+     * @throws Exception if a JMS call fails; an AssertionError is raised when
+     *                   the messages do not all arrive in time
+     */
+    protected long consumerMessagesAsync(Destination destination, int msgCount) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Size the latch from the parameter so the method honours the count it was asked for.
+        final CountDownLatch doneLatch = new CountDownLatch(msgCount);
+        long startTime = System.currentTimeMillis();
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                doneLatch.countDown();
+            }
+        });
+        assertTrue(doneLatch.await(60, TimeUnit.SECONDS));
+        long result = (System.currentTimeMillis() - startTime);
+
+        consumer.close();
+        // This helper runs once per round; release its session instead of piling them up.
+        session.close();
+        return result;
+    }
+
+    /**
+     * Sends the same non-persistent "hello" text message msgCount times to
+     * the destination on a session of its own.
+     *
+     * @param destination the destination to send to
+     * @param msgCount    the number of sends to perform
+     * @throws Exception if a JMS call fails
+     */
+    protected void produceMessages(Destination destination, int msgCount) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        TextMessage message = session.createTextMessage();
+        message.setText("hello");
+
+        for (int i = 0; i < msgCount; ++i) {
+            producer.send(message);
+        }
+
+        producer.close();
+        // This helper runs once per round; release its session instead of piling them up.
+        session.close();
+    }
+
+    /**
+     * Installs a default destination policy on the broker: VM pending-queue
+     * storage, no prioritized messages, message expiry period 0, audit and
+     * optimized dispatch disabled, queue prefetch 1000.
+     */
+    @Override
+    protected void configureBrokerPolicies(BrokerService broker) {
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        policyEntry.setPrioritizedMessages(false);
+        policyEntry.setExpireMessagesPeriod(0);
+        policyEntry.setEnableAudit(false);
+        policyEntry.setOptimizedDispatch(false);
+        policyEntry.setQueuePrefetch(1000);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java
new file mode 100644
index 0000000..9077f08
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.bench;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Collect some basic throughput data on message producer.
+ *
+ * Each rate test performs one untimed warm-up batch followed by NUM_RUNS
+ * timed batches of MSG_COUNT sends over an AMQP connection and logs the
+ * per-batch and average times. The class carries the Ignore annotation, so
+ * JUnit skips it unless the annotation is removed.
+ */
+@Ignore
+public class ProduceToAMQPTest extends AmqpTestSupport {
+
+    /** Number of messages sent in each batch. */
+    private static final int MSG_COUNT = 50 * 1000;
+
+    /** Number of timed batches that are averaged after the warm-up batch. */
+    private static final int NUM_RUNS = 20;
+
+    // The overrides below change configuration hooks of AmqpTestSupport; what
+    // each hook controls is defined by that base class, not here.
+
+    @Override
+    protected boolean isForceAsyncSends() {
+        return true;
+    }
+
+    @Override
+    protected boolean isAlwaysSyncSend() {
+        return false;
+    }
+
+    @Override
+    protected String getAmqpTransformer() {
+        return "raw";
+    }
+
+    /** Options handed to the base class for the AMQP connection URI. */
+    @Override
+    public String getAmqpConnectionURIOptions() {
+        return "provider.presettle=true";
+    }
+
+    /**
+     * Sends a single non-persistent text message to a topic. Presumably meant
+     * to be run under a profiler, going by its name.
+     */
+    @Test
+    public void singleSendProfile() throws Exception {
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(getDestinationName());
+        MessageProducer producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        TextMessage message = session.createTextMessage();
+        message.setText("hello");
+        producer.send(message);
+        producer.close();
+    }
+
+    /** Times batches of sends to a topic and logs each batch and the average. */
+    @Test
+    public void testProduceRateToTopic() throws Exception {
+
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(getDestinationName());
+
+        // Warm Up the broker.
+        produceMessages(topic, MSG_COUNT);
+
+        List<Long> sendTimes = new ArrayList<Long>();
+        long cumulative = 0;
+
+        for (int i = 0; i < NUM_RUNS; ++i) {
+            long result = produceMessages(topic, MSG_COUNT);
+            sendTimes.add(result);
+            cumulative += result;
+            LOG.info("Time to send {} topic messages: {} ms", MSG_COUNT, result);
+        }
+
+        long smoothed = cumulative / NUM_RUNS;
+        LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed);
+        TimeUnit.SECONDS.sleep(1);
+    }
+
+    /**
+     * Times batches of sends to a queue and logs each batch and the average.
+     * The queue is purged through its management view after every batch.
+     */
+    @Test
+    public void testProduceRateToQueue() throws Exception {
+
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+
+        // Warm Up the broker.
+        produceMessages(queue, MSG_COUNT);
+
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+        queueView.purge();
+
+        List<Long> sendTimes = new ArrayList<Long>();
+        long cumulative = 0;
+
+        for (int i = 0; i < NUM_RUNS; ++i) {
+            long result = produceMessages(queue, MSG_COUNT);
+            sendTimes.add(result);
+            cumulative += result;
+            LOG.info("Time to send {} queue messages: {} ms", MSG_COUNT, result);
+            queueView.purge();
+        }
+
+        long smoothed = cumulative / NUM_RUNS;
+        LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed);
+        TimeUnit.SECONDS.sleep(1);
+    }
+
+    /**
+     * Sends the same non-persistent "hello" text message msgCount times to
+     * the destination on a session of its own.
+     *
+     * @param destination the destination to send to
+     * @param msgCount    the number of sends to perform
+     * @return elapsed wall-clock time of the send loop in milliseconds
+     * @throws Exception if a JMS call fails
+     */
+    protected long produceMessages(Destination destination, int msgCount) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        TextMessage message = session.createTextMessage();
+        message.setText("hello");
+
+        long startTime = System.currentTimeMillis();
+        for (int i = 0; i < msgCount; ++i) {
+            producer.send(message);
+        }
+        long result = (System.currentTimeMillis() - startTime);
+
+        producer.close();
+        // This helper runs once per batch; release its session instead of piling them up.
+        session.close();
+        return result;
+    }
+
+    /**
+     * Installs a default destination policy on the broker: VM pending-queue
+     * storage, no prioritized messages, message expiry period 0, audit
+     * disabled, optimized dispatch enabled, queue prefetch 100.
+     */
+    @Override
+    protected void configureBrokerPolicies(BrokerService broker) {
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        policyEntry.setPrioritizedMessages(false);
+        policyEntry.setExpireMessagesPeriod(0);
+        policyEntry.setEnableAudit(false);
+        policyEntry.setOptimizedDispatch(true);
+        policyEntry.setQueuePrefetch(100);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java
new file mode 100644
index 0000000..3fa8fd5
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.bench;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collects basic producer throughput data over a connection obtained from
+ * createActiveMQConnection() (presumably OpenWire, going by the class name),
+ * mirroring the measurements of ProduceToAMQPTest.
+ *
+ * Each rate test performs one untimed warm-up batch followed by NUM_RUNS
+ * timed batches of MSG_COUNT sends and logs the per-batch and average times.
+ * The class carries the Ignore annotation, so JUnit skips it unless the
+ * annotation is removed.
+ */
+@Ignore
+public class ProduceToOpenWireTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(ProduceToOpenWireTest.class);
+
+    /** Number of messages sent in each batch. */
+    private static final int MSG_COUNT = 50 * 1000;
+
+    /** Number of timed batches that are averaged after the warm-up batch. */
+    private static final int NUM_RUNS = 40;
+
+    /**
+     * Sends a single non-persistent text message to a topic. Presumably meant
+     * to be run under a profiler, going by its name.
+     */
+    @Test
+    public void singleSendProfile() throws Exception {
+        connection = createActiveMQConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(getDestinationName());
+        MessageProducer producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        TextMessage message = session.createTextMessage();
+        message.setText("hello");
+        producer.send(message);
+        // Close the producer, as the AMQP variant of this test does.
+        producer.close();
+    }
+
+    /** Times batches of sends to a topic and logs each batch and the average. */
+    @Test
+    public void testProduceRateToTopic() throws Exception {
+
+        connection = createActiveMQConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(getDestinationName());
+
+        // Warm Up the broker.
+        produceMessages(topic, MSG_COUNT);
+
+        List<Long> sendTimes = new ArrayList<Long>();
+        long cumulative = 0;
+
+        for (int i = 0; i < NUM_RUNS; ++i) {
+            long result = produceMessages(topic, MSG_COUNT);
+            sendTimes.add(result);
+            cumulative += result;
+            LOG.info("Time to send {} topic messages: {} ms", MSG_COUNT, result);
+        }
+
+        long smoothed = cumulative / NUM_RUNS;
+        LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed);
+    }
+
+    /**
+     * Times batches of sends to a queue and logs each batch and the average.
+     * The queue is purged through its management view after every batch.
+     */
+    @Test
+    public void testProduceRateToQueue() throws Exception {
+
+        connection = createActiveMQConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+
+        // Warm Up the broker.
+        produceMessages(queue, MSG_COUNT);
+
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+        queueView.purge();
+
+        List<Long> sendTimes = new ArrayList<Long>();
+        long cumulative = 0;
+
+        for (int i = 0; i < NUM_RUNS; ++i) {
+            long result = produceMessages(queue, MSG_COUNT);
+            sendTimes.add(result);
+            cumulative += result;
+            LOG.info("Time to send {} queue messages: {} ms", MSG_COUNT, result);
+            queueView.purge();
+        }
+
+        long smoothed = cumulative / NUM_RUNS;
+        LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed);
+    }
+
+    /**
+     * Sends the same non-persistent "hello" text message msgCount times to
+     * the destination on a session of its own.
+     *
+     * @param destination the destination to send to
+     * @param msgCount    the number of sends to perform
+     * @return elapsed wall-clock time of the send loop in milliseconds
+     * @throws Exception if a JMS call fails
+     */
+    protected long produceMessages(Destination destination, int msgCount) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        TextMessage message = session.createTextMessage();
+        message.setText("hello");
+
+        long startTime = System.currentTimeMillis();
+        for (int i = 0; i < msgCount; ++i) {
+            producer.send(message);
+        }
+
+        long result = (System.currentTimeMillis() - startTime);
+
+        producer.close();
+        // This helper runs once per batch; release its session instead of piling them up.
+        session.close();
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java
new file mode 100644
index 0000000..0dacc84
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.bench;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Throughput benchmark: runs producer and consumer tasks in parallel against one queue
+ * and logs the elapsed time and the resulting message rate.
+ * <p>
+ * The class is annotated with {@code @Ignore}, so JUnit skips it unless the annotation
+ * is removed.
+ */
+@Ignore
+public class ProducerAndConsumerBench extends AmqpTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProducerAndConsumerBench.class);
+
+    /** Size in bytes of each message body. */
+    public static final int payload = 64 * 1024;
+    public static final int ioBuffer = 2 * payload;
+    public static final int socketBuffer = 64 * payload;
+
+    /** Total number of messages sent, and expected to be received, in one run. */
+    private static final long NUM_SENDS = 30000;
+
+    // Zero-filled message body. Kept as raw bytes so the size on the wire does not
+    // depend on the platform default charset.
+    private final byte[] payloadBytes = new byte[payload];
+    private final int parallelProducer = 1;
+    private final int parallelConsumer = 1;
+    // Failures raised inside worker threads; checked by the test thread at the end.
+    private final Vector<Throwable> exceptions = new Vector<Throwable>();
+    private ConnectionFactory factory;
+
+    /**
+     * Starts the configured number of consumer and producer tasks, waits up to 30
+     * minutes for them to finish and logs the duration and the send rate.
+     *
+     * @throws Exception if the connection factory cannot be created or the wait is interrupted
+     */
+    @Test
+    public void testProduceConsume() throws Exception {
+        this.factory = createAmqpConnectionFactory();
+
+        // Producers and consumers each share one countdown so that the total amount of
+        // work is NUM_SENDS regardless of how many tasks run in parallel.
+        final AtomicLong sharedSendCount = new AtomicLong(NUM_SENDS);
+        final AtomicLong sharedReceiveCount = new AtomicLong(NUM_SENDS);
+
+        // NOTE(review): presumably gives the broker time to settle before timing starts.
+        Thread.sleep(2000);
+
+        long start = System.currentTimeMillis();
+        ExecutorService executorService = Executors.newFixedThreadPool(parallelConsumer + parallelProducer);
+
+        for (int i = 0; i < parallelConsumer; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        consumeMessages(sharedReceiveCount);
+                    } catch (Throwable e) {
+                        exceptions.add(e);
+                    }
+                }
+            });
+        }
+        for (int i = 0; i < parallelProducer; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        publishMessages(sharedSendCount);
+                    } catch (Throwable e) {
+                        exceptions.add(e);
+                    }
+                }
+            });
+        }
+
+        executorService.shutdown();
+        boolean finished = executorService.awaitTermination(30, TimeUnit.MINUTES);
+        double duration = System.currentTimeMillis() - start;
+        if (!finished) {
+            // Do not leave worker threads running after the test has given up on them.
+            executorService.shutdownNow();
+        }
+        assertTrue("Producers done in time", finished);
+        assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
+
+        LOG.info("Duration:            " + duration + "ms");
+        LOG.info("Rate:                " + (NUM_SENDS * 1000 / duration) + "m/s");
+    }
+
+    /**
+     * Receives messages from the test queue until the shared countdown is exhausted.
+     *
+     * @param count remaining number of messages to receive, shared between all consumers
+     * @throws Exception if a JMS operation fails
+     */
+    private void consumeMessages(AtomicLong count) throws Exception {
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, ActiveMQSession.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getDestinationName());
+            MessageConsumer consumer = session.createConsumer(queue);
+            long v;
+            // getAndDecrement() mirrors the producer loop so that exactly as many messages
+            // are received as are sent; decrementAndGet() would stop one message short.
+            while ((v = count.getAndDecrement()) > 0) {
+                assertNotNull("got message " + v, consumer.receive(15000));
+            }
+            consumer.close();
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends the fixed-size bytes message to the test queue until the shared countdown is
+     * exhausted.
+     *
+     * @param count remaining number of messages to send, shared between all producers
+     * @throws Exception if a JMS operation fails
+     */
+    private void publishMessages(AtomicLong count) throws Exception {
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getDestinationName());
+
+            MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            Message message = session.createBytesMessage();
+            ((BytesMessage) message).writeBytes(payloadBytes);
+
+            while (count.getAndDecrement() > 0) {
+                producer.send(message);
+            }
+            producer.close();
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Installs a single default destination policy on the broker.
+     *
+     * @param broker the broker whose destination policy is replaced
+     */
+    @Override
+    protected void configureBrokerPolicies(BrokerService broker) {
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        policyEntry.setPrioritizedMessages(false);
+        policyEntry.setExpireMessagesPeriod(0);
+        policyEntry.setEnableAudit(false);
+        policyEntry.setOptimizedDispatch(true);
+        policyEntry.setQueuePrefetch(1); // ensure no contention on add with
+                                         // matched producer/consumer
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+    }
+
+    // The overrides below only return fixed settings; how each value is applied is
+    // decided by AmqpTestSupport.
+
+    @Override
+    protected boolean isForceAsyncSends() {
+        return true;
+    }
+
+    @Override
+    protected boolean isAlwaysSyncSend() {
+        return false;
+    }
+
+    @Override
+    protected String getAmqpTransformer() {
+        return "raw";
+    }
+
+    @Override
+    protected boolean isMessagePrioritySupported() {
+        return false;
+    }
+
+    @Override
+    protected boolean isSendAcksAsync() {
+        return true;
+    }
+
+    @Override
+    public String getAmqpConnectionURIOptions() {
+        return "provider.presettleProducers=true&provider.presettleConsumers=true";
+    }
+
+    @Override
+    protected int getSocketBufferSize() {
+        return socketBuffer;
+    }
+
+    @Override
+    protected int getIOBufferSize() {
+        return ioBuffer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
new file mode 100644
index 0000000..db2a104
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class JmsAutoAckTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsAutoAckTest.class);
+
+    @Test(timeout = 60000)
+    public void testAckedMessageAreConsumed() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendToAmqQueue(1);
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        assertNotNull("Failed to receive any message.", consumer.receive(2000));
+
+        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == 0;
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testAckedMessageAreConsumedAsync() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendToAmqQueue(1);
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                LOG.debug("Received async message: {}", message);
+            }
+        });
+
+        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == 0;
+            }
+        }));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org