You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/07/16 14:37:40 UTC

svn commit: r677259 - in /incubator/qpid/trunk/qpid/java: 010ExcludeList 08ExcludeList systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java

Author: ritchiem
Date: Wed Jul 16 05:37:40 2008
New Revision: 677259

URL: http://svn.apache.org/viewvc?rev=677259&view=rev
Log:
Converted client.failover.FailoverTest so it can utilise the standard mechanism for failover testing, as the local CruiseControl had testP2PFailoveWithMessagesLeft fail with extra messages being left on broker.

Modified:
    incubator/qpid/trunk/qpid/java/010ExcludeList
    incubator/qpid/trunk/qpid/java/08ExcludeList
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java

Modified: incubator/qpid/trunk/qpid/java/010ExcludeList
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/010ExcludeList?rev=677259&r1=677258&r2=677259&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/010ExcludeList (original)
+++ incubator/qpid/trunk/qpid/java/010ExcludeList Wed Jul 16 05:37:40 2008
@@ -32,6 +32,7 @@
 org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated
 org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser
 org.apache.qpid.test.testcases.FailoverTest#*
+org.apache.qpid.test.client.failover.FailoverTest#*
 // Those tests are testing 0.8 specific semantics
 org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P
 org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P

Modified: incubator/qpid/trunk/qpid/java/08ExcludeList
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/08ExcludeList?rev=677259&r1=677258&r2=677259&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/08ExcludeList (original)
+++ incubator/qpid/trunk/qpid/java/08ExcludeList Wed Jul 16 05:37:40 2008
@@ -5,3 +5,5 @@
 // Those tests are not finished
 org.apache.qpid.test.testcases.TTLTest#*
 org.apache.qpid.test.testcases.FailoverTest#*
+// This is a long running test so should exclude from normal runs
+org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java?rev=677259&r1=677258&r2=677259&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java Wed Jul 16 05:37:40 2008
@@ -21,14 +21,15 @@
 
 package org.apache.qpid.test.client.failover;
 
-import junit.framework.TestCase;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQConnectionURL;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.test.utils.FailoverBaseCase;
 import org.apache.log4j.Logger;
 
 import javax.jms.Connection;
@@ -38,78 +39,62 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Queue;
+import javax.naming.NamingException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-public class FailoverTest extends TestCase implements ConnectionListener
+public class FailoverTest extends FailoverBaseCase implements ConnectionListener
 {
     private static final Logger _logger = Logger.getLogger(FailoverTest.class);
 
-    private static final int NUM_BROKERS = 2;
-    private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'";
     private static final String QUEUE = "queue";
     private static final int NUM_MESSAGES = 10;
-    private Connection con;
-    private AMQConnectionFactory conFactory;
-    private Session prodSess;
-    private AMQQueue q;
-    private MessageProducer prod;
-    private Session conSess;
+    private Connection connnection;
+    private Session producerSession;
+    private Queue queue;
+    private MessageProducer producer;
+    private Session consumerSession;
     private MessageConsumer consumer;
 
     private static int usedBrokers = 0;
     private CountDownLatch failoverComplete;
+    private static final long DEFAULT_FAILOVER_TIME = 10000L;
 
     @Override
     protected void setUp() throws Exception
     {
         super.setUp();
-        // Create two VM brokers
 
-        for (int i = 0; i < NUM_BROKERS; i++)
-        {
-            usedBrokers++;
-
-            TransportConnection.createVMBroker(usedBrokers);
-        }
-
-        conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers));
-        _logger.info("Connecting on:" + conFactory.getConnectionURL());
-        con = conFactory.createConnection();
-        ((AMQConnection) con).setConnectionListener(this);
-        con.start();
+        connnection = getConnection();
+        ((AMQConnection) connnection).setConnectionListener(this);
+        connnection.start();
         failoverComplete = new CountDownLatch(1);
     }
 
-    private void init(boolean transacted, int mode) throws JMSException
+    private void init(boolean transacted, int mode) throws JMSException, NamingException
     {
-        prodSess = con.createSession(transacted, mode);
-        q = new AMQQueue("amq.direct", QUEUE);
-        prod = prodSess.createProducer(q);
-        conSess = con.createSession(transacted, mode);
-        consumer = conSess.createConsumer(q);
+        queue = (Queue) getInitialContext().lookup(QUEUE);
+
+        consumerSession = connnection.createSession(transacted, mode);
+        consumer = consumerSession.createConsumer(queue);
+
+        producerSession = connnection.createSession(transacted, mode);
+        producer = producerSession.createProducer(queue);
     }
 
     @Override
