You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/10/05 17:04:58 UTC

svn commit: r821826 - in /qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack: AcknowledgeAfterFailoverOnMessageTest.java AcknowledgeAfterFailoverTest.java AcknowledgeOnMessageTest.java

Author: ritchiem
Date: Mon Oct  5 15:04:57 2009
New Revision: 821826

URL: http://svn.apache.org/viewvc?rev=821826&view=rev
Log:
QPID-1816 : Add testing for ack after failover, and testing for acknowledgement in a dirty session after failover.

Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java?rev=821826&r1=821825&r2=821826&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java Mon Oct  5 15:04:57 2009
@@ -20,17 +20,26 @@
  */
 package org.apache.qpid.test.unit.ack;
 
-import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
 
 import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.Message;
 import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-public class AcknowledgeAfterFailoverOnMessageTest  extends AcknowledgeOnMessageTest
+public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest implements ConnectionListener
 {
 
+    protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+    private MessageListener _listener = null;
+
     @Override
     public void setUp() throws Exception
     {
@@ -38,6 +47,27 @@
         NUM_MESSAGES = 10;
     }
 
+    /**
+     * Override default init to add connectionListener so we can verify that
+     * failover took place
+     *
+     * @param transacted create a transacted session for this test
+     * @param mode       if not transacted what ack mode to use for this test
+     *
+     * @throws Exception if a problem occured during test setup.
+     */
+    @Override
+    public void init(boolean transacted, int mode) throws Exception
+    {
+        super.init(transacted, mode);
+        ((AMQConnection) _connection).setConnectionListener(this);
+        // Override the listener for the dirtyAck testing.
+        if (_listener != null)
+        {
+            _consumer.setMessageListener(_listener);
+        }
+    }
+
     protected void prepBroker(int count) throws Exception
     {
         //Stop the connection whilst we repopulate the broker, or the no_ack
@@ -107,4 +137,220 @@
         }
 
     }
+
+    int msgCount = 0;
+    boolean cleaned = false;
+
+    class DirtyAckingHandler implements MessageListener
+    {
+        /**
+         * Validate first message but do nothing with it.
+         *
+         * Failover
+         *
+         * The receive the message again
+         *
+         * @param message
+         */
+        public void onMessage(Message message)
+        {
+            // Stop processing if we have an error and had to stop running.
+            if (_receviedAll.getCount() == 0)
+            {
+                _logger.debug("Dumping msgs due to error(" + _causeOfFailure.get().getMessage() + "):" + message);
+                return;
+            }
+
+            try
+            {
+                // Check we have the next message as expected
+                assertNotNull("Message " + msgCount + " not correctly received.", message);
+                assertEquals("Incorrect message received", msgCount, message.getIntProperty(INDEX));
+
+                if (msgCount == 0 && _failoverCompleted.getCount() != 0)
+                {
+                    // This is the first message we've received so lets fail the broker
+
+                    failBroker(getFailingPort());
+
+                    repopulateBroker();
+
+                    _logger.error("Received first msg so failing over");
+
+                    return;
+                }
+
+                msgCount++;
+
+                // Don't acknowlege the first message after failover so we can commit
+                // them together
+                if (msgCount == 1)
+                {
+                    _logger.error("Received first msg after failover ignoring:" + msgCount);
+
+                    // Acknowledge the first message if we are now on the cleaned pass
+                    if (cleaned)
+                    {
+                        _receviedAll.countDown();
+                    }
+
+                    return;
+                }
+
+                if (_consumerSession.getTransacted())
+                {
+                    try
+                    {
+                        _consumerSession.commit();
+                        if (!cleaned)
+                        {
+                            fail("Session is dirty we should get an TransactionRolledBackException");
+                        }
+                    }
+                    catch (TransactionRolledBackException trbe)
+                    {
+                        //expected path
+                    }
+                }
+                else
+                {
+                    try
+                    {
+                        message.acknowledge();
+                        if (!cleaned)
+                        {
+                            fail("Session is dirty we should get an IllegalStateException");
+                        }
+                    }
+                    catch (javax.jms.IllegalStateException ise)
+                    {
+                        assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage());
+                        // Recover the sesion and try again.
+                        _consumerSession.recover();
+                    }
+                }
+
+                // Acknowledge the last message if we are in a clean state
+                // this will then trigger test teardown.
+                if (cleaned)
+                {
+                    _receviedAll.countDown();
+                }
+
+                //Reset message count so we can try again.
+                msgCount = 0;
+                cleaned = true;
+            }
+            catch (Exception e)
+            {
+                // If something goes wrong stop and notifiy main thread.
+                fail(e);
+            }
+        }
+    }
+
+    /**
+     * Test that Acking/Committing a message received before failover causes
+     * an exception at commit/ack time.
+     *
+     * Expected behaviour is that in:
+     * * tx mode commit() throws a transacted RolledBackException
+     * * client ack mode throws an IllegalStateException
+     *
+     * @param transacted is this session trasacted
+     * @param mode       What ack mode should be used if not trasacted
+     *
+     * @throws Exception if something goes wrong.
+     */
+    protected void testDirtyAcking(boolean transacted, int mode) throws Exception
+    {
+        NUM_MESSAGES = 2;
+        _listener = new DirtyAckingHandler();
+
+        super.testAcking(transacted, mode);
+    }
+
+    public void testDirtyClientAck() throws Exception
+    {
+        testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    public void testDirtyAckingTransacted() throws Exception
+    {
+        testDirtyAcking(true, Session.SESSION_TRANSACTED);
+    }
+
+    private void repopulateBroker() throws Exception
+    {
+        // Repopulate this new broker so we can test what happends after failover
+
+        //Get the connection to the first (main port) broker.
+        Connection connection = getConnection();
+        // Use a transaction to send messages so we can be sure they arrive.
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        // ensure destination is created.
+        session.createConsumer(_queue).close();
+
+        sendMessage(session, _queue, NUM_MESSAGES);
+
+        assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+                     ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+
+        connection.close();
+    }
+
+    // AMQConnectionListener Interface.. used so we can validate that we
+    // actually failed over.
+
+    public void bytesSent(long count)
+    {
+    }
+
+    public void bytesReceived(long count)
+    {
+    }
+
+    public boolean preFailover(boolean redirect)
+    {
+        //Allow failover
+        return true;
+    }
+
+    public boolean preResubscribe()
+    {
+        //Allow failover
+        return true;
+    }
+
+    public void failoverComplete()
+    {
+        _failoverCompleted.countDown();
+    }
+
+    /**
+     * Override so we can block until failover has completd
+     *
+     * @param port
+     */
+    @Override
+    public void failBroker(int port)
+    {
+        super.failBroker(port);
+
+        try
+        {
+            if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+            {
+                // Use an exception so that we use our local fail() that notifies the main thread of failure
+                throw new Exception("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
+            }
+
+        }
+        catch (Exception e)
+        {
+            // Use an exception so that we use our local fail() that notifies the main thread of failure
+            fail(e);
+        }
+    }
+
 }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java?rev=821826&r1=821825&r2=821826&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java Mon Oct  5 15:04:57 2009
@@ -54,6 +54,14 @@
         NUM_MESSAGES = 10;
     }
 
