You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2008/07/31 15:38:01 UTC
svn commit: r681367 -
/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
Author: arnaudsimon
Date: Thu Jul 31 06:38:00 2008
New Revision: 681367
URL: http://svn.apache.org/viewvc?rev=681367&view=rev
Log:
qpid-1163: Added test for qpid-1163 (Note: I have checked that this test did not pass before r673074)
Modified:
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=681367&r1=681366&r2=681367&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Thu Jul 31 06:38:00 2008
@@ -25,13 +25,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* This class tests a number of commits and roll back scenarios
@@ -503,4 +499,74 @@
_session.commit();
}
+ /**
+ * Qpid-1163
+ * Check that when commt is called inside onMessage then
+ * the last message is nor redelivered.
+ *
+ * @throws Exception
+ */
+ public void testCommitWhithinOnMessage() throws Exception
+ {
+ Queue queue = (Queue) getInitialContext().lookup("queue");
+ // create a consumer
+ MessageConsumer cons = _session.createConsumer(queue);
+ MessageProducer prod = _session.createProducer(queue);
+ Message message = _session.createTextMessage("Message");
+ message.setJMSCorrelationID("m1");
+ prod.send(message);
+ _session.commit();
+ _logger.info("Sent message to queue");
+ CountDownLatch cd = new CountDownLatch(1);
+ cons.setMessageListener(new CommitWhithinOnMessageListener(cd));
+ conn.start();
+ cd.await(30, TimeUnit.SECONDS);
+ if( cd.getCount() > 0 )
+ {
+ fail("Did not received message");
+ }
+ // Check that the message has been dequeued
+ _session.close();
+ conn.close();
+ conn = (AMQConnection) getConnection("guest", "guest");
+ conn.start();
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ cons = session.createConsumer(queue);
+ message = cons.receiveNoWait();
+ if(message != null)
+ {
+ if(message.getJMSCorrelationID().equals("m1"))
+ {
+ fail("received message twice");
+ }
+ else
+ {
+ fail("queue should have been empty, received message: " + message);
+ }
+ }
+ }
+
+ private class CommitWhithinOnMessageListener implements MessageListener
+ {
+ private CountDownLatch _cd;
+ private CommitWhithinOnMessageListener(CountDownLatch cd)
+ {
+ _cd = cd;
+ }
+ public void onMessage(Message message)
+ {
+ try
+ {
+ _logger.info("received message " + message);
+ assertEquals("Wrong message received", message.getJMSCorrelationID(), "m1");
+ _logger.info("commit session");
+ _session.commit();
+ _cd.countDown();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
}