You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/05/07 15:25:39 UTC

svn commit: r654097 - in /incubator/qpid/branches/M2.x/dotnet: Qpid.Client/Client/AMQConnection.cs Qpid.Client/Client/AmqChannel.cs Qpid.Integration.Tests/interactive/FailoverTest.cs

Author: aidan
Date: Wed May  7 06:25:38 2008
New Revision: 654097

URL: http://svn.apache.org/viewvc?rev=654097&view=rev
Log:
QPID-952, QPID-951, QPID-1032 Fix failover, ensure that it is properly detected, that frames are replayed approrpiately and that failover does not timeout.

Modified:
    incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AMQConnection.cs
    incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs

Modified: incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AMQConnection.cs?rev=654097&r1=654096&r2=654097&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AMQConnection.cs (original)
+++ incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AMQConnection.cs Wed May  7 06:25:38 2008
@@ -69,7 +69,7 @@
 
         internal bool IsFailoverAllowed
         {
-            get { return _failoverPolicy.FailoverAllowed(); }
+            get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); }
         }
 
         /// <summary>
@@ -151,34 +151,22 @@
                     _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e);
                     // XXX: Should perhaps break out of the do/while here if not a SocketException...
                 }
-            } while (_failoverPolicy.FailoverAllowed());
+            } while (!_connected && _failoverPolicy.FailoverAllowed());
 
             _log.Debug("Are we connected:" + _connected);
-            
-            if (!_failoverPolicy.FailoverAllowed())
-            {
-                if ( lastException is AMQException )
-                   throw lastException;
-                else
-                   throw new AMQConnectionException("Unable to connect", lastException);
-            }
 
-            // TODO: this needs to be redone so that we are not spinning.
-            // A suitable object should be set that is then waited on
-            // and only notified when a connection is made or when
-            // the AMQConnection gets closed.
-            while (!_connected && !Closed)
+            if (!_connected)
             {
-                _log.Debug("Sleeping.");
-                Thread.Sleep(100);
-            }
-            if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null)
-            {
-                if (_lastAMQException != null)
-                {
-                    throw _lastAMQException;
-                }
+            	if ( lastException is AMQException )
+            	{
+            		throw lastException;
+            	}
+            	else
+            	{
+            		throw new AMQConnectionException("Unable to connect", lastException);
+            	}
             }
+            
         }
 
         /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType)

Modified: incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=654097&r1=654096&r2=654097&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs Wed May  7 06:25:38 2008
@@ -907,12 +907,16 @@
             AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
                                                               queueName, exchangeName,
                                                               routingKey, false, args);
-            _replayFrames.Add(queueBind);
+            
 
             lock (_connection.FailoverMutex)
             {
                 _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
             }
+            // AS FIXME: wasnae me
+            _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0,
+                                                           queueName, exchangeName,
+                                                           routingKey, true, args));
         }
 
         private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag)
@@ -936,9 +940,7 @@
                 _logger.Debug(string.Format("DeleteQueue name={0}", queueName));
                 
                 AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait);
-
-                _replayFrames.Add(queueDelete);
-
+                
                 if (noWait)
                 {
                     _connection.ProtocolWriter.Write(queueDelete);
@@ -947,6 +949,8 @@
                 {
                     _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
                 }
+                // AS FIXME: wasnae me
+                _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true));
             }
             catch (AMQException)
             {
@@ -962,12 +966,14 @@
             AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
                                                                     isAutoDelete, false, null);
 
-            _replayFrames.Add(queueDeclare);
 
             lock (_connection.FailoverMutex)
             {
                 _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
             }
+            // AS FIXME: wasnae me
+            _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
+                                                                    isAutoDelete, true, null));
         }
 
         // AMQP-level method.
@@ -980,8 +986,6 @@
 
             AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, 
                                                                           durable, autoDelete, xinternal, noWait, args);