+    /**
+     * Override default init to add connectionListener so we can verify that
+     * failover took place
+     *
+     * @param transacted create a transacted session for this test
+     * @param mode if not transacted what ack mode to use for this test
+     * @throws Exception if a problem occured during test setup.
+     */
     @Override
     protected void init(boolean transacted, int mode) throws Exception
     {
@@ -244,75 +252,6 @@
         testDirtyAcking(true, Session.SESSION_TRANSACTED);
     }
 
-    /**
-     * If a transacted session has failed over whilst it has uncommitted sent
-     * data then we need to throw a TransactedRolledbackException on commit()
-     *
-     * The alternative would be to maintain a replay buffer so that the message
-     * could be resent. This is not currently implemented
-     *
-     * @throws Exception if something goes wrong.
-     */
-    public void testDirtySendingTransacted() throws Exception
-    {
-        Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
-
-        // Ensure we get failover notifications
-        ((AMQConnection) _connection).setConnectionListener(this);        
-
-        MessageProducer producer = producerSession.createProducer(_queue);
-
-        // Create and send message 0
-        Message msg = producerSession.createMessage();
-        msg.setIntProperty(INDEX, 0);
-        producer.send(msg);
-
-        // DON'T commit message .. fail connection
-
-        failBroker(getFailingPort());
-
-        // Ensure destination exists for sending
-        producerSession.createConsumer(_queue).close();
-
-        // Send the next message
-        msg.setIntProperty(INDEX, 1);
-        try
-        {
-            producer.send(msg);
-            fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
-        }
-        catch (JMSException jmse)
-        {
-            assertEquals("Early warning of dirty session not correct",
-                         "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
-        }
-
-        // Ignore that the session is dirty and attempt to commit to validate the
-        // exception is thrown. AND that the above failure notification did NOT
-        // clean up the session.
-
-        try
-        {
-            producerSession.commit();
-            fail("Session is dirty we should get an TransactionRolledBackException");
-        }
-        catch (TransactionRolledBackException trbe)
-        {
-            // Normal path.
-        }
-
-        // Resend messages
-        msg.setIntProperty(INDEX, 0);
-        producer.send(msg);
-        msg.setIntProperty(INDEX, 1);
-        producer.send(msg);
-
-        producerSession.commit();
-
-        assertEquals("Wrong number of messages on queue", 2,
-                     ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue));
-    }
-
     // AMQConnectionListener Interface.. used so we can validate that we
     // actually failed over.
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java?rev=821826&r1=821825&r2=821826&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java Mon Oct  5 15:04:57 2009
@@ -32,14 +32,22 @@
 
 public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
 {
-    private CountDownLatch _receviedAll;
-    private AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
+    protected CountDownLatch _receviedAll;
+    protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
+    }
+
+    @Override
+    public void init(boolean transacted, int mode) throws Exception
+    {
         _receviedAll = new CountDownLatch(NUM_MESSAGES);
+
+        super.init(transacted, mode);
+        _consumer.setMessageListener(this);
     }
 
     /**
@@ -51,13 +59,22 @@
     protected void testAcking(boolean transacted, int mode) throws Exception
     {
         init(transacted, mode);
-        _consumer.setMessageListener(this);
 
         _connection.start();
 
         if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS))
         {
-            fail("All messages not received.");
+            // Check to see if we ended due to an exception in the onMessage handler
+            Exception cause = _causeOfFailure.get();
+            if (cause != null)
+            {
+                cause.printStackTrace();
+                fail(cause.getMessage());
+            }
+            else
+            {
+                fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES);
+            }
         }
 
         // Check to see if we ended due to an exception in the onMessage handler
@@ -102,11 +119,12 @@
 
     /**
      * Pass the given exception back to the waiting thread to fail the test run.
-     * @param e The exception that is causing the test to fail.  
+     *
+     * @param e The exception that is causing the test to fail.
      */
     protected void fail(Exception e)
     {
-       _causeOfFailure.set(e);
+        _causeOfFailure.set(e);
         // End the test.
         while (_receviedAll.getCount() != 0)
         {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org