You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:28 UTC
[04/27] Initial drop of donated AMQP Client Code.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java
new file mode 100644
index 0000000..f4b54a6
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.Vector;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests that messages sent and received don't lose data and have expected
+ * JMS Message property values.
+ */
+public class JmsMessageIntegrityTest extends AmqpTestSupport {
+
+ private Connection connection;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ connection = createAmqpConnection();
+ }
+
+ @Test
+ public void testTextMessage() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ {
+ TextMessage message = session.createTextMessage();
+ message.setText("Hi");
+ producer.send(message);
+ }
+ {
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals("Hi", message.getText());
+ }
+
+ assertNull(consumer.receiveNoWait());
+ }
+
+ @Test
+ public void testBytesMessageLength() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ {
+ BytesMessage message = session.createBytesMessage();
+ message.writeInt(1);
+ message.writeInt(2);
+ message.writeInt(3);
+ message.writeInt(4);
+ producer.send(message);
+ }
+ {
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(16, message.getBodyLength());
+ }
+
+ assertNull(consumer.receiveNoWait());
+ }
+
+ @Test
+ public void testObjectMessage() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ UUID payload = UUID.randomUUID();
+
+ {
+ ObjectMessage message = session.createObjectMessage();
+ message.setObject(payload);
+ producer.send(message);
+ }
+ {
+ ObjectMessage message = (ObjectMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(payload, message.getObject());
+ }
+ assertNull(consumer.receiveNoWait());
+ }
+
+ @Test
+ public void testBytesMessage() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ {
+ BytesMessage message = session.createBytesMessage();
+ message.writeBoolean(true);
+ producer.send(message);
+ }
+ {
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertTrue(message.readBoolean());
+
+ try {
+ message.readByte();
+ fail("Expected exception not thrown.");
+ } catch (MessageEOFException e) {
+ }
+ }
+ assertNull(consumer.receiveNoWait());
+ }
+
+ @Test
+ public void testStreamMessage() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ {
+ StreamMessage message = session.createStreamMessage();
+ message.writeString("This is a test to see how it works.");
+ producer.send(message);
+ }
+ {
+ StreamMessage message = (StreamMessage)consumer.receive(1000);
+ assertNotNull(message);
+
+ // Invalid conversion should throw exception and not move the stream position.
+ try {
+ message.readByte();
+ fail("Should have received NumberFormatException");
+ } catch (NumberFormatException e) {
+ }
+
+ assertEquals("This is a test to see how it works.", message.readString());
+
+ // Invalid conversion should throw exception and not move the stream position.
+ try {
+ message.readByte();
+ fail("Should have received MessageEOFException");
+ } catch (MessageEOFException e) {
+ }
+ }
+ assertNull(consumer.receiveNoWait());
+ }
+
+ @Test
+ public void testMapMessage() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ {
+ MapMessage message = session.createMapMessage();
+ message.setBoolean("boolKey", true);
+ producer.send(message);
+ }
+ {
+ MapMessage message = (MapMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertTrue(message.getBoolean("boolKey"));
+ }
+ assertNull(consumer.receiveNoWait());
+ }
+
+ static class ForeignMessage implements TextMessage {
+
+ public int deliveryMode;
+
+ private String messageId;
+ private long timestamp;
+ private String correlationId;
+ private Destination replyTo;
+ private Destination destination;
+ private boolean redelivered;
+ private String type;
+ private long expiration;
+ private int priority;
+ private String text;
+ private final HashMap<String, Object> props = new HashMap<String, Object>();
+
+ @Override
+ public String getJMSMessageID() throws JMSException {
+ return messageId;
+ }
+
+ @Override
+ public void setJMSMessageID(String arg0) throws JMSException {
+ messageId = arg0;
+ }
+
+ @Override
+ public long getJMSTimestamp() throws JMSException {
+ return timestamp;
+ }
+
+ @Override
+ public void setJMSTimestamp(long arg0) throws JMSException {
+ timestamp = arg0;
+ }
+
+ @Override
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+ return null;
+ }
+
+ @Override
+ public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException {
+ }
+
+ @Override
+ public void setJMSCorrelationID(String arg0) throws JMSException {
+ correlationId = arg0;
+ }
+
+ @Override
+ public String getJMSCorrelationID() throws JMSException {
+ return correlationId;
+ }
+
+ @Override
+ public Destination getJMSReplyTo() throws JMSException {
+ return replyTo;
+ }
+
+ @Override
+ public void setJMSReplyTo(Destination arg0) throws JMSException {
+ replyTo = arg0;
+ }
+
+ @Override
+ public Destination getJMSDestination() throws JMSException {
+ return destination;
+ }
+
+ @Override
+ public void setJMSDestination(Destination arg0) throws JMSException {
+ destination = arg0;
+ }
+
+ @Override
+ public int getJMSDeliveryMode() throws JMSException {
+ return deliveryMode;
+ }
+
+ @Override
+ public void setJMSDeliveryMode(int arg0) throws JMSException {
+ deliveryMode = arg0;
+ }
+
+ @Override
+ public boolean getJMSRedelivered() throws JMSException {
+ return redelivered;
+ }
+
+ @Override
+ public void setJMSRedelivered(boolean arg0) throws JMSException {
+ redelivered = arg0;
+ }
+
+ @Override
+ public String getJMSType() throws JMSException {
+ return type;
+ }
+
+ @Override
+ public void setJMSType(String arg0) throws JMSException {
+ type = arg0;
+ }
+
+ @Override
+ public long getJMSExpiration() throws JMSException {
+ return expiration;
+ }
+
+ @Override
+ public void setJMSExpiration(long arg0) throws JMSException {
+ expiration = arg0;
+ }
+
+ @Override
+ public int getJMSPriority() throws JMSException {
+ return priority;
+ }
+
+ @Override
+ public void setJMSPriority(int arg0) throws JMSException {
+ priority = arg0;
+ }
+
+ @Override
+ public void clearProperties() throws JMSException {
+ }
+
+ @Override
+ public boolean propertyExists(String arg0) throws JMSException {
+ return false;
+ }
+
+ @Override
+ public boolean getBooleanProperty(String arg0) throws JMSException {
+ return false;
+ }
+
+ @Override
+ public byte getByteProperty(String arg0) throws JMSException {
+ return 0;
+ }
+
+ @Override
+ public short getShortProperty(String arg0) throws JMSException {
+ return 0;
+ }
+
+ @Override
+ public int getIntProperty(String arg0) throws JMSException {
+ return 0;
+ }
+
+ @Override
+ public long getLongProperty(String arg0) throws JMSException {
+ return 0;
+ }
+
+ @Override
+ public float getFloatProperty(String arg0) throws JMSException {
+ return 0;
+ }
+
+ @Override
+ public double getDoubleProperty(String arg0) throws JMSException {
+ return 0;
+ }
+
+ @Override
+ public String getStringProperty(String arg0) throws JMSException {
+ return (String)props.get(arg0);
+ }
+
+ @Override
+ public Object getObjectProperty(String arg0) throws JMSException {
+ return props.get(arg0);
+ }
+
+ @Override
+ public Enumeration<?> getPropertyNames() throws JMSException {
+ return new Vector<String>(props.keySet()).elements();
+ }
+
+ @Override
+ public void setBooleanProperty(String arg0, boolean arg1) throws JMSException {
+ }
+
+ @Override
+ public void setByteProperty(String arg0, byte arg1) throws JMSException {
+ }
+
+ @Override
+ public void setShortProperty(String arg0, short arg1) throws JMSException {
+ }
+
+ @Override
+ public void setIntProperty(String arg0, int arg1) throws JMSException {
+ }
+
+ @Override
+ public void setLongProperty(String arg0, long arg1) throws JMSException {
+ }
+
+ @Override
+ public void setFloatProperty(String arg0, float arg1) throws JMSException {
+ }
+
+ @Override
+ public void setDoubleProperty(String arg0, double arg1) throws JMSException {
+ }
+
+ @Override
+ public void setStringProperty(String arg0, String arg1) throws JMSException {
+ props.put(arg0, arg1);
+ }
+
+ @Override
+ public void setObjectProperty(String arg0, Object arg1) throws JMSException {
+ props.put(arg0, arg1);
+ }
+
+ @Override
+ public void acknowledge() throws JMSException {
+ }
+
+ @Override
+ public void clearBody() throws JMSException {
+ }
+
+ @Override
+ public void setText(String arg0) throws JMSException {
+ text = arg0;
+ }
+
+ @Override
+ public String getText() throws JMSException {
+ return text;
+ }
+ }
+
+ // TODO - implement proper handling of foreign JMS Message and Destination types.
+ @Ignore("ActiveMQ is dropping messages as expired with current proton lib")
+ @Test
+ public void testForeignMessage() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ {
+ ForeignMessage message = new ForeignMessage();
+ message.text = "Hello";
+ message.setStringProperty("test", "value");
+ long timeToLive = 10000L;
+ long start = System.currentTimeMillis();
+ producer.send(message, Session.AUTO_ACKNOWLEDGE, 7, timeToLive);
+ long end = System.currentTimeMillis();
+
+ // validate jms spec 1.1 section 3.4.11 table 3.1
+ // JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority, JMSMessageID, and JMSTimestamp
+ // must be set by sending a message.
+
+ assertNotNull(message.getJMSDestination());
+ assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode());
+ assertTrue(start + timeToLive <= message.getJMSExpiration());
+ assertTrue(end + timeToLive >= message.getJMSExpiration());
+ assertEquals(7, message.getJMSPriority());
+ assertNotNull(message.getJMSMessageID());
+ assertTrue(start <= message.getJMSTimestamp());
+ assertTrue(end >= message.getJMSTimestamp());
+ }
+ {
+ TextMessage message = (TextMessage)consumer.receive(10000);
+ assertNotNull(message);
+ assertEquals("Hello", message.getText());
+ assertEquals("value", message.getStringProperty("test"));
+ }
+
+ assertNull(consumer.receiveNoWait());
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java
new file mode 100644
index 0000000..36253c6
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.QueueConnection;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test basic QueueConnection creation etc.
+ */
+public class JmsQueueConnectionTest extends AmqpTestSupport {
+
+ @Test
+ public void testCreateQueueConnection() throws JMSException {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ QueueConnection connection = factory.createQueueConnection();
+ assertNotNull(connection);
+ connection.close();
+ }
+
+ @Test(timeout=30000)
+ public void testCreateConnectionAsSystemAdmin() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ factory.setUsername("system");
+ factory.setPassword("manager");
+ QueueConnection connection = factory.createQueueConnection();
+ assertNotNull(connection);
+ connection.start();
+ connection.close();
+ }
+
+ @Test(timeout=30000, expected = JMSSecurityException.class)
+ public void testCreateConnectionAsUnknwonUser() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ factory.setUsername("unknown");
+ factory.setPassword("unknown");
+ QueueConnection connection = factory.createQueueConnection();
+ assertNotNull(connection);
+ connection.start();
+ connection.close();
+ }
+
+ @Test(timeout=30000)
+ public void testCreateConnectionCallSystemAdmin() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ QueueConnection connection = factory.createQueueConnection("system", "manager");
+ assertNotNull(connection);
+ connection.start();
+ connection.close();
+ }
+
+ @Test(timeout=30000, expected = JMSSecurityException.class)
+ public void testCreateConnectionCallUnknwonUser() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ QueueConnection connection = factory.createQueueConnection("unknown", "unknown");
+ assertNotNull(connection);
+ connection.start();
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java
new file mode 100644
index 0000000..0e61d39
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.net.URI;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test that we can connect to a broker over SSL.
+ */
+public class JmsSSLConnectionTest {
+
+ private BrokerService brokerService;
+
+ public static final String PASSWORD = "password";
+ public static final String KEYSTORE = "src/test/resources/keystore";
+ public static final String KEYSTORE_TYPE = "jks";
+
+ private URI connectionURI;
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty("javax.net.ssl.trustStore", KEYSTORE);
+ System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+ System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+ System.setProperty("javax.net.ssl.keyStore", KEYSTORE);
+ System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+ System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setAdvisorySupport(false);
+ brokerService.setDeleteAllMessagesOnStartup(true);
+ brokerService.setUseJmx(true);
+
+ TransportConnector connector = brokerService.addConnector("amqp+ssl://localhost:0");
+ brokerService.start();
+ brokerService.waitUntilStarted();
+
+ connectionURI = connector.getPublishableConnectURI();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ }
+
+ public String getConnectionURI() throws Exception {
+ return "amqps://" + connectionURI.getHost() + ":" + connectionURI.getPort();
+ }
+
+ @Test(timeout=30000)
+ public void testCreateConnection() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getConnectionURI());
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ assertNotNull(connection);
+ connection.close();
+ }
+
+ @Test(timeout=30000)
+ public void testCreateConnectionAndStart() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getConnectionURI());
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ assertNotNull(connection);
+ connection.start();
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java
new file mode 100644
index 0000000..efb3df3
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Validates all Session contracts following a close() call.
+ */
+public class JmsSessionClosedTest extends AmqpTestSupport {
+
+ protected Connection connection;
+
+ protected Session createSession() throws Exception {
+ connection = createAmqpConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.close();
+ return session;
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ connection.close();
+ super.tearDown();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateMessageFails() throws Exception {
+ Session session = createSession();
+ session.createMessage();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateTextMessageFails() throws Exception {
+ Session session = createSession();
+ session.createTextMessage();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateTextMessageWithTextFails() throws Exception {
+ Session session = createSession();
+ session.createTextMessage("TEST");
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateMapMessageFails() throws Exception {
+ Session session = createSession();
+ session.createMapMessage();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateStreamMessageFails() throws Exception {
+ Session session = createSession();
+ session.createStreamMessage();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateBytesMessageFails() throws Exception {
+ Session session = createSession();
+ session.createBytesMessage();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateObjectMessageFails() throws Exception {
+ Session session = createSession();
+ session.createObjectMessage();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateObjectMessageWithObjectFails() throws Exception {
+ Session session = createSession();
+ session.createObjectMessage("TEST");
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testGetTransactedFails() throws Exception {
+ Session session = createSession();
+ session.getTransacted();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testGetAcknowledgeModeFails() throws Exception {
+ Session session = createSession();
+ session.getAcknowledgeMode();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCommitFails() throws Exception {
+ Session session = createSession();
+ session.commit();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testRollbackFails() throws Exception {
+ Session session = createSession();
+ session.rollback();
+ }
+
+ @Test(timeout=30000)
+ public void testCloseDoesNotFail() throws Exception {
+ Session session = createSession();
+ session.close();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testRecoverFails() throws Exception {
+ Session session = createSession();
+ session.recover();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testGetMessageListenerFails() throws Exception {
+ Session session = createSession();
+ session.getMessageListener();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testSetMessageListenerFails() throws Exception {
+ Session session = createSession();
+ MessageListener listener = new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ }
+ };
+ session.setMessageListener(listener);
+ }
+
+ @Test(timeout=30000, expected=RuntimeException.class)
+ public void testRunFails() throws Exception {
+ Session session = createSession();
+ session.run();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateProducerFails() throws Exception {
+ Session session = createSession();
+ Destination destination = session.createQueue("test");
+ session.createProducer(destination);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateConsumerDestinatioFails() throws Exception {
+ Session session = createSession();
+ Destination destination = session.createQueue("test");
+ session.createConsumer(destination);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateConsumerDestinatioSelectorFails() throws Exception {
+ Session session = createSession();
+ Destination destination = session.createQueue("test");
+ session.createConsumer(destination, "a = b");
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateConsumerDestinatioSelectorBooleanFails() throws Exception {
+ Session session = createSession();
+ Destination destination = session.createQueue("test");
+ session.createConsumer(destination, "a = b", true);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateQueueFails() throws Exception {
+ Session session = createSession();
+ session.createQueue("TEST");
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateTopicFails() throws Exception {
+ Session session = createSession();
+ session.createTopic("TEST");
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateTemporaryQueueFails() throws Exception {
+ Session session = createSession();
+ session.createTemporaryQueue();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateTemporaryTopicFails() throws Exception {
+ Session session = createSession();
+ session.createTemporaryQueue();
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateDurableSubscriberFails() throws Exception {
+ Session session = createSession();
+ Topic destination = session.createTopic("TEST");
+ session.createDurableSubscriber(destination, "test");
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateDurableSubscriberSelectorBooleanFails() throws Exception {
+ Session session = createSession();
+ Topic destination = session.createTopic("TEST");
+ session.createDurableSubscriber(destination, "test", "a = b", false);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateQueueBrowserFails() throws Exception {
+ Session session = createSession();
+ Queue destination = session.createQueue("test");
+ session.createBrowser(destination);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateQueueBrowserWithSelectorFails() throws Exception {
+ Session session = createSession();
+ Queue destination = session.createQueue("test");
+ session.createBrowser(destination, "a = b");
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testUnsubscribeFails() throws Exception {
+ Session session = createSession();
+ session.unsubscribe("test");
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java
new file mode 100644
index 0000000..5e6846c
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.Wait;
+
+
+/**
+ * Tests the Session method contracts when the underlying connection is lost.
+ */
+public class JmsSessionFailedTest extends JmsSessionClosedTest {
+
+ @Override
+ protected Session createSession() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ connection = createAmqpConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.setExceptionListener(new ExceptionListener() {
+
+ @Override
+ public void onException(JMSException exception) {
+ latch.countDown();
+ }
+ });
+ connection.start();
+ stopPrimaryBroker();
+ assertTrue(latch.await(20, TimeUnit.SECONDS));
+ final JmsConnection jmsConnection = (JmsConnection) connection;
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !jmsConnection.isConnected();
+ }
+ }));
+ return session;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
new file mode 100644
index 0000000..6c3ca35
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test basic Session functionality.
+ */
+public class JmsSessionTest extends AmqpTestSupport {
+
+ @Test(timeout = 60000)
+ public void testCreateSession() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+
+ session.close();
+ }
+
+ @Test(timeout=30000)
+ public void testSessionCreateProducer() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+
+ Queue queue = session.createQueue("test.queue");
+ MessageProducer producer = session.createProducer(queue);
+
+ producer.close();
+ session.close();
+ }
+
+ @Test(timeout=30000)
+ public void testSessionCreateConsumer() throws Exception {
+ connection = createAmqpConnection();
+ assertNotNull(connection);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+
+ Queue queue = session.createQueue("test.queue");
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ consumer.close();
+ session.close();
+ }
+
+ @Test(timeout=30000)
+ public void testSessionDoubleCloseWithoutException() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.close();
+ session.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java
new file mode 100644
index 0000000..e423139
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.TopicConnection;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class JmsTopicConnectionTest extends AmqpTestSupport {
+
+ @Test
+ public void testCreateQueueConnection() throws JMSException {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ TopicConnection connection = factory.createTopicConnection();
+ assertNotNull(connection);
+ connection.close();
+ }
+
+ @Test(timeout=30000)
+ public void testCreateConnectionAsSystemAdmin() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ factory.setUsername("system");
+ factory.setPassword("manager");
+ TopicConnection connection = factory.createTopicConnection();
+ assertNotNull(connection);
+ connection.start();
+ connection.close();
+ }
+
+ @Test(timeout=30000)
+ public void testCreateConnectionCallSystemAdmin() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ TopicConnection connection = factory.createTopicConnection("system", "manager");
+ assertNotNull(connection);
+ connection.start();
+ connection.close();
+ }
+
+ @Test(timeout=30000, expected = JMSSecurityException.class)
+ public void testCreateConnectionAsUnknwonUser() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ factory.setUsername("unknown");
+ factory.setPassword("unknown");
+ TopicConnection connection = factory.createTopicConnection();
+ assertNotNull(connection);
+ connection.start();
+ connection.close();
+ }
+
+ @Test(timeout=30000, expected = JMSSecurityException.class)
+ public void testCreateConnectionCallUnknownUser() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+ TopicConnection connection = factory.createTopicConnection("unknown", "unknown");
+ assertNotNull(connection);
+ connection.start();
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java
new file mode 100644
index 0000000..90d7358
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.bench;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ *
+ */
+@Ignore
+public class ConsumeFromAMQPTest extends AmqpTestSupport {
+
+ private final int MSG_COUNT = 50 * 1000;
+ private final int NUM_RUNS = 10;
+
+ @Override
+ protected boolean isForceAsyncSends() {
+ return true;
+ }
+
+ @Override
+ protected boolean isAlwaysSyncSend() {
+ return false;
+ }
+
+ @Override
+ protected String getAmqpTransformer() {
+ return "raw";
+ }
+
+ @Override
+ protected boolean isMessagePrioritySupported() {
+ return false;
+ }
+
+ @Override
+ protected boolean isSendAcksAsync() {
+ return true;
+ }
+
+ @Override
+ public String getAmqpConnectionURIOptions() {
+ return "provider.presettleProducers=true&provider.presettleConsumers=true";
+ }
+
+ @Test
+ public void oneConsumedForProfile() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+ producer.send(message);
+ producer.close();
+
+ QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+ assertEquals("Queue should have a message", 1, queueView.getQueueSize());
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message received = consumer.receive(7000);
+ assertNotNull(received);
+ consumer.close();
+
+ assertEquals("Queue should have ano messages", 0, queueView.getQueueSize());
+ }
+
+ @Test
+ public void testConsumeRateFromQueue() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+
+ // Warm Up the broker.
+ produceMessages(queue, MSG_COUNT);
+ consumerMessages(queue, MSG_COUNT);
+ QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+ queueView.purge();
+
+ List<Long> sendTimes = new ArrayList<Long>();
+ long cumulative = 0;
+
+ for (int i = 0; i < NUM_RUNS; ++i) {
+ produceMessages(queue, MSG_COUNT);
+ long result = consumerMessages(queue, MSG_COUNT);
+ sendTimes.add(result);
+ cumulative += result;
+ LOG.info("Time to send {} topic messages: {} ms", MSG_COUNT, result);
+ queueView.purge();
+ }
+
+ long smoothed = cumulative / NUM_RUNS;
+ LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed);
+ }
+
+ @Test
+ public void testConsumeRateFromQueueAsync() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+
+ // Warm Up the broker.
+ produceMessages(queue, MSG_COUNT);
+ consumerMessagesAsync(queue, MSG_COUNT);
+
+ QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+ List<Long> sendTimes = new ArrayList<Long>();
+ long cumulative = 0;
+
+ for (int i = 0; i < NUM_RUNS; ++i) {
+ produceMessages(queue, MSG_COUNT);
+ long result = consumerMessagesAsync(queue, MSG_COUNT);
+ sendTimes.add(result);
+ cumulative += result;
+ LOG.info("Time to send {} topic messages: {} ms", MSG_COUNT, result);
+ queueView.purge();
+ }
+
+ long smoothed = cumulative / NUM_RUNS;
+ LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed);
+ }
+
+ protected long consumerMessages(Destination destination, int msgCount) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < msgCount; ++i) {
+ Message message = consumer.receive(7000);
+ assertNotNull("Failed to receive message " + i, message);
+ }
+ long result = (System.currentTimeMillis() - startTime);
+
+ consumer.close();
+ return result;
+ }
+
+ protected long consumerMessagesAsync(Destination destination, int msgCount) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ final CountDownLatch doneLatch = new CountDownLatch(MSG_COUNT);
+ long startTime = System.currentTimeMillis();
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ doneLatch.countDown();
+ }
+ });
+ assertTrue(doneLatch.await(60, TimeUnit.SECONDS));
+ long result = (System.currentTimeMillis() - startTime);
+
+ consumer.close();
+ return result;
+ }
+
+ protected void produceMessages(Destination destination, int msgCount) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+
+ for (int i = 0; i < msgCount; ++i) {
+ producer.send(message);
+ }
+
+ producer.close();
+ }
+
+ @Override
+ protected void configureBrokerPolicies(BrokerService broker) {
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+ policyEntry.setPrioritizedMessages(false);
+ policyEntry.setExpireMessagesPeriod(0);
+ policyEntry.setEnableAudit(false);
+ policyEntry.setOptimizedDispatch(false);
+ policyEntry.setQueuePrefetch(1000);
+
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(policyEntry);
+ broker.setDestinationPolicy(policyMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java
new file mode 100644
index 0000000..9077f08
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.bench;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Collect some basic throughput data on message producer.
+ */
+@Ignore
+public class ProduceToAMQPTest extends AmqpTestSupport {
+
+ private final int MSG_COUNT = 50 * 1000;
+ private final int NUM_RUNS = 20;
+
+ @Override
+ protected boolean isForceAsyncSends() {
+ return true;
+ }
+
+ @Override
+ protected boolean isAlwaysSyncSend() {
+ return false;
+ }
+
+ @Override
+ protected String getAmqpTransformer() {
+ return "raw";
+ }
+
+ @Override
+ public String getAmqpConnectionURIOptions() {
+ return "provider.presettle=true";
+ }
+
+ @Test
+ public void singleSendProfile() throws Exception {
+ connection = createAmqpConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(getDestinationName());
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+ producer.send(message);
+ producer.close();
+ }
+
+ @Test
+ public void testProduceRateToTopic() throws Exception {
+
+ connection = createAmqpConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(getDestinationName());
+
+ // Warm Up the broker.
+ produceMessages(topic, MSG_COUNT);
+
+ List<Long> sendTimes = new ArrayList<Long>();
+ long cumulative = 0;
+
+ for (int i = 0; i < NUM_RUNS; ++i) {
+ long result = produceMessages(topic, MSG_COUNT);
+ sendTimes.add(result);
+ cumulative += result;
+ LOG.info("Time to send {} topic messages: {} ms", MSG_COUNT, result);
+ }
+
+ long smoothed = cumulative / NUM_RUNS;
+ LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed);
+ TimeUnit.SECONDS.sleep(1);
+ }
+
+ @Test
+ public void testProduceRateToQueue() throws Exception {
+
+ connection = createAmqpConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+
+ // Warm Up the broker.
+ produceMessages(queue, MSG_COUNT);
+
+ QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+ queueView.purge();
+
+ List<Long> sendTimes = new ArrayList<Long>();
+ long cumulative = 0;
+
+ for (int i = 0; i < NUM_RUNS; ++i) {
+ long result = produceMessages(queue, MSG_COUNT);
+ sendTimes.add(result);
+ cumulative += result;
+ LOG.info("Time to send {} queue messages: {} ms", MSG_COUNT, result);
+ queueView.purge();
+ }
+
+ long smoothed = cumulative / NUM_RUNS;
+ LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed);
+ TimeUnit.SECONDS.sleep(1);
+ }
+
+ protected long produceMessages(Destination destination, int msgCount) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < msgCount; ++i) {
+ producer.send(message);
+ }
+ long result = (System.currentTimeMillis() - startTime);
+
+ producer.close();
+ return result;
+ }
+
+ @Override
+ protected void configureBrokerPolicies(BrokerService broker) {
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+ policyEntry.setPrioritizedMessages(false);
+ policyEntry.setExpireMessagesPeriod(0);
+ policyEntry.setEnableAudit(false);
+ policyEntry.setOptimizedDispatch(true);
+ policyEntry.setQueuePrefetch(100);
+
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(policyEntry);
+ broker.setDestinationPolicy(policyMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java
new file mode 100644
index 0000000..3fa8fd5
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.bench;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Ignore
+public class ProduceToOpenWireTest extends AmqpTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(ProduceToOpenWireTest.class);
+
+ private final int MSG_COUNT = 50 * 1000;
+ private final int NUM_RUNS = 40;
+
+ @Test
+ public void singleSendProfile() throws Exception {
+ connection = createActiveMQConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(getDestinationName());
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+ producer.send(message);
+ }
+
+ @Test
+ public void testProduceRateToTopic() throws Exception {
+
+ connection = createActiveMQConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(getDestinationName());
+
+ // Warm Up the broker.
+ produceMessages(topic, MSG_COUNT);
+
+ List<Long> sendTimes = new ArrayList<Long>();
+ long cumulative = 0;
+
+ for (int i = 0; i < NUM_RUNS; ++i) {
+ long result = produceMessages(topic, MSG_COUNT);
+ sendTimes.add(result);
+ cumulative += result;
+ LOG.info("Time to send {} topic messages: {} ms", MSG_COUNT, result);
+ }
+
+ long smoothed = cumulative / NUM_RUNS;
+ LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed);
+ }
+
+ @Test
+ public void testProduceRateToQueue() throws Exception {
+
+ connection = createActiveMQConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+
+ // Warm Up the broker.
+ produceMessages(queue, MSG_COUNT);
+
+ QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+ queueView.purge();
+
+ List<Long> sendTimes = new ArrayList<Long>();
+ long cumulative = 0;
+
+ for (int i = 0; i < NUM_RUNS; ++i) {
+ long result = produceMessages(queue, MSG_COUNT);
+ sendTimes.add(result);
+ cumulative += result;
+ LOG.info("Time to send {} queue messages: {} ms", MSG_COUNT, result);
+ queueView.purge();
+ }
+
+ long smoothed = cumulative / NUM_RUNS;
+ LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed);
+ }
+
+ protected long produceMessages(Destination destination, int msgCount) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < msgCount; ++i) {
+ producer.send(message);
+ }
+
+ long result = (System.currentTimeMillis() - startTime);
+
+ producer.close();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java
new file mode 100644
index 0000000..0dacc84
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.bench;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+@Ignore
+public class ProducerAndConsumerBench extends AmqpTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ProducerAndConsumerBench.class);
+
+ public static final int payload = 64 * 1024;
+ public static final int ioBuffer = 2 * payload;
+ public static final int socketBuffer = 64 * payload;
+
+ private final String payloadString = new String(new byte[payload]);
+ private final int parallelProducer = 1;
+ private final int parallelConsumer = 1;
+ private final Vector<Throwable> exceptions = new Vector<Throwable>();
+ private ConnectionFactory factory;
+
+ private final long NUM_SENDS = 30000;
+
+ @Test
+ public void testProduceConsume() throws Exception {
+ this.factory = createAmqpConnectionFactory();
+
+ final AtomicLong sharedSendCount = new AtomicLong(NUM_SENDS);
+ final AtomicLong sharedReceiveCount = new AtomicLong(NUM_SENDS);
+
+ Thread.sleep(2000);
+
+ long start = System.currentTimeMillis();
+ ExecutorService executorService = Executors.newFixedThreadPool(parallelConsumer + parallelProducer);
+
+ for (int i = 0; i < parallelConsumer; i++) {
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumeMessages(sharedReceiveCount);
+ } catch (Throwable e) {
+ exceptions.add(e);
+ }
+ }
+ });
+ }
+ for (int i = 0; i < parallelProducer; i++) {
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ publishMessages(sharedSendCount);
+ } catch (Throwable e) {
+ exceptions.add(e);
+ }
+ }
+ });
+ }
+
+ executorService.shutdown();
+ executorService.awaitTermination(30, TimeUnit.MINUTES);
+ assertTrue("Producers done in time", executorService.isTerminated());
+ assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
+
+ double duration = System.currentTimeMillis() - start;
+ LOG.info("Duration: " + duration + "ms");
+ LOG.info("Rate: " + (NUM_SENDS * 1000 / duration) + "m/s");
+ }
+
+ private void consumeMessages(AtomicLong count) throws Exception {
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, ActiveMQSession.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+ MessageConsumer consumer = session.createConsumer(queue);
+ long v;
+ while ((v = count.decrementAndGet()) > 0) {
+ assertNotNull("got message " + v, consumer.receive(15000));
+ }
+ consumer.close();
+ }
+
+ private void publishMessages(AtomicLong count) throws Exception {
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ Message message = session.createBytesMessage();
+ ((BytesMessage) message).writeBytes(payloadString.getBytes());
+
+ while (count.getAndDecrement() > 0) {
+ producer.send(message);
+ }
+ producer.close();
+ connection.close();
+ }
+
+ @Override
+ protected void configureBrokerPolicies(BrokerService broker) {
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+ policyEntry.setPrioritizedMessages(false);
+ policyEntry.setExpireMessagesPeriod(0);
+ policyEntry.setEnableAudit(false);
+ policyEntry.setOptimizedDispatch(true);
+ policyEntry.setQueuePrefetch(1); // ensure no contention on add with
+ // matched producer/consumer
+
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(policyEntry);
+ broker.setDestinationPolicy(policyMap);
+ }
+
+ @Override
+ protected boolean isForceAsyncSends() {
+ return true;
+ }
+
+ @Override
+ protected boolean isAlwaysSyncSend() {
+ return false;
+ }
+
+ @Override
+ protected String getAmqpTransformer() {
+ return "raw";
+ }
+
+ @Override
+ protected boolean isMessagePrioritySupported() {
+ return false;
+ }
+
+ @Override
+ protected boolean isSendAcksAsync() {
+ return true;
+ }
+
+ @Override
+ public String getAmqpConnectionURIOptions() {
+ return "provider.presettleProducers=true&provider.presettleConsumers=true";
+ }
+
+ @Override
+ protected int getSocketBufferSize() {
+ return socketBuffer;
+ }
+
+ @Override
+ protected int getIOBufferSize() {
+ return ioBuffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
new file mode 100644
index 0000000..db2a104
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class JmsAutoAckTest extends AmqpTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(JmsAutoAckTest.class);
+
+ @Test(timeout = 60000)
+ public void testAckedMessageAreConsumed() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ sendToAmqQueue(1);
+
+ final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(1, proxy.getQueueSize());
+
+ assertNotNull("Failed to receive any message.", consumer.receive(2000));
+
+ assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return proxy.getQueueSize() == 0;
+ }
+ }));
+ }
+
+ @Test(timeout = 60000)
+ public void testAckedMessageAreConsumedAsync() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ sendToAmqQueue(1);
+
+ final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(1, proxy.getQueueSize());
+
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ LOG.debug("Received async message: {}", message);
+ }
+ });
+
+ assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return proxy.getQueueSize() == 0;
+ }
+ }));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org