You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/04/29 20:06:35 UTC

svn commit: r939410 - /qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java

Author: rajith
Date: Thu Apr 29 18:06:35 2010
New Revision: 939410

URL: http://svn.apache.org/viewvc?rev=939410&view=rev
Log:
QPID-2471
Added two test cases to verify ordering while using recover with sync and async consumer.

Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java?rev=939410&r1=939409&r2=939410&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java Thu Apr 29 18:06:35 2010
@@ -27,6 +27,9 @@ import org.apache.qpid.test.utils.Failov
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -34,6 +37,9 @@ import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.TextMessage;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class RecoverTest extends FailoverBaseCase
@@ -90,6 +96,7 @@ public class RecoverTest extends Failove
         while (index != SENT_COUNT)
         {
             message =  _consumer.receive(3000);
+            assertNotNull(message);
             assertEquals(index++, message.getIntProperty(INDEX));
         }
 
@@ -142,9 +149,11 @@ public class RecoverTest extends Failove
         _consumerSession.recover();
 
         Message message2 = _consumer.receive(3000);
+        assertNotNull(message2);
         assertEquals(2, message2.getIntProperty(INDEX));
 
         Message message3 = _consumer.receive(3000);
+        assertNotNull(message3);
         assertEquals(3, message3.getIntProperty(INDEX));
 
         _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
@@ -155,6 +164,7 @@ public class RecoverTest extends Failove
         _consumerSession.recover();
 
         message3 = _consumer.receive(3000);
+        assertNotNull(message3);
         assertEquals(3, message3.getIntProperty(INDEX));
         ((org.apache.qpid.jms.Message) message3).acknowledgeThis();
 
@@ -194,7 +204,6 @@ public class RecoverTest extends Failove
         assertEquals("msg2", tm2.getText());
 
         tm2.acknowledge();
-
         consumerSession.recover();
 
         TextMessage tm1 = (TextMessage) consumer.receive(2000);
@@ -303,4 +312,183 @@ public class RecoverTest extends Failove
     {
         _error = e;
     }
+
+    /**
+     * Sends <code>count</code> text messages to the given destination, in order,
+     * with bodies "Msg0", "Msg1", ... using a producer created from the session.
+     *
+     * @param session session used to create both the producer and the messages
+     * @param dest destination the messages are sent to
+     * @param count number of messages to send
+     * @throws Exception if creating the producer, sending, or closing it fails
+     */
+    private void sendMessages(javax.jms.Session session,Destination dest,int count) throws Exception
+    {
+        MessageProducer prod = session.createProducer(dest);
+        try
+        {
+            for (int i=0; i<count; i++)
+            {
+                prod.send(session.createTextMessage("Msg" + i));
+            }
+        }
+        finally
+        {
+            // close the producer even when a send fails part-way through
+            prod.close();
+        }
+    }
+
+    /**
+     * Test strategy
+     * Send 8 messages to a topic.
+     * For every message the consumer calls recover on each of the first
+     * 5 deliveries and acknowledges the message on the 6th delivery.
+     * It will continue the above until it acks all the messages.
+     * While doing so it will verify that the messages are not
+     * delivered out of order.
+     */
+    public void testOderingWithSyncConsumer() throws Exception
+    {
+        Connection con = (Connection) getConnection("guest", "guest");
+        javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Destination topic = session.createTopic("myTopic");
+        MessageConsumer cons = session.createConsumer(topic);
+
+        sendMessages(session,topic,8);
+        con.start();
+
+        int messageSeen = 0;
+        int expectedMsg = 0;
+
+        while(expectedMsg < 8)
+        {
+            // bounded receive: a missing (re)delivery fails the test instead of blocking forever
+            Message message = cons.receive(3000);
+            assertNotNull("Timed out waiting for Msg" + expectedMsg, message);
+            String text=((TextMessage) message).getText();
+
+            assertEquals("Received Message Out Of Order","Msg"+expectedMsg,text);
+
+            //recover on the first 5 deliveries of a message, ack it on the 6th
+            if( messageSeen < 5 )
+            {
+                _logger.debug("Ignoring message " + text + " and calling recover");
+                session.recover();
+                messageSeen++;
+            }
+            else
+            {
+                messageSeen = 0;
+                expectedMsg++;
+                message.acknowledge();
+                _logger.debug("Acknowledging message " + text);
+            }
+        }
+    }
+
+    /**
+     * Test strategy
+     * Same as testOderingWithSyncConsumer but using a
+     * Message Listener instead of a sync receive().
+     * Anything thrown inside the listener (including assertion failures)
+     * is recorded and rethrown on the test thread, because a throwable
+     * raised on the listener's own thread would not fail this test.
+     */
+    public void testOderingWithAsyncConsumer() throws Exception
+    {
+        Connection con = (Connection) getConnection("guest", "guest");
+        final javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Destination topic = session.createTopic("myTopic");
+        MessageConsumer cons = session.createConsumer(topic);
+
+        sendMessages(session,topic,8);
+        con.start();
+
+        final Object lock = new Object();
+        final AtomicBoolean pass = new AtomicBoolean(false); //used as work around for 'final'
+        final Throwable[] failure = new Throwable[1]; //first problem hit by the listener, guarded by lock
+        cons.setMessageListener(new MessageListener()
+        {
+            int messageSeen = 0;
+            int expectedMsg = 0;
+
+            public void onMessage(Message message)
+            {
+                try
+                {
+                    String text = ((TextMessage) message).getText();
+                    assertEquals("Received Message Out Of Order","Msg"+expectedMsg,text);
+
+                    //recover on the first 5 deliveries of a message, ack it on the 6th
+                    if( messageSeen < 5 )
+                    {
+                        _logger.debug("Ignoring message " + text + " and calling recover");
+                        session.recover();
+                        messageSeen++;
+                    }
+                    else
+                    {
+                        messageSeen = 0;
+                        expectedMsg++;
+                        message.acknowledge();
+                        _logger.debug("Acknowledging message " + text);
+                        if (expectedMsg == 8)
+                        {
+                            pass.set(true);
+                            synchronized (lock)
+                            {
+                                lock.notifyAll();
+                            }
+                        }
+                    }
+                }
+                catch (Throwable t)
+                {
+                    //deliberately broad: assertion failures are Errors, and whatever goes wrong
+                    //here has to reach the waiting test thread instead of being lost
+                    synchronized (lock)
+                    {
+                        if (failure[0] == null)
+                        {
+                            failure[0] = t;
+                        }
+                        lock.notifyAll();
+                    }
+                }
+            }
+        });
+
+        long deadline = System.currentTimeMillis() + 5000;
+        Throwable listenerFailure;
+        synchronized(lock)
+        {
+            //re-check the condition in a loop so a spurious wake-up cannot end the wait early
+            long remaining = deadline - System.currentTimeMillis();
+            while (!pass.get() && failure[0] == null && remaining > 0)
+            {
+                lock.wait(remaining);
+                remaining = deadline - System.currentTimeMillis();
+            }
+            listenerFailure = failure[0];
+        }
+
+        if (listenerFailure instanceof Error)
+        {
+            throw (Error) listenerFailure;
+        }
+        else if (listenerFailure instanceof Exception)
+        {
+            throw (Exception) listenerFailure;
+        }
+        else if (listenerFailure != null)
+        {
+            fail("Message listener failed : " + listenerFailure);
+        }
+
+        if (!pass.get())
+        {
+            fail("Test did not complete on time. Please check the logs");
+        }
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org