-
-            _replayFrames.Add(declareExchange);
             
             if (noWait)
             {
@@ -989,6 +993,8 @@
                 {
                     _connection.ProtocolWriter.Write(declareExchange);
                 }
+                // AS FIXME: wasnae me
+            	_replayFrames.Add(declareExchange);
             }
             else
             {

Modified: incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs?rev=654097&r1=654096&r2=654097&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs (original)
+++ incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs Wed May  7 06:25:38 2008
@@ -1,320 +1,397 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Runtime.InteropServices;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-using Apache.Qpid.Messaging;
-
-namespace Apache.Qpid.Integration.Tests.interactive
-{
-    [TestFixture, Category("Interactive")]
-    public class FailoverTest : IConnectionListener
-    {
-        private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
-
-        /// <summary>Specifies the number of times to run the test cycle.</summary>
-        const int NUM_MESSAGES = 10;
-
-        /// <summary>Determines how many messages to send within each commit.</summary>
-        const int COMMIT_BATCH_SIZE = 1;
-
-        /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
-        //const int SLEEP_MILLIS = 1;
-
-        /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary>
-        const int TIMEOUT = 10000;
-
-        /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
-        const int FAIL_POINT = 5;
-
-        /// <summary>Specified the ack mode to use for the test.</summary>
-        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
-
-        /// <summary>Determines whether this test runs transactionally or not. </summary>
-        bool transacted = false;
-
-        /// <summary>Holds the connection to run the test over.</summary>
-        AMQConnection _connection;
-
-        /// <summary>Holds the channel for the test message publisher. </summary>
-        IChannel publishingChannel;
-
-        /// <summary>Holds the test message publisher. </summary>
-        IMessagePublisher publisher;
-
-        /// <summary>Used to keep count of the number of messages sent. </summary>
-        int messagesSent;
-
-        /// <summary>Used to keep count of the number of messages received. </summary>
-        int messagesReceived;
-
-        /// <summary>Used to wait for test completion on. </summary>
-        private static object testComplete = new Object();
-
-        /// <summary>
-        /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
-        /// </summary>
-        /// [SetUp]
-        public void Init(IConnectionInfo connectionInfo)
-        {
-            // Reset all counts.
-            messagesSent = 0;
-            messagesReceived = 0;
-
-            // Create a connection for the test.
-            _connection = new AMQConnection(connectionInfo);
-            _connection.ConnectionListener = this;
-
-            // Create a consumer to receive the test messages.
-            IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
-
-            string queueName = receivingChannel.GenerateUniqueName();
-            receivingChannel.DeclareQueue(queueName, false, true, true);
-            receivingChannel.Bind(queueName, "amq.direct", queueName);
-
-            IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
-                .WithPrefetchLow(30)
-                .WithPrefetchHigh(60).Create();
-
-            consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
-            _connection.Start();
-
-            // Create a publisher to send the test messages.
-            publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
-            publisher = publishingChannel.CreatePublisherBuilder()
-                .WithRoutingKey(queueName)
-                .Create();
-
-            _log.Debug("connection = " + _connection);
-            _log.Debug("connectionInfo = " + connectionInfo);
-            _log.Debug("connection.AsUrl = " + _connection.toURL());
-            _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
-        }
-
-        /// <summary>
-        /// Clean up the test connection.
-        /// </summary>
-        [TearDown]
-        public virtual void Shutdown()
-        {
-            Thread.Sleep(2000);
-            _connection.Close();
-        }
-
-        /// <summary>
-        /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
-        /// to fail between are seperately added into the connection info.
-        /// </summary>
-        /*[Test]
-        public void TestWithBasicInfo()
-        {
-            _log.Debug("public void TestWithBasicInfo(): called");
-
-            // Manually create the connection parameters.
-            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
-            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
-            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
-
-            Init(connectionInfo);
-            DoFailoverTest();
-        }*/
-
-        /// <summary>
-        /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
-        /// </summary>
-        [Test]
-        public void TestWithUrl()
-        {
-            _log.Debug("public void runTestWithUrl(): called");
-
-            // Parse the connection parameters from a URL.
-            String clientId = "failover" + DateTime.Now.Ticks;
-            string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
-                "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";            
-            IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
-            
-            Init(connectionInfo);
-            DoFailoverTest();
-        }
-
-        /// <summary>
-        /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
-        /// are received within the test time limit.
-        /// </summary>
-        ///
-        /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
-        void DoFailoverTest()
-        {
-            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
-
-            for (int i = 1; i <= NUM_MESSAGES; ++i)
-            {
-                ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
-                //_log.Debug("sending message = " + msg.Text);
-                publisher.Send(msg);
-                messagesSent++;
-
-                _log.Debug("messagesSent = " + messagesSent);
-
-                if (transacted)
-                {
-                    publishingChannel.Commit();
-                }
-
-                // Prompt the user to cause a failure if at the fail point.
-                if (i == FAIL_POINT)
-                {
-                    PromptAndWait("Cause a broker failure now, then press return...");
-                }
-
-                //Thread.Sleep(SLEEP_MILLIS);               
-            }
-
-            // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
-            bool withinTimeout;
-
-            lock(testComplete)
-            {
-                withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
-            }            
-
-            if (!withinTimeout)
-            {
-                Assert.Fail("Test timed out, before all messages received.");
-            }
-
-            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
-        }
-
-        /// <summary>
-        /// Receives all of the test messages.
-        /// </summary>
-        ///
-        /// <param name="message">The newly arrived test message.</param>
-        public void OnMessage(IMessage message)
-        {
-            try
-            {
-                if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
-                {
-                    message.Acknowledge();
-                }
-
-                messagesReceived++;
-
-                _log.Debug("messagesReceived = " + messagesReceived);
-
-                // Check if all of the messages in the test have been received, in which case notify the message producer that the test has 
-                // succesfully completed.
-                if (messagesReceived == NUM_MESSAGES)
-                {
-                    lock (testComplete)
-                    {
-                        Monitor.Pulse(testComplete);
-                    }
-                }
-            }
-            catch (QpidException e)
-            {
-                _log.Fatal("Exception received. About to stop.", e);
-                Stop();
-            }
-        }
-
-        /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary>
-        ///
-        /// <param name="message">The message to prompt the user with.</param>
-        private void PromptAndWait(string message)
-        {
-            Console.WriteLine("\n" + message);
-            Console.ReadLine();
-        }
-
-        // <summary>Closes the test connection.</summary>
-        private void Stop()
-        {
-            _log.Debug("Stopping...");
-            try
-            {
-                _connection.Close();
-            }
-            catch (QpidException e)
-            {
-                _log.Debug("Failed to shutdown: ", e);
-            }
-        }
-
-        /// <summary>
-        /// Called when bytes have been transmitted to the server
-        /// </summary>
-        ///
-        /// <param>count the number of bytes sent in total since the connection was opened</param>     
-        public void BytesSent(long count) {}
-
-        /// <summary>
-        /// Called when some bytes have been received on a connection
-        /// </summary>
-        ///
-        /// <param>count the number of bytes received in total since the connection was opened</param>         
-        public void BytesReceived(long count) {}
-
-        /// <summary>
-        /// Called after the infrastructure has detected that failover is required but before attempting failover.
-        /// </summary>
-        ///
-        /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param>
-        ///
-        /// <return>true to continue failing over, false to veto failover and raise a connection exception</return>         
-        public bool PreFailover(bool redirect) 
-        {
-            _log.Debug("public bool PreFailover(bool redirect): called");
-            return true; 
-        }
-
-        /// <summary>
-        /// Called after connection has been made to another broker after failover has been started but before
-        /// any resubscription has been done.
-        /// </summary>
-        ///
-        /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in
-        /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
-        /// and consumers are invalidated.
-        /// </return>
-        public bool PreResubscribe() 
-        {
-            _log.Debug("public bool PreResubscribe(): called");
-            return true; 
-        }
-
-        /// <summary>
-        /// Called once failover has completed successfully. This is called irrespective of whether the client has
-        /// vetoed automatic resubscription.
-        /// </summary>
-        public void FailoverComplete() 
-        {
-            _log.Debug("public void FailoverComplete(): called");
-        }
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.interactive
+{
+    [TestFixture, Category("Interactive")]
+    public class FailoverTest : IConnectionListener
+    {
+        private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
+
+        /// <summary>Specifies the number of times to run the test cycle.</summary>
+        const int NUM_MESSAGES = 10;
+
+        /// <summary>Determines how many messages to send within each commit.</summary>
+        const int COMMIT_BATCH_SIZE = 1;
+
+        /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
+        //const int SLEEP_MILLIS = 1;
+
+        /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary>
+        const int TIMEOUT = 10000;
+
+        /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
+        const int FAIL_POINT = 5;
+
+        /// <summary>Specified the ack mode to use for the test.</summary>
+        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
+
+        /// <summary>Determines whether this test runs transactionally or not. </summary>
+        bool transacted = false;
+
+        /// <summary>Holds the connection to run the test over.</summary>
+        AMQConnection _connection;
+
+        /// <summary>Holds the channel for the test message publisher. </summary>
+        IChannel publishingChannel;
+
+        /// <summary>Holds the test message publisher. </summary>
+        IMessagePublisher publisher;
+
+        /// <summary>Used to keep count of the number of messages sent. </summary>
+        int messagesSent;
+
+        /// <summary>Used to keep count of the number of messages received. </summary>
+        int messagesReceived;
+
+        /// <summary>Used to wait for test completion on. </summary>
+        private static object testComplete = new Object();
+
+        /// <summary>Used to wait for failover completion on. </summary>
+	private static object failoverComplete = new Object();
+	
+        bool failedOver=false;
+
+        /// <summary>Used to record the extra message count (1) if the message sent right after failover actually made it to the new broker.</summary>
+        int _extraMessage = 0;
+	
+        /// <summary>
+        /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
+        /// </summary>
+        /// [SetUp]
+        public void Init(IConnectionInfo connectionInfo)
+        {
+	    //log4net.Config.BasicConfigurator.Configure();
+            // Reset all counts.
+            messagesSent = 0;
+            messagesReceived = 0;
+            failedOver=false;
+            _extraMessage = 0;
+
+	    PromptAndWait("Ensure both brokers are running, then press Enter");	    
+	    
+            // Create a connection for the test.
+            _connection = new AMQConnection(connectionInfo);
+            _connection.ConnectionListener = this;
+
+            // Create a consumer to receive the test messages.
+            IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
+
+            string queueName = receivingChannel.GenerateUniqueName();
+            receivingChannel.DeclareQueue(queueName, false, true, true);
+            receivingChannel.Bind(queueName, "amq.direct", queueName);
+
+            IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
+                .WithPrefetchLow(30)
+                .WithPrefetchHigh(60).Create();
+
+            consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+            _connection.Start();
+
+            // Create a publisher to send the test messages.
+            publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
+            publisher = publishingChannel.CreatePublisherBuilder()
+                .WithRoutingKey(queueName)
+                .Create();
+
+            _log.Debug("connection = " + _connection);
+            _log.Debug("connectionInfo = " + connectionInfo);
+            _log.Debug("connection.AsUrl = " + _connection.toURL());
+            _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
+        }
+
+        /// <summary>
+        /// Clean up the test connection.
+        /// </summary>
+        [TearDown]
+        public virtual void Shutdown()
+        {
+ 	    if (!failedOver)
+	    {
+                 Assert.Fail("The failover callback never occured.");
+            }
+
+            Console.WriteLine("Test done shutting down");
+	    Thread.Sleep(2000);
+            _connection.Close();
+        }
+
+        /// <summary>
+        /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
+        /// to fail between are seperately added into the connection info.
+        /// </summary>
+        /*[Test]
+        public void TestWithBasicInfo()
+        {
+            _log.Debug("public void TestWithBasicInfo(): called");
+
+            // Manually create the connection parameters.
+            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
+            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
+
+            Init(connectionInfo);
+            DoFailoverTest();
+        }*/
+
+        /// <summary>
+        /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
+        /// </summary>
+        [Test]
+        public void TestWithUrl()
+        {
+            _log.Debug("public void runTestWithUrl(): called");
+
+            // Parse the connection parameters from a URL.
+            String clientId = "failover" + DateTime.Now.Ticks;
+            string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
+                "?brokerlist='tcp://localhost:9672;tcp://localhost:9673'&failover='roundrobin'";            
+            IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
+            
+            Init(connectionInfo);
+            DoFailoverTest(0);
+        }
+
+        /// <summary>
+        /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
+        /// are received within the test time limit.
+        /// </summary>
+        ///
+        /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
+        void DoFailoverTest(int delay)
+        {
+            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
+
+            // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
+	    bool withinTimeout = false;
+
+            for (int i = 1; i <= NUM_MESSAGES; ++i)
+            {
+		SendMessage();
+
+		// Prompt the user to cause a failure if at the fail point.
+		if (i == FAIL_POINT)
+		{
+		    for( int min = delay ; min > 0 ; min--)
+		    {
+		       Console.WriteLine("Waiting for "+min+" minutes to test connection time bug.");
+		       Thread.Sleep(60*1000);
+		    }
+
+		    PromptAndWait("Cause a broker failure now, then press return.");
+		    Console.WriteLine("NOTE: ensure that the delay between killing the broker and continuing here is less than 20 second");
+		    
+		    Console.WriteLine("Sending a message to ensure send right after works");
+
+		    SendMessage();
+
+		    Console.WriteLine("Waiting for fail-over to complete before continuing...");
+
+
+		    lock(failoverComplete)
+		    {
+			if (!failedOver)
+			{
+			    withinTimeout = Monitor.Wait(failoverComplete, TIMEOUT);
+			}
+			else
+			{
+			    withinTimeout=true;
+			}
+		    }
+
+		    if (!withinTimeout)
+		    {
+			PromptAndWait("Failover has not yet occured. Press enter to give up waiting.");
+		    }
+		}
+	    }
+
+            lock(testComplete)
+            {
+                withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
+            }            
+
+            if (!withinTimeout)
+            {
+                Assert.Fail("Test timed out, before all messages received.");
+            }
+
+            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
+        }
+
+	[Test]
+        public void Test5MinuteWait()
+	{
+	    String clientId = "failover" + DateTime.Now.Ticks;
+
+	    QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+	    connectionInfo.Username = "guest";
+	    connectionInfo.Password = "guest";
+	    connectionInfo.ClientName = clientId;
+	    connectionInfo.VirtualHost = "/test";
+	    connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 9672, false));
+	    connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 9673, false));
+	    
+	    Init(connectionInfo);
+	    DoFailoverTest(5);
+	}
+
+	void SendMessage()
+	{
+	    ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
+
+	    publisher.Send(msg);
+	    messagesSent++;
+
+	    if (transacted)
+	    {
+		publishingChannel.Commit();
+	    }
+	    
+	    Console.WriteLine("messagesSent = " + messagesSent);
+	}
+	
+        /// <summary>
+        /// Receives all of the test messages.
+        /// </summary>
+        ///
+        /// <param name="message">The newly arrived test message.</param>
+        public void OnMessage(IMessage message)
+        {
+            try
+            {
+                if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+                {
+                    message.Acknowledge();
+                }
+
+                messagesReceived++;
+
+                _log.Debug("messagesReceived = " + messagesReceived);
+
+                // Check if all of the messages in the test have been received, in which case notify the message producer that the test has 
+                // succesfully completed.
+                if (messagesReceived == NUM_MESSAGES + _extraMessage)
+                {
+                    lock (testComplete)
+                    {
+			failedOver = true;
+                        Monitor.Pulse(testComplete);
+                    }
+                }
+            }
+            catch (QpidException e)
+            {
+                _log.Fatal("Exception received. About to stop.", e);
+                Stop();
+            }
+        }
+
+        /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary>
+        ///
+        /// <param name="message">The message to prompt the user with.</param>
+        private void PromptAndWait(string message)
+        {
+            Console.WriteLine("\n" + message);
+            Console.ReadLine();
+        }
+
+        // <summary>Closes the test connection.</summary>
+        private void Stop()
+        {
+            _log.Debug("Stopping...");
+            try
+            {
+                _connection.Close();
+            }
+            catch (QpidException e)
+            {
+                _log.Debug("Failed to shutdown: ", e);
+            }
+        }
+
+        /// <summary>
+        /// Called when bytes have been transmitted to the server
+        /// </summary>
+        ///
+        /// <param>count the number of bytes sent in total since the connection was opened</param>     
+        public void BytesSent(long count) {}
+
+        /// <summary>
+        /// Called when some bytes have been received on a connection
+        /// </summary>
+        ///
+        /// <param>count the number of bytes received in total since the connection was opened</param>         
+        public void BytesReceived(long count) {}
+
+        /// <summary>
+        /// Called after the infrastructure has detected that failover is required but before attempting failover.
+        /// </summary>
+        ///
+        /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param>
+        ///
+        /// <return>true to continue failing over, false to veto failover and raise a connection exception</return>         
+        public bool PreFailover(bool redirect) 
+        {
+            _log.Debug("public bool PreFailover(bool redirect): called");
+            return true; 
+        }
+
+        /// <summary>
+        /// Called after connection has been made to another broker after failover has been started but before
+        /// any resubscription has been done.
+        /// </summary>
+        ///
+        /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in
+        /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
+        /// and consumers are invalidated.
+        /// </return>
+        public bool PreResubscribe() 
+        {
+            _log.Debug("public bool PreResubscribe(): called");
+            return true; 
+        }
+
+        /// <summary>
+        /// Called once failover has completed successfully. This is called irrespective of whether the client has
+        /// vetoed automatic resubscription.
+        /// </summary>
+        public void FailoverComplete() 
+        {
+	    failedOver = true;
+            _log.Debug("public void FailoverComplete(): called");
+	    Console.WriteLine("public void FailoverComplete(): called");
+	    lock (failoverComplete) 
+	    {
+	      Monitor.Pulse(failoverComplete);
+	    }
+        }
+    }
+}