You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/02/26 15:37:14 UTC

svn commit: r916692 - in /qpid/branches/0.5.x-dev/qpid/java: ./ systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java

Author: robbie
Date: Fri Feb 26 14:37:14 2010
New Revision: 916692

URL: http://svn.apache.org/viewvc?rev=916692&view=rev
Log:
QPID-2417: add TTL testing of durable topic subscription

merged from trunk r915867

Modified:
    qpid/branches/0.5.x-dev/qpid/java/   (props changed)
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java

Propchange: qpid/branches/0.5.x-dev/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 26 14:37:14 2010
@@ -1,2 +1,2 @@
 /qpid/branches/java-broker-0-10/qpid/java:829414,829575
-/qpid/trunk/qpid/java:835115,884634-884635,884838,885765,887948,887950-887952,887994,888246,888248,888250,888345,888348,889645,891323-891332,892228,896674,896692-896693,900919,900943,902231,907851,915866
+/qpid/trunk/qpid/java:835115,884634-884635,884838,885765,887948,887950-887952,887994,888246,888248,888250,888345,888348,889645,891323-891332,892228,896674,896692-896693,900919,900943,902231,907851,915866-915867

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=916692&r1=916691&r2=916692&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Fri Feb 26 14:37:14 2010
@@ -28,12 +28,16 @@
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TopicSubscriber;
 
 import junit.framework.Assert;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 import java.util.concurrent.locks.ReentrantLock;
@@ -154,6 +158,7 @@
     {
         Message send = producerSession.createTextMessage("Message " + msg);
         send.setBooleanProperty("first", first);
+        send.setStringProperty("testprop", "TimeToLiveTest");
         send.setLongProperty("TTL", producer.getTimeToLive());
         return send;
     }
@@ -206,5 +211,155 @@
         producerSession.close();
         producerConnection.close();
     }
+    
+    public void testPassiveTTLwithDurableSubscription() throws Exception
+    {
+        // Create the consuming client; its connection is only started after the TTL wait further down.
+        Connection clientConnection = getConnection();
+        
+        Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        // Create the durable subscription (selector on "testprop") and close the subscriber straight away,
+        AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName());
+        TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false);
+        durableSubscriber.close();
+        // so the messages below are published while no subscriber is attached.
+        //Create Producer
+        Connection producerConnection = getConnection();
+
+        producerConnection.start();
+
+        // Move to a Transacted session to ensure that all messages have been delivered to broker before
+        // we start waiting for TTL
+        Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+        MessageProducer producer = producerSession.createProducer(topic);
+
+        // Send the first message before any TTL is set on the producer; its "TTL" property is asserted to be 0 below.
+        int msg = 0;
+        producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
+        // Every message sent in the loop carries TIME_TO_LIVE. msg is still 0 here, so "Message 0" is sent twice.
+        producer.setTimeToLive(TIME_TO_LIVE);
+
+        for (; msg < MSG_COUNT - 2; msg++)
+        {
+            producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+        }
+
+        // Reset TTL to 0 before sending the final message, which the assertions below expect to be received.
+        producer.setTimeToLive(0L);
+        producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+
+        producerSession.commit();
+        
+        // Resubscribe. NOTE(review): no selector is passed here, unlike the first subscribe - confirm the subscription is not recreated.
+        durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName());
+
+        // Wait for TIME_TO_LIVE: awaitNanos returns the time still remaining, so the loop resumes waiting after an early wake-up.
+        ReentrantLock waitLock = new ReentrantLock();
+        Condition wait = waitLock.newCondition();
+        final long MILLIS = 1000000L; // nanoseconds per millisecond (TIME_TO_LIVE is presumably in ms - it is passed to setTimeToLive)
+
+        long waitTime = TIME_TO_LIVE * MILLIS;
+        while (waitTime > 0)
+        {
+            try
+            {
+                waitLock.lock();
+
+                waitTime = wait.awaitNanos(waitTime);
+            }
+            catch (InterruptedException e)
+            {
+                //Stop if we are interrupted
+                fail(e.getMessage());
+            }
+            finally
+            {
+                waitLock.unlock();
+            }
+
+        }
+
+        clientConnection.start();
+
+        // Receive the messages that are still available after the wait.
+        // Allow 5s for each message we expect to receive and 1s for the third, which we expect to be absent.
+        Message receivedFirst = durableSubscriber.receive(5000);
+        Message receivedSecond = durableSubscriber.receive(5000);
+        Message receivedThird = durableSubscriber.receive(1000);
+        
+        // Log the messages to help diagnosis in case of failure
+        _logger.info("First:"+receivedFirst);
+        _logger.info("Second:"+receivedSecond);
+        _logger.info("Third:"+receivedThird);
+
+        // Only first and last messages sent should survive expiry
+        Assert.assertNull("More messages received", receivedThird); 
+
+        Assert.assertNotNull("First message not received", receivedFirst);
+        Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
+        Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL"));
+
+        Assert.assertNotNull("Final message not received", receivedSecond);
+        Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first"));
+        Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL"));
+
+        clientConnection.close();
+
+        producerConnection.close();
+    }
+
+    public void testActiveTTLwithDurableSubscription() throws Exception
+    {
+        // Create the subscribing client; this connection is never started, so nothing is consumed in this test.
+        Connection clientConnection = getConnection();
+        Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        // Create the durable subscription (selector on "testprop") and close the subscriber straight away
+        AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName());
+        TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false);
+        durableSubscriber.close();
+        
+        // Create a producer on the same topic; every message it sends carries a TTL of 1000 (ms per the JMS API)
+        Connection producerConnection = getConnection();
+        AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(topic);
+        producer.setTimeToLive(1000L);
+
+        // Send MSG_COUNT messages. NOTE(review): they do not set "testprop", which the subscription selector tests - confirm intent.
+        for(int i = 0; i < MSG_COUNT; i++)
+        {
+            producer.send(producerSession.createTextMessage("Message: "+i));
+        }
+        long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+
+        // Poll the queue depth every 100ms until it reaches 0 or failureTime passes without the depth changing.
+        long messageCount = MSG_COUNT;
+        long lastPass;
+        AMQQueue subcriptionQueue = new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription"); // presumably the queue backing the durable subscription - TODO confirm naming
+        do
+        {
+            lastPass = messageCount;
+            Thread.sleep(100);
+            messageCount = producerSession.getQueueDepth((AMQDestination) subcriptionQueue);
+
+            // If the queue depth changed during the last pass then extend the timeout time.
+            // if we get messages stuck that are not expiring then the failureTime will occur
+            // failing the test. This will help with the scenario when the broker does not
+            // have enough CPU cycles to process the TTLs.
+            if (lastPass != messageCount)
+            {
+                failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+            }
+        }
+        while(messageCount > 0L && System.currentTimeMillis() < failureTime);
+
+        assertEquals("Messages not automatically expired: ", 0L, messageCount);
+        // NOTE(review): clientConnection and clientSession are not closed anywhere in this method.
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+    }
 
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org