You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/11 22:58:12 UTC
git commit: Add some additional JMS tests focused on Topics. They are useful
when updating the AMQP JMS Client version, as they show some new problems in
the latest SNAPSHOT builds.
Updated Branches:
refs/heads/trunk 78c4e4337 -> e1e8c5b08
Add some additional JMS tests focused on Topics. They are useful when updating
the AMQP JMS Client version, as they show some new problems in the latest
SNAPSHOT builds.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e1e8c5b0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e1e8c5b0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e1e8c5b0
Branch: refs/heads/trunk
Commit: e1e8c5b083e0c9f6e6c6f4dd25d35ad039fc4ca1
Parents: 78c4e43
Author: Timothy Bish <ta...@gmai.com>
Authored: Wed Dec 11 16:58:08 2013 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Wed Dec 11 16:58:08 2013 -0500
----------------------------------------------------------------------
.../activemq/transport/amqp/JMSClientTest.java | 182 +++++++++++++++++--
1 file changed, 168 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e1e8c5b0/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index e35127d..ebed8d6 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -31,6 +33,7 @@ import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
@@ -38,8 +41,10 @@ import javax.jms.TextMessage;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
+import org.apache.activemq.util.Wait;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -347,26 +352,175 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testTTL() throws Exception {
- QueueImpl queue = new QueueImpl("queue://" + name);
+ Connection connection = null;
+ try {
+ QueueImpl queue = new QueueImpl("queue://" + name);
+ connection = createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+ MessageProducer producer = session.createProducer(queue);
+ producer.setTimeToLive(1000);
+ Message toSend = session.createTextMessage("Sample text");
+ producer.send(toSend);
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message received = consumer.receive(5000);
+ assertNotNull(received);
+ LOG.info("Message JMSExpiration = {}", received.getJMSExpiration());
+ producer.setTimeToLive(100);
+ producer.send(toSend);
+ TimeUnit.SECONDS.sleep(2);
+ received = consumer.receive(5000);
+ if (received != null) {
+ LOG.info("Message JMSExpiration = {} JMSTimeStamp = {} TTL = {}",
+ new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(),
+ received.getJMSExpiration() - received.getJMSTimestamp()});
+ }
+ assertNull(received);
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testDurableConsumerAsync() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+ TopicImpl topic = new TopicImpl("topic://"+name);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Message> received = new AtomicReference<Message>();
+
Connection connection = createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- connection.start();
- MessageProducer producer = session.createProducer(queue);
- producer.setTimeToLive(1000);
- Message toSend = session.createTextMessage("Sample text");
- producer.send(toSend);
- MessageConsumer consumer = session.createConsumer(queue);
- Message received = consumer.receive(5000);
- assertNotNull(received);
- producer.setTimeToLive(100);
- producer.send(toSend);
- TimeUnit.SECONDS.sleep(1);
- assertNull(consumer.receive(5000));
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ received.set(message);
+ latch.countDown();
+ }
+ });
+
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ connection.start();
+
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+ producer.send(message);
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertNotNull("Should have received a message by now.", received.get());
+ assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
+ }
+ connection.close();
+ }
+
+ @Test(timeout=30000)
+ public void testDurableConsumerSync() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+ TopicImpl topic = new TopicImpl("topic://"+name);
+
+ Connection connection = createConnection();
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ connection.start();
+
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+ producer.send(message);
+
+ final AtomicReference<Message> msg = new AtomicReference<Message>();
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ msg.set(consumer.receiveNoWait());
+ return msg.get() != null;
+ }
+ }));
+
+ assertNotNull("Should have received a message by now.", msg.get());
+ assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
+ }
+ connection.close();
+ }
+
+ @Test(timeout=30000)
+ public void testTopicConsumerAsync() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+ TopicImpl topic = new TopicImpl("topic://"+name);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Message> received = new AtomicReference<Message>();
+
+ Connection connection = createConnection();
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(topic);
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ received.set(message);
+ latch.countDown();
+ }
+ });
+
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ connection.start();
+
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+ producer.send(message);
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertNotNull("Should have received a message by now.", received.get());
+ assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
+ }
+ connection.close();
+ }
+
+ @Test(timeout=45000)
+ public void testTopicConsumerSync() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+ TopicImpl topic = new TopicImpl("topic://"+name);
+
+ Connection connection = createConnection();
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = session.createConsumer(topic);
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ connection.start();
+
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+ producer.send(message);
+
+ final AtomicReference<Message> msg = new AtomicReference<Message>();
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ msg.set(consumer.receiveNoWait());
+ return msg.get() != null;
+ }
+ }));
+
+ assertNotNull("Should have received a message by now.", msg.get());
+ assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
+ }
+ connection.close();
}
private Connection createConnection() throws JMSException {
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
final Connection connection = factory.createConnection();
+ connection.setClientID(name.toString());
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {