You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2008/07/31 15:38:01 UTC

svn commit: r681367 - /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Author: arnaudsimon
Date: Thu Jul 31 06:38:00 2008
New Revision: 681367

URL: http://svn.apache.org/viewvc?rev=681367&view=rev
Log:
qpid-1163: Added test for qpid-1163 (Note: I have checked that this test did not pass before r673074)

Modified:
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=681367&r1=681366&r2=681367&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Thu Jul 31 06:38:00 2008
@@ -25,13 +25,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This class tests a number of commits and roll back scenarios
@@ -503,4 +499,74 @@
         _session.commit();
     }
 
+    /**
+     * Qpid-1163
+     * Check that when commt is called inside onMessage then
+     * the last message is nor redelivered. 
+     *
+     * @throws Exception
+     */
+    public void testCommitWhithinOnMessage() throws Exception
+    {
+        Queue queue = (Queue) getInitialContext().lookup("queue");
+        // create a consumer
+             MessageConsumer cons = _session.createConsumer(queue);           
+        MessageProducer prod = _session.createProducer(queue);
+        Message message =  _session.createTextMessage("Message");
+        message.setJMSCorrelationID("m1");
+        prod.send(message);
+        _session.commit();
+        _logger.info("Sent message to queue");
+        CountDownLatch cd = new CountDownLatch(1);
+        cons.setMessageListener(new CommitWhithinOnMessageListener(cd));
+        conn.start();
+        cd.await(30, TimeUnit.SECONDS);
+        if( cd.getCount() > 0 )
+        {
+           fail("Did not received message");
+        }
+        // Check that the message has been dequeued
+        _session.close();
+        conn.close();
+        conn = (AMQConnection) getConnection("guest", "guest");
+        conn.start();
+        Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        cons = session.createConsumer(queue);        
+        message = cons.receiveNoWait();
+        if(message != null)
+        {
+            if(message.getJMSCorrelationID().equals("m1"))
+            {
+                fail("received message twice");
+            }
+            else
+            {
+                fail("queue should have been empty, received message: " + message);
+            }
+        }
+    }
+
+    private class CommitWhithinOnMessageListener implements MessageListener
+    {
+        private CountDownLatch _cd;
+        private CommitWhithinOnMessageListener(CountDownLatch cd)
+        {
+            _cd = cd;
+        }
+        public void onMessage(Message message)
+        {
+            try
+            {
+                _logger.info("received message " + message);
+                assertEquals("Wrong message received", message.getJMSCorrelationID(), "m1");
+                _logger.info("commit session");
+                _session.commit();
+                _cd.countDown();
+            }
+            catch (JMSException e)
+            {
+                e.printStackTrace();
+            }
+        }
+    }
 }