You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/11 22:58:12 UTC

git commit: Some additional JMS Tests focused on Topics. Useful when updating the AMQP JMS Client version, as they show some new problems in the latest SNAPSHOT builds.

Updated Branches:
  refs/heads/trunk 78c4e4337 -> e1e8c5b08


Some additional JMS Tests focused on Topics.  Useful when updating the
AMQP JMS Client version, as they show some new problems in the latest
SNAPSHOT builds.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e1e8c5b0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e1e8c5b0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e1e8c5b0

Branch: refs/heads/trunk
Commit: e1e8c5b083e0c9f6e6c6f4dd25d35ad039fc4ca1
Parents: 78c4e43
Author: Timothy Bish <ta...@gmai.com>
Authored: Wed Dec 11 16:58:08 2013 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Wed Dec 11 16:58:08 2013 -0500

----------------------------------------------------------------------
 .../activemq/transport/amqp/JMSClientTest.java  | 182 +++++++++++++++++--
 1 file changed, 168 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e1e8c5b0/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index e35127d..ebed8d6 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -31,6 +33,7 @@ import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
@@ -38,8 +41,10 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
+import org.apache.activemq.util.Wait;
 import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
 import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -347,26 +352,175 @@ public class JMSClientTest extends AmqpTestSupport {
 
     @Test(timeout=30000)
     public void testTTL() throws Exception {
-        QueueImpl queue = new QueueImpl("queue://" + name);
+        Connection connection = null;
+        try {
+            QueueImpl queue = new QueueImpl("queue://" + name);
+            connection = createConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+            MessageProducer producer = session.createProducer(queue);
+            producer.setTimeToLive(1000);
+            Message toSend = session.createTextMessage("Sample text");
+            producer.send(toSend);
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message received = consumer.receive(5000);
+            assertNotNull(received);
+            LOG.info("Message JMSExpiration = {}", received.getJMSExpiration());
+            producer.setTimeToLive(100);
+            producer.send(toSend);
+            TimeUnit.SECONDS.sleep(2);
+            received = consumer.receive(5000);
+            if (received != null) {
+                LOG.info("Message JMSExpiration = {} JMSTimeStamp = {} TTL = {}",
+                         new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(),
+                                        received.getJMSExpiration() - received.getJMSTimestamp()});
+            }
+            assertNull(received);
+        } finally {
+            connection.close();
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testDurableConsumerAsync() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        TopicImpl topic = new TopicImpl("topic://"+name);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<Message> received = new AtomicReference<Message>();
+
         Connection connection = createConnection();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        connection.start();
-        MessageProducer producer = session.createProducer(queue);
-        producer.setTimeToLive(1000);
-        Message toSend = session.createTextMessage("Sample text");
-        producer.send(toSend);
-        MessageConsumer consumer = session.createConsumer(queue);
-        Message received = consumer.receive(5000);
-        assertNotNull(received);
-        producer.setTimeToLive(100);
-        producer.send(toSend);
-        TimeUnit.SECONDS.sleep(1);
-        assertNull(consumer.receive(5000));
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    received.set(message);
+                    latch.countDown();
+                }
+            });
+
+            MessageProducer producer = session.createProducer(topic);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            TextMessage message = session.createTextMessage();
+            message.setText("hello");
+            producer.send(message);
+
+            assertTrue(latch.await(10, TimeUnit.SECONDS));
+            assertNotNull("Should have received a message by now.", received.get());
+            assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
+        }
+        connection.close();
+    }
+
+    @Test(timeout=30000)
+    public void testDurableConsumerSync() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        TopicImpl topic = new TopicImpl("topic://"+name);
+
+        Connection connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
+            MessageProducer producer = session.createProducer(topic);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            TextMessage message = session.createTextMessage();
+            message.setText("hello");
+            producer.send(message);
+
+            final AtomicReference<Message> msg = new AtomicReference<Message>();
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    msg.set(consumer.receiveNoWait());
+                    return msg.get() != null;
+                }
+            }));
+
+            assertNotNull("Should have received a message by now.", msg.get());
+            assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
+        }
+        connection.close();
+    }
+
+    @Test(timeout=30000)
+    public void testTopicConsumerAsync() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        TopicImpl topic = new TopicImpl("topic://"+name);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<Message> received = new AtomicReference<Message>();
+
+        Connection connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(topic);
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    received.set(message);
+                    latch.countDown();
+                }
+            });
+
+            MessageProducer producer = session.createProducer(topic);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            TextMessage message = session.createTextMessage();
+            message.setText("hello");
+            producer.send(message);
+
+            assertTrue(latch.await(10, TimeUnit.SECONDS));
+            assertNotNull("Should have received a message by now.", received.get());
+            assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
+        }
+        connection.close();
+    }
+
+    @Test(timeout=45000)
+    public void testTopicConsumerSync() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        TopicImpl topic = new TopicImpl("topic://"+name);
+
+        Connection connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = session.createConsumer(topic);
+            MessageProducer producer = session.createProducer(topic);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            TextMessage message = session.createTextMessage();
+            message.setText("hello");
+            producer.send(message);
+
+            final AtomicReference<Message> msg = new AtomicReference<Message>();
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    msg.set(consumer.receiveNoWait());
+                    return msg.get() != null;
+                }
+            }));
+
+            assertNotNull("Should have received a message by now.", msg.get());
+            assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
+        }
+        connection.close();
     }
 
     private Connection createConnection() throws JMSException {
         final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
         final Connection connection = factory.createConnection();
+        connection.setClientID(name.toString());
         connection.setExceptionListener(new ExceptionListener() {
             @Override
             public void onException(JMSException exception) {