You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/02/05 18:04:13 UTC

svn commit: r907004 - in /qpid/branches/0.5.x-dev/qpid/java: client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/unit/ack/

Author: ritchiem
Date: Fri Feb  5 17:04:13 2010
New Revision: 907004

URL: http://svn.apache.org/viewvc?rev=907004&view=rev
Log:
QPID-2346 : Addressed the problems with AcknowledgeAfterFailoverOnMessageTest. The issues were the same as in AckAfterFailoverTest, so the same prepBroker approach was used. The test also needed its timeout increased, as broker restarts took too long for a message to be sent/received. Finally, the last change was that the queue needed to be re-declared so that the final queue depth check would have a queue to query.

Modified:
    qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java

Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=907004&r1=907003&r2=907004&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Feb  5 17:04:13 2010
@@ -504,6 +504,12 @@
         {
             _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
         }
+        
+        // Add creation logging to tie in with the existing close logging
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Created session:" + this);
+        }
     }
 
     /**

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java?rev=907004&r1=907003&r2=907004&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java Fri Feb  5 17:04:13 2010
@@ -24,6 +24,7 @@
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.util.FileUtils;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -33,7 +34,25 @@
 import javax.jms.TransactionRolledBackException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.io.File;
 
+/**
+ * The AcknowledgeAfterFailoverOnMessageTest
+ *
+ * Extends the OnMessage AcknowledgeTests to validate that after the client has
+ * failed over that the client can still receive and ack messages.
+ *
+ * All the AcknowledgeTest ack modes are exercised here, though some are disabled
+ * due to known issues (e.g. DupsOk and AutoAck due to QPID-143, and clientAck
+ * and dirtyClientAck due to QPID-1816).
+ *
+ * This class has two main test structures: overrides of AcknowledgeOnMessageTest
+ * to perform the clean acking based on session ack mode, and a series of dirty
+ * ack tests that test what happens if you receive a message and then try to ack
+ * it AFTER you have failed over.
+ *
+ *
+ */
 public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest implements ConnectionListener
 {
 
@@ -68,61 +87,96 @@
         }
     }
 
-    protected void prepBroker(int count) throws Exception
-    {
-        //Stop the connection whilst we repopulate the broker, or the no_ack
-        // test will drain the msgs before we can check we put the right number
-        // back on again.
-//        _connection.stop();
-
-        Connection connection = getConnection();
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        // ensure destination is created.
-        session.createConsumer(_queue).close();
-
-        sendMessage(session, _queue, count, NUM_MESSAGES - count, 0);
-
-        if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
-        {
-            assertEquals("Wrong number of messages on queue", count,
-                         ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
-        }
-
-        connection.close();
-
-//        _connection.start();
-    }
-
-    @Override
-    public void doAcknowlegement(Message msg) throws JMSException
+    /**
+     * Prepare the broker for the next round.
+     *
+     * Called after acknowledging the message, this method shuts the current
+     * broker down, connects to the new broker and sends a new message for the
+     * client to fail over to and receive.
+     *
+     * It ends by restarting the original broker so that the cycle can repeat.
+     *
+     * When we are able to cluster the Java broker we will not need to do the
+     * message repopulation or QPID_WORK clearing. All that we will need to do
+     * is send the initial NUM_MESSAGES during startup and then bring the
+     * brokers down at the right time to cause the client to fail between them.
+     *
+     * @param index
+     * @throws Exception
+     */
+    protected void prepBroker(int index) throws Exception
     {
-        //Acknowledge current message
-        super.doAcknowlegement(msg);
+        // Alternate killing the broker based on the message index we are at.
 
-        int msgCount = msg.getIntProperty(INDEX);
-
-        if (msgCount % 2 == 0)
+        if (index % 2 == 0)
         {
             failBroker(getFailingPort());
+            // Clean up the failed broker
+            FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getFailingPort()), true);
         }
         else
         {
             failBroker(getPort());
+            // Clean up the failed broker
+            FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getPort()), true);
         }
 
