You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/02/24 17:56:16 UTC

svn commit: r915866 - /qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Author: robbie
Date: Wed Feb 24 16:56:16 2010
New Revision: 915866

URL: http://svn.apache.org/viewvc?rev=915866&view=rev
Log:
QPID-2417: update the auto-ack tests to leave an unacked message on the durable subscription's queue between disconnect and reconnect, create consumer1 on the correct connection in the SessionPerConnection tests, and add a single-connection NO_ACK test.

Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=915866&r1=915865&r2=915866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Wed Feb 24 16:56:16 2010
@@ -115,6 +115,11 @@
         _logger.info("Close connection");
         con.close();
     }
+    
+    public void testDurabilityNOACK() throws Exception
+    {
+        durabilityImpl(AMQSession.NO_ACKNOWLEDGE);
+    }
 
     public void testDurabilityAUTOACK() throws Exception
     {
@@ -138,49 +143,78 @@
         Session session1 = con.createSession(false, ackMode);
         MessageConsumer consumer1 = session1.createConsumer(topic);
 
-        Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session sessionProd = con.createSession(false, ackMode);
         MessageProducer producer = sessionProd.createProducer(topic);
 
-        Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session session2 = con.createSession(false, ackMode);
         TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
 
         con.start();
 
+        //send message A and check both consumers receive
         producer.send(session1.createTextMessage("A"));
 
         Message msg;
+        _logger.info("Receive message on consumer 1 :expecting A");
         msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
         msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
+        _logger.info("Receive message on consumer 2 :expecting A");
         msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
         msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        consumer2.close();
-        session2.close();
-
+        //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK)
         producer.send(session1.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
         msg = consumer1.receive(500);
         assertNotNull("Consumer 1 should get message 'B'.", msg);
-        assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
+        assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
         msg = consumer1.receive(500);
         assertNull("There should be no more messages for consumption on consumer1.", msg);
 
+        consumer2.close();
+        session2.close();
+        
+        //Send message C, then connect consumer 3 to durable subscription and get
+        //message B if not using NO_ACK, then receive C with consumer 1 and 3
+        producer.send(session1.createTextMessage("C"));
+
         Session session3 = con.createSession(false, ackMode);
         MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
 
-        _logger.info("Receive message on consumer 3 :expecting B");
+        if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+        {
+            //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+            //when we didn't call receive() to get it before closing consumer 2
+        }
+        else
+        {
+            _logger.info("Receive message on consumer 3 :expecting B");
+            msg = consumer3.receive(500);
+            assertNotNull("Consumer 3 should get message 'B'.", msg);
+            assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
+        }
+
+        _logger.info("Receive message on consumer 1 :expecting C");
+        msg = consumer1.receive(500);
+        assertNotNull("Consumer 1 should get message 'C'.", msg);
+        assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting null");
+        msg = consumer1.receive(500);
+        assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+        _logger.info("Receive message on consumer 3 :expecting C");
         msg = consumer3.receive(500);
-        assertNotNull("Consumer 3 should get message 'B'.", msg);
-        assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+        assertNotNull("Consumer 3 should get message 'C'.", msg);
+        assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 3 :expecting null");
         msg = consumer3.receive(500);
         assertNull("There should be no more messages for consumption on consumer3.", msg);
@@ -211,7 +245,7 @@
         con1.start();
         Session session1 = con1.createSession(false, ackMode);
 
-        MessageConsumer consumer1 = session0.createConsumer(topic);
+        MessageConsumer consumer1 = session1.createConsumer(topic);
 
         // Create consumer 2.
         AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
@@ -232,37 +266,60 @@
         msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should have been received",msg);
         assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
-        msg = consumer2.receive(500);
+        msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull("There should be no more messages for consumption on consumer2.", msg);
 
+        // Send message and receive on consumer 1.
+        producer.send(session0.createTextMessage("B"));
+
+        _logger.info("Receive message on consumer 1 :expecting B");
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertEquals("B", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting null");
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
+        assertEquals(null, msg);
+        
         // Detach the durable subscriber.
         consumer2.close();
         session2.close();
         con2.close();
+        
+        // Send message C and receive on consumer 1
+        producer.send(session0.createTextMessage("C"));
 
-        // Send message and receive on open consumer.
-        producer.send(session0.createTextMessage("B"));
-
-        _logger.info("Receive message on consumer 1 :expecting B");
-        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
-        assertEquals("B", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting C");
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertEquals("C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
         msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
+        // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK)
+        // and also gets message C sent after it was disconnected.
         AMQConnection con3 = (AMQConnection) getConnection("guest", "guest");
         con3.start();
         Session session3 = con3.createSession(false, ackMode);
 
         TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
 
-        _logger.info("Receive message on consumer 3 :expecting B");
-        msg = consumer3.receive(500);
-        assertNotNull("Consumer 3 should get message 'B'.", msg);
-        assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+        if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+        {
+            //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+            //when we didn't call receive() to get it before closing consumer 2
+        }
+        else
+        {
+            _logger.info("Receive message on consumer 3 :expecting B");
+            msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+            assertEquals("B", ((TextMessage) msg).getText());
+        }
+        
+        _logger.info("Receive message on consumer 3 :expecting C");
+        msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Consumer 3 should get message 'C'.", msg);
+        assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 3 :expecting null");
-        msg = consumer3.receive(500);
+        msg = consumer3.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull("There should be no more messages for consumption on consumer3.", msg);
 
         consumer1.close();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org