You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/02/24 17:56:16 UTC
svn commit: r915866 -
/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
Author: robbie
Date: Wed Feb 24 16:56:16 2010
New Revision: 915866
URL: http://svn.apache.org/viewvc?rev=915866&view=rev
Log:
QPID-2417: update the auto-ack tests to leave an unacked message on the durable subscription's queue between disconnect and reconnect; create consumer1 on the correct connection in the SessionPerConnection tests; add a single-connection NO_ACK test.
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=915866&r1=915865&r2=915866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Wed Feb 24 16:56:16 2010
@@ -115,6 +115,11 @@
_logger.info("Close connection");
con.close();
}
+
+ public void testDurabilityNOACK() throws Exception
+ {
+ durabilityImpl(AMQSession.NO_ACKNOWLEDGE);
+ }
public void testDurabilityAUTOACK() throws Exception
{
@@ -138,49 +143,78 @@
Session session1 = con.createSession(false, ackMode);
MessageConsumer consumer1 = session1.createConsumer(topic);
- Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session sessionProd = con.createSession(false, ackMode);
MessageProducer producer = sessionProd.createProducer(topic);
- Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session2 = con.createSession(false, ackMode);
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
con.start();
+ //send message A and check both consumers receive
producer.send(session1.createTextMessage("A"));
Message msg;
+ _logger.info("Receive message on consumer 1 :expecting A");
msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
+ _logger.info("Receive message on consumer 2 :expecting A");
msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- consumer2.close();
- session2.close();
-
+ //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK)
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(500);
assertNotNull("Consumer 1 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
+ assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(500);
assertNull("There should be no more messages for consumption on consumer1.", msg);
+ consumer2.close();
+ session2.close();
+
+ //Send message C, then connect consumer 3 to durable subscription and get
+ //message B if not using NO_ACK, then receive C with consumer 1 and 3
+ producer.send(session1.createTextMessage("C"));
+
Session session3 = con.createSession(false, ackMode);
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
- _logger.info("Receive message on consumer 3 :expecting B");
+ if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+ {
+ //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+ //when we didn't call receive() to get it before closing consumer 2
+ }
+ else
+ {
+ _logger.info("Receive message on consumer 3 :expecting B");
+ msg = consumer3.receive(500);
+ assertNotNull("Consumer 3 should get message 'B'.", msg);
+ assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
+ }
+
+ _logger.info("Receive message on consumer 1 :expecting C");
+ msg = consumer1.receive(500);
+ assertNotNull("Consumer 1 should get message 'C'.", msg);
+ assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
+ msg = consumer1.receive(500);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+ _logger.info("Receive message on consumer 3 :expecting C");
msg = consumer3.receive(500);
- assertNotNull("Consumer 3 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+ assertNotNull("Consumer 3 should get message 'C'.", msg);
+ assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
msg = consumer3.receive(500);
assertNull("There should be no more messages for consumption on consumer3.", msg);
@@ -211,7 +245,7 @@
con1.start();
Session session1 = con1.createSession(false, ackMode);
- MessageConsumer consumer1 = session0.createConsumer(topic);
+ MessageConsumer consumer1 = session1.createConsumer(topic);
// Create consumer 2.
AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
@@ -232,37 +266,60 @@
msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should have been received",msg);
assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
- msg = consumer2.receive(500);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull("There should be no more messages for consumption on consumer2.", msg);
+ // Send message and receive on consumer 1.
+ producer.send(session0.createTextMessage("B"));
+
+ _logger.info("Receive message on consumer 1 :expecting B");
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertEquals(null, msg);
+
// Detach the durable subscriber.
consumer2.close();
session2.close();
con2.close();
+
+ // Send message C and receive on consumer 1
+ producer.send(session0.createTextMessage("C"));
- // Send message and receive on open consumer.
- producer.send(session0.createTextMessage("B"));
-
- _logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting C");
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertEquals("C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
+ // Re-attach a new consumer to the durable subscription, and check that it gets message B that was left behind (if not NO_ACK)
+ // and also gets message C sent after it was disconnected.
AMQConnection con3 = (AMQConnection) getConnection("guest", "guest");
con3.start();
Session session3 = con3.createSession(false, ackMode);
TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
- _logger.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(500);
- assertNotNull("Consumer 3 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+ if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+ {
+ //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+ //when we didn't call receive() to get it before closing consumer 2
+ }
+ else
+ {
+ _logger.info("Receive message on consumer 3 :expecting B");
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertEquals("B", ((TextMessage) msg).getText());
+ }
+
+ _logger.info("Receive message on consumer 3 :expecting C");
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Consumer 3 should get message 'C'.", msg);
+ assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
- msg = consumer3.receive(500);
+ msg = consumer3.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull("There should be no more messages for consumption on consumer3.", msg);
consumer1.close();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org