+        _failoverCompleted = new CountDownLatch(1);
+
+        _logger.info("AAFOMT: prepNewBroker for message send");
+        Connection connection = getConnection();
+
         try
         {
-            prepBroker(NUM_MESSAGES - msgCount - 1);
+
+            //Stop the connection whilst we repopulate the broker, or the no_ack
+            // test will drain the msgs before we can check we put the right number
+            // back on again.
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            // ensure destination is created.
+            session.createConsumer(_queue).close();
+
+
+            // If this is the last message then we can skip the send.
+            // But we MUST ensure that we have created the queue with the
+            // above createConsumer(_queue).close() as the test will end by
+            // testing the queue depth which will fail if we don't ensure we
+            // declare the queue.
+            // index is 0 based so we need to check +1 against NUM_MESSAGES
+            if ((index + 1) == NUM_MESSAGES)
+            {
+                return;
+            }
+
+
+            sendMessage(session, _queue, 1, index + 1, 0);
+
+            // Validate that we have the message on the queue.
+            // In NoAck mode, though, the message may already have been sent to
+            // the client so we have to skip the validation.
+            if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
+            {
+                assertEquals("Wrong number of messages on queue", 1,
+                             ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+            }
+
+
         }
         catch (Exception e)
         {
             fail("Unable to prep new broker," + e.getMessage());
         }
+        finally
+        {
+            connection.close();
+        }
 
         try
         {
 
-            if (msgCount % 2 == 0)
+            //Restart the broker
+            if (index % 2 == 0)
             {
                 startBroker(getFailingPort());
             }
@@ -138,8 +192,27 @@
 
     }
 
-    int msgCount = 0;
-    boolean cleaned = false;
+    @Override
+    public void doAcknowlegement(Message msg) throws JMSException
+    {
+        //Acknowledge current message
+        super.doAcknowlegement(msg);
+
+        try
+        {
+            prepBroker(msg.getIntProperty(INDEX));
+        }
+        catch (Exception e)
+        {
+            // Provide details of what went wrong with the stack trace
+            e.printStackTrace();
+            fail("Unable to prep new broker," + e);
+        }
+    }
+
+    // Instance variables for the DirtyAcking test
+    int _msgCount = 0;
+    boolean _cleaned = false;
 
     class DirtyAckingHandler implements MessageListener
     {
@@ -164,10 +237,10 @@
             try
             {
                 // Check we have the next message as expected
-                assertNotNull("Message " + msgCount + " not correctly received.", message);
-                assertEquals("Incorrect message received", msgCount, message.getIntProperty(INDEX));
+                assertNotNull("Message " + _msgCount + " not correctly received.", message);
+                assertEquals("Incorrect message received", _msgCount, message.getIntProperty(INDEX));
 
-                if (msgCount == 0 && _failoverCompleted.getCount() != 0)
+                if (_msgCount == 0 && _failoverCompleted.getCount() != 0)
                 {
                     // This is the first message we've received so lets fail the broker
 
@@ -180,16 +253,16 @@
                     return;
                 }
 
-                msgCount++;
+                _msgCount++;
 
                 // Don't acknowlege the first message after failover so we can commit
                 // them together
-                if (msgCount == 1)
+                if (_msgCount == 1)
                 {
-                    _logger.error("Received first msg after failover ignoring:" + msgCount);
+                    _logger.error("Received first msg after failover ignoring:" + _msgCount);
 
                     // Acknowledge the first message if we are now on the cleaned pass
-                    if (cleaned)
+                    if (_cleaned)
                     {
                         _receivedAll.countDown();
                     }
@@ -202,7 +275,7 @@
                     try
                     {
                         _consumerSession.commit();
-                        if (!cleaned)
+                        if (!_cleaned)
                         {
                             fail("Session is dirty we should get an TransactionRolledBackException");
                         }
@@ -217,7 +290,7 @@
                     try
                     {
                         message.acknowledge();
-                        if (!cleaned)
+                        if (!_cleaned)
                         {
                             fail("Session is dirty we should get an IllegalStateException");
                         }
@@ -232,14 +305,14 @@
 
                 // Acknowledge the last message if we are in a clean state
                 // this will then trigger test teardown.
-                if (cleaned)
+                if (_cleaned)
                 {
                     _receivedAll.countDown();
                 }
 
                 //Reset message count so we can try again.
-                msgCount = 0;
-                cleaned = true;
+                _msgCount = 0;
+                _cleaned = true;
             }
             catch (Exception e)
             {

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java?rev=907004&r1=907003&r2=907004&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java Fri Feb  5 17:04:13 2010
@@ -32,6 +32,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * This test extends the synchronous AcknowledgeTest to use a MessageListener
+ * and receive messages asynchronously.
+ */
 public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
 {
     protected CountDownLatch _receivedAll;
@@ -43,6 +47,13 @@
         super.setUp();
     }
 
+    /**
+     * Override the synchronous AcknowledgeTest init to provide the _receivedAll
+     * CountDownLatch init and ensure that we set the MessageListener.
+     * @param transacted
+     * @param mode
+     * @throws Exception
+     */
     @Override
     public void init(boolean transacted, int mode) throws Exception
     {
@@ -53,11 +64,25 @@
     }
 
     /**
+     * This test overrides the testAcking from the simple receive() model to allow
+     * for asynchronous receiving of messages.
+     *
+     * Again the transaction/ack mode is provided to this main test run
+     *
+     * The init method is called which will setup the listener so that we can
+     * then sit and await using the _receivedAll CountDownLatch. We wait for up
+     * to 10s at a time; if no messages have been received in the last 10s then
+     * the test will fail.
+     *
+     * If the test fails then it will attempt to retrieve any exception that the
+     * asynchronous delivery thread may have recorded.
+     *
      * @param transacted
      * @param mode
      *
      * @throws Exception
      */
+    @Override
     protected void testAcking(boolean transacted, int mode) throws Exception
     {
         init(transacted, mode);
@@ -69,7 +94,7 @@
         int lastCount = NUM_MESSAGES;
 
         // Wait for messages to arrive
-        boolean complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS);
+        boolean complete = _receivedAll.await(10000L, TimeUnit.MILLISECONDS);
 
         // If the messasges haven't arrived
         while (!complete)
@@ -90,7 +115,7 @@
             lastCount = currentCount;
 
             // Wait again for messages to arrive.
-            complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS);
+            complete = _receivedAll.await(10000L, TimeUnit.MILLISECONDS);
         }
 
         // If we failed to receive all the messages then fail the test.
@@ -105,6 +130,7 @@
             }
             else
             {
+                _logger.info("AOMT: Check QueueDepth:" + _queue);
                 long onQueue=((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue);
                 fail("All messages not received missing:" + _receivedAll.getCount() + "/" + NUM_MESSAGES+" On Queue:"+onQueue);
 
@@ -136,14 +162,27 @@
 
         _consumerSession.close();
 
+        _logger.info("AOMT: check number of message at end of test.");
         assertEquals("Wrong number of messages on queue", 0,
                      ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue));
     }
 
+    /**
+     * The MessageListener interface that receives the message and counts down
+     * the _receivedAll CountDownLatch.
+     *
+     * Again like AcknowledgeTest acknowledgement is actually handled in
+     * doAcknowlegement.
+     *
+     * The message INDEX is validated to ensure the correct message order is
+     * preserved.
+     *
+     * @param message
+     */
     public void onMessage(Message message)
     {
         // Log received Message for debugging
-        System.out.println("RECEIVED MESSAGE:" + message);
+        _logger.info("RECEIVED MESSAGE:" + message);
 
         try
         {
@@ -164,7 +203,7 @@
         }
         catch (Exception e)
         {
-            // This will end the test run by counting down _receviedAll 
+            // This will end the test run by counting down _receivedAll
             fail(e);
         }
     }
@@ -176,6 +215,7 @@
      */
     protected void fail(Exception e)
     {
+        //record the failure
         _causeOfFailure.set(e);
         // End the test.
         while (_receivedAll.getCount() != 0)

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java?rev=907004&r1=907003&r2=907004&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java Fri Feb  5 17:04:13 2010
@@ -33,6 +33,12 @@
 import javax.jms.Session;
 import javax.jms.MessageProducer;
 
+/**
+ * Test the various JMS Acknowledge Modes. The single testAcking method does all
+ * the work of receiving messages and validating the acking.
+ *
+ * The ack mode is provided from the various test methods. 
+ */
 public class AcknowledgeTest extends FailoverBaseCase
 {
     protected int NUM_MESSAGES;
@@ -50,6 +56,7 @@
 
         _queue = getTestQueue();
 
+        _logger.info("AT: setup");
         //Create Producer put some messages on the queue
         _connection = getConnection();
     }
@@ -68,6 +75,16 @@
     }
 
     /**
+     * The main test method.
+     *
+     * Receive the initial message and then proceed to send and ack messages
+     * until we have processed NUM_MESSAGES worth of messages.
+     *
+     * Each message is tagged with an INDEX value and these are used to check
+     * that the messages are received in the correct order.
+     *
+     * The test concludes by validating that the queue depth is 0 as expected.
+     *
      * @param transacted
      * @param mode
      *



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org