You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/04/29 20:06:35 UTC
svn commit: r939410 -
/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
Author: rajith
Date: Thu Apr 29 18:06:35 2010
New Revision: 939410
URL: http://svn.apache.org/viewvc?rev=939410&view=rev
Log:
QPID-2471
Added two test cases to verify ordering while using recover with sync and async consumer.
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java?rev=939410&r1=939409&r2=939410&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java Thu Apr 29 18:06:35 2010
@@ -27,6 +27,9 @@ import org.apache.qpid.test.utils.Failov
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -34,6 +37,9 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class RecoverTest extends FailoverBaseCase
@@ -90,6 +96,7 @@ public class RecoverTest extends Failove
while (index != SENT_COUNT)
{
message = _consumer.receive(3000);
+ assertNotNull(message);
assertEquals(index++, message.getIntProperty(INDEX));
}
@@ -142,9 +149,11 @@ public class RecoverTest extends Failove
_consumerSession.recover();
Message message2 = _consumer.receive(3000);
+ assertNotNull(message2);
assertEquals(2, message2.getIntProperty(INDEX));
Message message3 = _consumer.receive(3000);
+ assertNotNull(message3);
assertEquals(3, message3.getIntProperty(INDEX));
_logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
@@ -155,6 +164,7 @@ public class RecoverTest extends Failove
_consumerSession.recover();
message3 = _consumer.receive(3000);
+ assertNotNull(message3);
assertEquals(3, message3.getIntProperty(INDEX));
((org.apache.qpid.jms.Message) message3).acknowledgeThis();
@@ -194,7 +204,6 @@ public class RecoverTest extends Failove
assertEquals("msg2", tm2.getText());
tm2.acknowledge();
-
consumerSession.recover();
TextMessage tm1 = (TextMessage) consumer.receive(2000);
@@ -303,4 +312,134 @@ public class RecoverTest extends Failove
{
_error = e;
}
+
+ private void sendMessages(javax.jms.Session session,Destination dest,int count) throws Exception
+ {
+ MessageProducer prod = session.createProducer(dest);
+ for (int i=0; i<count; i++)
+ {
+ prod.send(session.createTextMessage("Msg" + i));
+ }
+ prod.close();
+ }
+
+ /**
+ * Test strategy
+ * Send 8 messages to a topic.
+ * The consumer will call recover until it sees a message 5 times,
+ * at which point it will ack that message.
+ * It will continue the above until it acks all the messages.
+ * While doing so it will verify that the messages are not
+ * delivered out of order.
+ */
+ public void testOderingWithSyncConsumer() throws Exception
+ {
+ Connection con = (Connection) getConnection("guest", "guest");
+ javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Destination topic = session.createTopic("myTopic");
+ MessageConsumer cons = session.createConsumer(topic);
+
+ sendMessages(session,topic,8);
+ con.start();
+
+ int messageSeen = 0;
+ int expectedMsg = 0;
+
+ while(expectedMsg < 8)
+ {
+ Message message = cons.receive();
+ String text=((TextMessage) message).getText();
+
+ assertEquals("Received Message Out Of Order","Msg"+expectedMsg,text);
+
+ //don't ack the message until we receive it 5 times
+ if( messageSeen < 5 )
+ {
+ _logger.debug("Ignoring message " + text + " and calling recover");
+ session.recover();
+ messageSeen++;
+ }
+ else
+ {
+ messageSeen = 0;
+ expectedMsg++;
+ message.acknowledge();
+ _logger.debug("Acknowledging message " + text);
+ }
+ }
+ }
+
+ /**
+ * Test strategy
+ * Same as testOderingWithSyncConsumer but using a
+ * Message Listener instead of a sync receive().
+ */
+ public void testOderingWithAsyncConsumer() throws Exception
+ {
+ Connection con = (Connection) getConnection("guest", "guest");
+ final javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Destination topic = session.createTopic("myTopic");
+ MessageConsumer cons = session.createConsumer(topic);
+
+ sendMessages(session,topic,8);
+ con.start();
+
+ final Object lock = new Object();
+ final AtomicBoolean pass = new AtomicBoolean(false); //used as work around for 'final'
+ cons.setMessageListener(new MessageListener()
+ {
+ int messageSeen = 0;
+ int expectedMsg = 0;
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ String text = ((TextMessage) message).getText();
+ assertEquals("Received Message Out Of Order","Msg"+expectedMsg,text);
+
+ //don't ack the message until we receive it 5 times
+ if( messageSeen < 5 )
+ {
+ _logger.debug("Ignoring message " + text + " and calling recover");
+ session.recover();
+ messageSeen++;
+ }
+ else
+ {
+ messageSeen = 0;
+ expectedMsg++;
+ message.acknowledge();
+ _logger.debug("Acknowledging message " + text);
+ if (expectedMsg == 8)
+ {
+ pass.set(true);
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ fail("Exception : " + e.getMessage());
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ }
+ });
+
+ synchronized(lock)
+ {
+ lock.wait(5000);
+ }
+
+ if (!pass.get())
+ {
+ fail("Test did not complete on time. Please check the logs");
+ }
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org