-    protected void tearDown() throws Exception
+    public void tearDown() throws Exception
     {
         try
         {
-            con.close();
+            connnection.close();
         }
         catch (Exception e)
         {
 
         }
 
-        try
-        {
-            TransportConnection.killAllVMBrokers();
-            ApplicationRegistry.removeAll();
-        }
-        catch (Exception e)
-        {
-            fail("Unable to clean up");
-        }
         super.tearDown();
     }
 
@@ -128,17 +113,8 @@
     {
         for (int i = 0; i < totalMessages; i++)
         {
-            prod.send(prodSess.createTextMessage("message " + i));
+            producer.send(producerSession.createTextMessage("message " + i));
         }
-
-//        try
-//        {
-//            Thread.sleep(100 * totalMessages);
-//        }
-//        catch (InterruptedException e)
-//        {
-//            //evil ignoring of IE
-//        }
     }
 
     public void testP2PFailover() throws Exception
@@ -151,7 +127,7 @@
         testP2PFailover(NUM_MESSAGES, false);
     }
 
-    private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException
+    private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException, NamingException
     {
         Message msg = null;
         init(false, Session.AUTO_ACKNOWLEDGE);
@@ -168,27 +144,25 @@
 
         _logger.info("Failing over");
 
-        causeFailure();
+        causeFailure(DEFAULT_FAILOVER_TIME);
 
         msg = consumer.receive(500);
-        //todo: reinstate
+
         assertNull("Should not have received message from new broker!", msg);
         // Check that messages still sent / received
         sendMessages(totalMessages);
         consumeMessages(totalMessages);
     }
 
-    private void causeFailure()
+    private void causeFailure(long delay)
     {
-        _logger.info("Failover");
 
-        TransportConnection.killVMBroker(usedBrokers - 1);
-        ApplicationRegistry.remove(usedBrokers - 1);
+        failBroker();
 
         _logger.info("Awaiting Failover completion");
         try
         {
-            failoverComplete.await();
+            failoverComplete.await(delay, TimeUnit.MILLISECONDS);
         }
         catch (InterruptedException e)
         {
@@ -203,8 +177,7 @@
         Message msg = consumer.receive();
         assertNotNull("Expected msgs not received", msg);
 
-
-        causeFailure();
+        causeFailure(DEFAULT_FAILOVER_TIME);
 
         Exception failure = null;
         try
@@ -218,18 +191,41 @@
         assertNotNull("Exception should be thrown", failure);
     }
 
-    // This test disabled so that it doesn't add 4 minnutes to the length of time it takes to run, which would be lame
-    public void txest4MinuteFailover() throws Exception
+    /**
+     * The client used to have a fixed timeout of 4 minutes after which failover would no longer work.
+     * Check that this code has not regressed
+     *
+     * @throws Exception if something unexpected occurs in the test.
+     */
+    public void test4MinuteFailover() throws Exception
     {
-        conFactory = new AMQConnectionFactory("amqp://guest:guest@/test?brokerlist='vm://:"+(usedBrokers-1)+"?connectdelay='60000'&retries='2''");
-        _logger.info("Connecting on:" + conFactory.getConnectionURL());
-        con = conFactory.createConnection();
-        ((AMQConnection) con).setConnectionListener(this);
-        con.start();
-
-        long failTime = System.currentTimeMillis() + 60000;
-        causeFailure();
-        assertTrue("Failover did not take long enough", System.currentTimeMillis() > failTime);
+        ConnectionURL connectionURL = getConnectionFactory().getConnectionURL();
+
+        int RETRIES = 4;
+        int DELAY = 60000;
+
+        //Set up a long delay on and large number of retries
+        BrokerDetails details = connectionURL.getBrokerDetails(1);
+        details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES));
+        details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY));
+
+        connnection = new AMQConnection(connectionURL, null);
+
+        ((AMQConnection) connnection).setConnectionListener(this);
+
+        //Start the connection
+        connnection.start();
+
+        long FAILOVER_DELAY = (RETRIES * DELAY);
+
+        // Use Nano seconds as it is more accurate for comparision.
+        long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
+
+        //Fail the first broker
+        causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+
+        //Reconnection should occur
+        assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
     }
 
     public void bytesSent(long count)