You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/05/07 16:09:28 UTC

svn commit: r654113 - in /incubator/qpid/trunk/qpid: ./ dotnet/Qpid.Client.Transport.Socket.Blocking/ dotnet/Qpid.Client/Client/ dotnet/Qpid.Client/Client/Protocol/ dotnet/Qpid.Integration.Tests/interactive/ dotnet/Qpid.Integration.Tests/testcases/

Author: aidan
Date: Wed May  7 07:09:16 2008
New Revision: 654113

URL: http://svn.apache.org/viewvc?rev=654113&view=rev
Log:
Merged revisions 653420-654109 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x

........
  r653447 | aidan | 2008-05-05 13:26:29 +0100 (Mon, 05 May 2008) | 1 line
  
  Check if consumer is closed and don't reclose it
........
  r653451 | aidan | 2008-05-05 13:29:15 +0100 (Mon, 05 May 2008) | 1 line
  
  QPID-1022 Use synchronous writes to fix race conditions
........
  r653452 | aidan | 2008-05-05 13:30:45 +0100 (Mon, 05 May 2008) | 1 line
  
  QPID-1023 increase some timeouts
........
  r653760 | aidan | 2008-05-06 13:40:34 +0100 (Tue, 06 May 2008) | 3 lines
  
  QPID-1029: Generate temporary queue names using GUIDs to ensure uniqueness.
........
  r654097 | aidan | 2008-05-07 14:25:38 +0100 (Wed, 07 May 2008) | 2 lines
  
  QPID-952, QPID-951, QPID-1032 Fix failover, ensure that it is properly detected, that frames are replayed appropriately and that failover does not timeout.
........
  r654104 | aidan | 2008-05-07 14:46:51 +0100 (Wed, 07 May 2008) | 1 line
  
  QPID-952 should have been part of previous commit
........
  r654109 | aidan | 2008-05-07 14:56:09 +0100 (Wed, 07 May 2008) | 2 lines
  
  QPID-1036 increase timeouts to more reasonable levels, ensure that durable queues are deleted when no longer needed
........

Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs Wed May  7 07:09:16 2008
@@ -53,6 +53,11 @@
         {
             _socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 
+	    /// For future note TCP Set NoDelay options may help, though with the blocking io not sure
+	    /// The Don't linger may help with detecting disconnect but that hasn't been the case in testing.
+	    /// _socket.SetSocketOption (SocketOptionLevel.Socket, SocketOptionName.NoDelay, 0);
+	    /// _socket.SetSocketOption (SocketOptionLevel.Socket, SocketOptionName.DontLinger, 0);
+	    
             IPHostEntry ipHostInfo = Dns.Resolve(_host); // Note: don't fix this warning. We do this for .NET 1.1 compatibility.
             IPAddress ipAddress = ipHostInfo.AddressList[0];
 
@@ -77,6 +82,8 @@
             {
                 _log.Error("Write caused exception", e);
                 _protocolListener.OnException(e);
+                // We should provide the error synchronously as we are doing blocking io.
+                throw e;
             }
         }
 
@@ -87,6 +94,17 @@
             
             int numOctets = _networkStream.Read(bytes, 0, bytes.Length);
 
+	    /// Read only returns 0 if the socket has been gracefully shutdown.
+	    /// http://msdn2.microsoft.com/en-us/library/system.net.sockets.networkstream.read(VS.71).aspx            
+	    /// We can use this to block Send so the next Read will force an exception forcing failover.
+	    /// Otherwise we need to wait ~20 seconds for the NetworkStream/Socket code to realise that
+	    /// the socket has been closed.
+	    if (numOctets == 0)
+	    {              
+		_socket.Shutdown(SocketShutdown.Send);
+		_socket.Close();
+	    }
+             
             ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
             byteBuffer.limit(numOctets);
             

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs Wed May  7 07:09:16 2008
@@ -69,7 +69,7 @@
 
         internal bool IsFailoverAllowed
         {
-            get { return _failoverPolicy.FailoverAllowed(); }
+            get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); }
         }
 
         /// <summary>
@@ -151,34 +151,22 @@
                     _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e);
                     // XXX: Should perhaps break out of the do/while here if not a SocketException...
                 }
-            } while (_failoverPolicy.FailoverAllowed());
+            } while (!_connected && _failoverPolicy.FailoverAllowed());
 
             _log.Debug("Are we connected:" + _connected);
-            
-            if (!_failoverPolicy.FailoverAllowed())
-            {
-                if ( lastException is AMQException )
-                   throw lastException;
-                else
-                   throw new AMQConnectionException("Unable to connect", lastException);
-            }
 
-            // TODO: this needs to be redone so that we are not spinning.
-            // A suitable object should be set that is then waited on
-            // and only notified when a connection is made or when
-            // the AMQConnection gets closed.
-            while (!_connected && !Closed)
+            if (!_connected)
             {
-                _log.Debug("Sleeping.");
-                Thread.Sleep(100);
-            }
-            if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null)
-            {
-                if (_lastAMQException != null)
-                {
-                    throw _lastAMQException;
-                }
+            	if ( lastException is AMQException )
+            	{
+            		throw lastException;
+            	}
+            	else
+            	{
+            		throw new AMQConnectionException("Unable to connect", lastException);
+            	}
             }
+            
         }
 
         /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType)

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Wed May  7 07:09:16 2008
@@ -888,10 +888,14 @@
         /// <param name="consumer"></param>
         private void RegisterConsumer(BasicMessageConsumer consumer)
         {
+            // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+            String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+            consumer.ConsumerTag = tag;
+            _consumers.Add(tag, consumer);
+            
             String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
-                                                  consumer.Exclusive, consumer.AcknowledgeMode);
-            consumer.ConsumerTag = consumerTag;
-            _consumers.Add(consumerTag, consumer);
+                                                  consumer.Exclusive, consumer.AcknowledgeMode, tag);
+            
         }
 
         internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
@@ -902,19 +906,21 @@
 
             AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
                                                               queueName, exchangeName,
-                                                              routingKey, true, args);
-            _replayFrames.Add(queueBind);
+                                                              routingKey, false, args);
+            
 
             lock (_connection.FailoverMutex)
             {
-                _connection.ProtocolWriter.Write(queueBind);
+                _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
             }
+            // AS FIXME: wasnae me
+            _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0,
+                                                           queueName, exchangeName,
+                                                           routingKey, true, args));
         }
 
-        private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+        private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag)
         {
-            // Need to generate a consumer tag on the client so we can exploit the nowait flag.
-            String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
             
             AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
                                                                     queueName, tag, noLocal,
@@ -934,9 +940,7 @@
                 _logger.Debug(string.Format("DeleteQueue name={0}", queueName));
                 
                 AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait);
-
-                _replayFrames.Add(queueDelete);
-
+                
                 if (noWait)
                 {
                     _connection.ProtocolWriter.Write(queueDelete);
@@ -945,6 +949,8 @@
                 {
                     _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
                 }
+                // AS FIXME: wasnae me
+                _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true));
             }
             catch (AMQException)
             {
@@ -958,14 +964,16 @@
                                         queueName, isDurable, isExclusive, isAutoDelete));
 
             AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
-                                                                    isAutoDelete, true, null);
+                                                                    isAutoDelete, false, null);
 
-            _replayFrames.Add(queueDeclare);
 
             lock (_connection.FailoverMutex)
             {
-                _connection.ProtocolWriter.Write(queueDeclare);
+                _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
             }
+            // AS FIXME: wasnae me
+            _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
+                                                                    isAutoDelete, true, null));
         }
 
         // AMQP-level method.
@@ -978,8 +986,6 @@
 
             AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, 
                                                                           durable, autoDelete, xinternal, noWait, args);
-
-            _replayFrames.Add(declareExchange);
             
             if (noWait)
             {
@@ -987,6 +993,8 @@
                 {
                     _connection.ProtocolWriter.Write(declareExchange);
                 }
+                // AS FIXME: wasnae me
+            	_replayFrames.Add(declareExchange);
             }
             else
             {

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Wed May  7 07:09:16 2008
@@ -266,7 +266,11 @@
 
         public override void Close()
         {
-            // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
+        	if (_closed == CLOSED) 
+        	{
+        		return;        		
+        	}
+        	// FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
             lock (_channel.Connection.FailoverMutex)
             {
                 lock (_closingLock)

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs Wed May  7 07:09:16 2008
@@ -35,12 +35,6 @@
         private readonly IProtocolWriter _protocolWriter;
         private readonly IConnectionCloser _connectionCloser;
 
-        /**
-         * Counter to ensure unique queue names
-         */
-        private int _queueId = 1;
-        private readonly Object _queueIdLock = new Object();
-
         /// <summary>
         /// Maps from the channel id to the AmqChannel that it represents.
         /// </summary>
@@ -267,13 +261,7 @@
 
         internal string GenerateQueueName()
         {
-            int id;
-            lock(_queueIdLock)
-            {
-                id = _queueId++;
-            }
-
-            return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id;          
+        	return "ntmp_" + System.Guid.NewGuid();
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs Wed May  7 07:09:16 2008
@@ -1,320 +1,397 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Runtime.InteropServices;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-using Apache.Qpid.Messaging;
-
-namespace Apache.Qpid.Integration.Tests.interactive
-{
-    [TestFixture, Category("Interactive")]
-    public class FailoverTest : IConnectionListener
-    {
-        private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
-
-        /// <summary>Specifies the number of times to run the test cycle.</summary>
-        const int NUM_MESSAGES = 10;
-
-        /// <summary>Determines how many messages to send within each commit.</summary>
-        const int COMMIT_BATCH_SIZE = 1;
-
-        /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
-        //const int SLEEP_MILLIS = 1;
-
-        /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary>
-        const int TIMEOUT = 10000;
-
-        /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
-        const int FAIL_POINT = 5;
-
-        /// <summary>Specified the ack mode to use for the test.</summary>
-        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
-
-        /// <summary>Determines whether this test runs transactionally or not. </summary>
-        bool transacted = false;
-
-        /// <summary>Holds the connection to run the test over.</summary>
-        AMQConnection _connection;
-
-        /// <summary>Holds the channel for the test message publisher. </summary>
-        IChannel publishingChannel;
-
-        /// <summary>Holds the test message publisher. </summary>
-        IMessagePublisher publisher;
-
-        /// <summary>Used to keep count of the number of messages sent. </summary>
-        int messagesSent;
-
-        /// <summary>Used to keep count of the number of messages received. </summary>
-        int messagesReceived;
-
-        /// <summary>Used to wait for test completion on. </summary>
-        private static object testComplete = new Object();
-
-        /// <summary>
-        /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
-        /// </summary>
-        /// [SetUp]
-        public void Init(IConnectionInfo connectionInfo)
-        {
-            // Reset all counts.
-            messagesSent = 0;
-            messagesReceived = 0;
-
-            // Create a connection for the test.
-            _connection = new AMQConnection(connectionInfo);
-            _connection.ConnectionListener = this;
-
-            // Create a consumer to receive the test messages.
-            IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
-
-            string queueName = receivingChannel.GenerateUniqueName();
-            receivingChannel.DeclareQueue(queueName, false, true, true);
-            receivingChannel.Bind(queueName, "amq.direct", queueName);
-
-            IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
-                .WithPrefetchLow(30)
-                .WithPrefetchHigh(60).Create();
-
-            consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
-            _connection.Start();
-
-            // Create a publisher to send the test messages.
-            publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
-            publisher = publishingChannel.CreatePublisherBuilder()
-                .WithRoutingKey(queueName)
-                .Create();
-
-            _log.Debug("connection = " + _connection);
-            _log.Debug("connectionInfo = " + connectionInfo);
-            _log.Debug("connection.AsUrl = " + _connection.toURL());
-            _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
-        }
-
-        /// <summary>
-        /// Clean up the test connection.
-        /// </summary>
-        [TearDown]
-        public virtual void Shutdown()
-        {
-            Thread.Sleep(2000);
-            _connection.Close();
-        }
-
-        /// <summary>
-        /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
-        /// to fail between are seperately added into the connection info.
-        /// </summary>
-        /*[Test]
-        public void TestWithBasicInfo()
-        {
-            _log.Debug("public void TestWithBasicInfo(): called");
-
-            // Manually create the connection parameters.
-            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
-            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
-            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
-
-            Init(connectionInfo);
-            DoFailoverTest();
-        }*/
-
-        /// <summary>
-        /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
-        /// </summary>
-        [Test]
-        public void TestWithUrl()
-        {
-            _log.Debug("public void runTestWithUrl(): called");
-
-            // Parse the connection parameters from a URL.
-            String clientId = "failover" + DateTime.Now.Ticks;
-            string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
-                "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";            
-            IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
-            
-            Init(connectionInfo);
-            DoFailoverTest();
-        }
-
-        /// <summary>
-        /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
-        /// are received within the test time limit.
-        /// </summary>
-        ///
-        /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
-        void DoFailoverTest()
-        {
-            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
-
-            for (int i = 1; i <= NUM_MESSAGES; ++i)
-            {
-                ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
-                //_log.Debug("sending message = " + msg.Text);
-                publisher.Send(msg);
-                messagesSent++;
-
-                _log.Debug("messagesSent = " + messagesSent);
-
-                if (transacted)
-                {
-                    publishingChannel.Commit();
-                }
-
-                // Prompt the user to cause a failure if at the fail point.
-                if (i == FAIL_POINT)
-                {
-                    PromptAndWait("Cause a broker failure now, then press return...");
-                }
-
-                //Thread.Sleep(SLEEP_MILLIS);               
-            }
-
-            // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
-            bool withinTimeout;
-
-            lock(testComplete)
-            {
-                withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
-            }            
-
-            if (!withinTimeout)
-            {
-                Assert.Fail("Test timed out, before all messages received.");
-            }
-
-            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
-        }
-
-        /// <summary>
-        /// Receives all of the test messages.
-        /// </summary>
-        ///
-        /// <param name="message">The newly arrived test message.</param>
-        public void OnMessage(IMessage message)
-        {
-            try
-            {
-                if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
-                {
-                    message.Acknowledge();
-                }
-
-                messagesReceived++;
-
-                _log.Debug("messagesReceived = " + messagesReceived);
-
-                // Check if all of the messages in the test have been received, in which case notify the message producer that the test has 
-                // succesfully completed.
-                if (messagesReceived == NUM_MESSAGES)
-                {
-                    lock (testComplete)
-                    {
-                        Monitor.Pulse(testComplete);
-                    }
-                }
-            }
-            catch (QpidException e)
-            {
-                _log.Fatal("Exception received. About to stop.", e);
-                Stop();
-            }
-        }
-
-        /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary>
-        ///
-        /// <param name="message">The message to prompt the user with.</param>
-        private void PromptAndWait(string message)
-        {
-            Console.WriteLine("\n" + message);
-            Console.ReadLine();
-        }
-
-        // <summary>Closes the test connection.</summary>
-        private void Stop()
-        {
-            _log.Debug("Stopping...");
-            try
-            {
-                _connection.Close();
-            }
-            catch (QpidException e)
-            {
-                _log.Debug("Failed to shutdown: ", e);
-            }
-        }
-
-        /// <summary>
-        /// Called when bytes have been transmitted to the server
-        /// </summary>
-        ///
-        /// <param>count the number of bytes sent in total since the connection was opened</param>     
-        public void BytesSent(long count) {}
-
-        /// <summary>
-        /// Called when some bytes have been received on a connection
-        /// </summary>
-        ///
-        /// <param>count the number of bytes received in total since the connection was opened</param>         
-        public void BytesReceived(long count) {}
-
-        /// <summary>
-        /// Called after the infrastructure has detected that failover is required but before attempting failover.
-        /// </summary>
-        ///
-        /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param>
-        ///
-        /// <return>true to continue failing over, false to veto failover and raise a connection exception</return>         
-        public bool PreFailover(bool redirect) 
-        {
-            _log.Debug("public bool PreFailover(bool redirect): called");
-            return true; 
-        }
-
-        /// <summary>
-        /// Called after connection has been made to another broker after failover has been started but before
-        /// any resubscription has been done.
-        /// </summary>
-        ///
-        /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in
-        /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
-        /// and consumers are invalidated.
-        /// </return>
-        public bool PreResubscribe() 
-        {
-            _log.Debug("public bool PreResubscribe(): called");
-            return true; 
-        }
-
-        /// <summary>
-        /// Called once failover has completed successfully. This is called irrespective of whether the client has
-        /// vetoed automatic resubscription.
-        /// </summary>
-        public void FailoverComplete() 
-        {
-            _log.Debug("public void FailoverComplete(): called");
-        }
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.interactive
+{
+    [TestFixture, Category("Interactive")]
+    public class FailoverTest : IConnectionListener
+    {
+        private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
+
+        /// <summary>Specifies the number of test messages sent by the main send loop. Together with the extra message count it is
+        /// also the number of received messages at which the test is signalled as complete.</summary>
+        const int NUM_MESSAGES = 10;
+
+        /// <summary>Determines how many messages to send within each commit. NOTE(review): not referenced anywhere in this class.</summary>
+        const int COMMIT_BATCH_SIZE = 1;
+
+        // Specifies the duration of the pause to place between each message sent in the test (constant currently disabled).
+        //const int SLEEP_MILLIS = 1;
+
+        /// <summary>Specifies the maximum time in milliseconds to wait for failover, and then for the test, to complete.</summary>
+        const int TIMEOUT = 10000;
+
+        /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
+        const int FAIL_POINT = 5;
+
+        /// <summary>Specifies the ack mode to use for the receiving channel in the test.</summary>
+        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
+
+        /// <summary>Determines whether this test runs transactionally or not; when true every sent message is committed. </summary>
+        bool transacted = false;
+
+        /// <summary>Holds the connection to run the test over.</summary>
+        AMQConnection _connection;
+
+        /// <summary>Holds the channel for the test message publisher. </summary>
+        IChannel publishingChannel;
+
+        /// <summary>Holds the test message publisher. </summary>
+        IMessagePublisher publisher;
+
+        /// <summary>Used to keep count of the number of messages sent. </summary>
+        int messagesSent;
+
+        /// <summary>Used to keep count of the number of messages received. </summary>
+        int messagesReceived;
+
+        /// <summary>Used to wait for test completion on. </summary>
+        private static object testComplete = new Object();
+
+        /// <summary>Used to wait for failover completion on. </summary>
+	private static object failoverComplete = new Object();
+	
+        /// <summary>Flag consulted by DoFailoverTest and Shutdown to decide whether failover has been observed.</summary>
+        bool failedOver=false;
+
+        /// <summary>Used to record the extra message count (1) if the message sent right after failover actually made it to the new broker.
+        /// NOTE(review): only ever assigned 0 within this class.</summary>
+        int _extraMessage = 0;
+	
+        /// <summary>
+        /// Prepares a test run: clears the counters, asks the operator to check the brokers, opens the test connection with this
+        /// fixture registered as its listener, and builds a consumer and a publisher that share a freshly declared queue.
+        /// </summary>
+        ///
+        /// <param name="connectionInfo">The connection parameters to open the test connection with.</param>
+        public void Init(IConnectionInfo connectionInfo)
+        {
+            //log4net.Config.BasicConfigurator.Configure();
+
+            // Start every run from a clean slate.
+            _extraMessage = 0;
+            failedOver = false;
+            messagesReceived = 0;
+            messagesSent = 0;
+
+            PromptAndWait("Ensure both brokers are running, then press Enter");
+
+            // Open the connection under test; this fixture receives its listener callbacks.
+            _connection = new AMQConnection(connectionInfo);
+            _connection.ConnectionListener = this;
+
+            // Consumer side: declare a uniquely named queue and bind it to the direct exchange under its own name.
+            IChannel consumerChannel = _connection.CreateChannel(false, _acknowledgeMode);
+            string testQueueName = consumerChannel.GenerateUniqueName();
+            consumerChannel.DeclareQueue(testQueueName, false, true, true);
+            consumerChannel.Bind(testQueueName, "amq.direct", testQueueName);
+
+            IMessageConsumer testConsumer = consumerChannel
+                .CreateConsumerBuilder(testQueueName)
+                .WithPrefetchLow(30)
+                .WithPrefetchHigh(60)
+                .Create();
+            testConsumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+
+            _connection.Start();
+
+            // Publisher side: a separate channel whose routing key is the queue name declared above.
+            publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
+            publisher = publishingChannel
+                .CreatePublisherBuilder()
+                .WithRoutingKey(testQueueName)
+                .Create();
+
+            _log.Debug("connection = " + _connection);
+            _log.Debug("connectionInfo = " + connectionInfo);
+            _log.Debug("connection.AsUrl = " + _connection.toURL());
+            _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
+        }
+
+        /// <summary>
+        /// Checks that failover was observed during the test, then closes the test connection.
+        /// </summary>
+        ///
+        /// <remarks>
+        /// The connection is closed from a finally block so that a failed assertion no longer leaves it open, and the close is
+        /// skipped when no connection was ever created.
+        /// </remarks>
+        [TearDown]
+        public virtual void Shutdown()
+        {
+            try
+            {
+                if (!failedOver)
+                {
+                    Assert.Fail("The failover callback never occured.");
+                }
+            }
+            finally
+            {
+                Console.WriteLine("Test done shutting down");
+
+                // Same two second pause the teardown has always taken before closing the connection.
+                Thread.Sleep(2000);
+
+                // Init may not have got as far as creating the connection.
+                if (_connection != null)
+                {
+                    _connection.Close();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
+        /// to fail between are separately added into the connection info.
+        /// </summary>
+        /*[Test]
+        public void TestWithBasicInfo()
+        {
+            _log.Debug("public void TestWithBasicInfo(): called");
+
+            // Manually create the connection parameters.
+            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
+            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
+
+            Init(connectionInfo);
+            DoFailoverTest();
+        }*/
+
+        /// <summary>
+        /// Runs the failover test over a connection described by a Qpid connection URL that lists two local brokers and a
+        /// round-robin failover policy. No extra delay is requested at the fail point.
+        /// </summary>
+        [Test]
+        public void TestWithUrl()
+        {
+            _log.Debug("public void runTestWithUrl(): called");
+
+            // The client id is derived from the current tick count.
+            string uniqueClientId = "failover" + DateTime.Now.Ticks;
+
+            // Assemble the URL from its parts: credentials and client id first, then the broker list and failover options.
+            string brokerOptions = "?brokerlist='tcp://localhost:9672;tcp://localhost:9673'&failover='roundrobin'";
+            string connectionUrl = "amqp://guest:guest@" + uniqueClientId + "/test" + brokerOptions;
+
+            Init(QpidConnectionInfo.FromUrl(connectionUrl));
+            DoFailoverTest(0);
+        }
+
+        /// <summary>
+        /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
+        /// are received within the test time limit.
+        /// </summary>
+        ///
+        /// <param name="delay">The number of minutes to idle at the fail point before prompting the user to fail a broker.</param>
+        void DoFailoverTest(int delay)
+        {
+            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
+
+            bool withinTimeout = false;
+
+            for (int i = 1; i <= NUM_MESSAGES; ++i)
+            {
+                SendMessage();
+
+                // Prompt the user to cause a failure if at the fail point.
+                if (i == FAIL_POINT)
+                {
+                    // Optionally idle first, one minute at a time.
+                    for (int min = delay; min > 0; min--)
+                    {
+                        Console.WriteLine("Waiting for "+min+" minutes to test connection time bug.");
+                        Thread.Sleep(60*1000);
+                    }
+
+                    PromptAndWait("Cause a broker failure now, then press return.");
+                    Console.WriteLine("NOTE: ensure that the delay between killing the broker and continuing here is less than 20 second");
+
+                    Console.WriteLine("Sending a message to ensure send right after works");
+
+                    SendMessage();
+
+                    Console.WriteLine("Waiting for fail-over to complete before continuing...");
+
+                    lock (failoverComplete)
+                    {
+                        // Only wait when the failover callback has not already fired; its pulse would otherwise be missed.
+                        if (!failedOver)
+                        {
+                            withinTimeout = Monitor.Wait(failoverComplete, TIMEOUT);
+                        }
+                        else
+                        {
+                            withinTimeout = true;
+                        }
+                    }
+
+                    if (!withinTimeout)
+                    {
+                        PromptAndWait("Failover has not yet occured. Press enter to give up waiting.");
+                    }
+                }
+            }
+
+            // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
+            lock (testComplete)
+            {
+                // OnMessage pulses exactly once, at the moment the expected count is reached. If that already happened
+                // before this thread got here, the pulse is gone, so check the count before deciding to wait.
+                withinTimeout = messagesReceived >= NUM_MESSAGES + _extraMessage
+                    || Monitor.Wait(testComplete, TIMEOUT);
+            }
+
+            if (!withinTimeout)
+            {
+                Assert.Fail("Test timed out, before all messages received.");
+            }
+
+            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
+        }
+
+        /// <summary>
+        /// Runs the failover test with connection parameters built programmatically, idling for five minutes at the fail point
+        /// before the broker failure is requested.
+        /// </summary>
+        [Test]
+        public void Test5MinuteWait()
+        {
+            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+            connectionInfo.Username = "guest";
+            connectionInfo.Password = "guest";
+            connectionInfo.ClientName = "failover" + DateTime.Now.Ticks;
+            connectionInfo.VirtualHost = "/test";
+
+            // Register both local brokers, lowest port first.
+            int[] brokerPorts = new int[] { 9672, 9673 };
+            foreach (int port in brokerPorts)
+            {
+                connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", port, false));
+            }
+
+            Init(connectionInfo);
+            DoFailoverTest(5);
+        }
+
+        /// <summary>
+        /// Publishes one numbered text message, commits it when the test is transacted, and prints the running total of
+        /// messages sent.
+        /// </summary>
+        void SendMessage()
+        {
+            // The payload carries the number of messages sent before this one.
+            string body = "message=" + messagesSent;
+            publisher.Send(publishingChannel.CreateTextMessage(body));
+            messagesSent += 1;
+
+            if (transacted)
+            {
+                publishingChannel.Commit();
+            }
+
+            Console.WriteLine("messagesSent = " + messagesSent);
+        }
+	
+        /// <summary>
+        /// Receives all of the test messages, counting them and signalling test completion once the expected number has arrived.
+        /// </summary>
+        ///
+        /// <remarks>
+        /// This handler deliberately leaves the failover flag alone. Only the failover callback may set it, otherwise receiving
+        /// every message would hide the fact that failover never happened from the check made at tear down.
+        /// </remarks>
+        ///
+        /// <param name="message">The newly arrived test message.</param>
+        public void OnMessage(IMessage message)
+        {
+            try
+            {
+                // Explicit acknowledgement is only needed in client acknowledge mode.
+                if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+                {
+                    message.Acknowledge();
+                }
+
+                messagesReceived++;
+
+                _log.Debug("messagesReceived = " + messagesReceived);
+
+                // Check if all of the messages in the test have been received, in which case notify the message producer that the test has
+                // successfully completed.
+                if (messagesReceived == NUM_MESSAGES + _extraMessage)
+                {
+                    lock (testComplete)
+                    {
+                        Monitor.Pulse(testComplete);
+                    }
+                }
+            }
+            catch (QpidException e)
+            {
+                _log.Fatal("Exception received. About to stop.", e);
+                Stop();
+            }
+        }
+
+        /// <summary>
+        /// Writes a prompt to stdout, preceded by a line feed, and then blocks until a line has been read from stdin.
+        /// </summary>
+        ///
+        /// <param name="message">The message to prompt the user with.</param>
+        private void PromptAndWait(string message)
+        {
+            string prompt = string.Concat("\n", message);
+            Console.WriteLine(prompt);
+
+            // The text that was typed is not used; only the pause matters.
+            Console.ReadLine();
+        }
+
+        /// <summary>Closes the test connection, logging any Qpid failure instead of propagating it.</summary>
+        private void Stop()
+        {
+            _log.Debug("Stopping...");
+
+            try
+            {
+                _connection.Close();
+            }
+            catch (QpidException closeFailure)
+            {
+                // Closing is best effort here; the failure is only recorded at debug level.
+                _log.Debug("Failed to shutdown: ", closeFailure);
+            }
+        }
+
+        /// <summary>
+        /// Called when bytes have been transmitted to the server. This test takes no action on the notification.
+        /// </summary>
+        ///
+        /// <param name="count">The number of bytes sent in total since the connection was opened.</param>
+        public void BytesSent(long count)
+        {
+        }
+
+        /// <summary>
+        /// Called when some bytes have been received on a connection. This test takes no action on the notification.
+        /// </summary>
+        ///
+        /// <param name="count">The number of bytes received in total since the connection was opened.</param>
+        public void BytesReceived(long count)
+        {
+        }
+
+        /// <summary>
+        /// Called after the infrastructure has detected that failover is required but before attempting failover. This test
+        /// logs the call and always lets failover go ahead.
+        /// </summary>
+        ///
+        /// <param name="redirect">True if the broker requested redirect, false if failover is occurring due to a connection error.</param>
+        ///
+        /// <returns>True to continue failing over, false to veto failover and raise a connection exception. Always true here.</returns>
+        public bool PreFailover(bool redirect)
+        {
+            const bool continueFailover = true;
+
+            _log.Debug("public bool PreFailover(bool redirect): called");
+            return continueFailover;
+        }
+
+        /// <summary>
+        /// Called after connection has been made to another broker after failover has been started but before
+        /// any resubscription has been done. This test logs the call and always allows automatic resubscription.
+        /// </summary>
+        ///
+        /// <returns>
+        /// True to continue with resubscription, false to prevent automatic resubscription. This is useful in
+        /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
+        /// and consumers are invalidated. Always true here.
+        /// </returns>
+        public bool PreResubscribe()
+        {
+            const bool resubscribeAutomatically = true;
+
+            _log.Debug("public bool PreResubscribe(): called");
+            return resubscribeAutomatically;
+        }
+
+        /// <summary>
+        /// Called once failover has completed successfully. This is called irrespective of whether the client has
+        /// vetoed automatic resubscription. The test records that failover happened, reports it to the log and the console,
+        /// and pulses the failover monitor.
+        /// </summary>
+        public void FailoverComplete()
+        {
+            const string notice = "public void FailoverComplete(): called";
+
+            // Raise the flag before pulsing; the waiting thread tests it while holding the monitor.
+            failedOver = true;
+
+            _log.Debug(notice);
+            Console.WriteLine(notice);
+
+            lock (failoverComplete)
+            {
+                Monitor.Pulse(failoverComplete);
+            }
+        }
+    }
+}

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs Wed May  7 07:09:16 2008
@@ -41,7 +41,7 @@
         private const string MESSAGE_DATA_BYTES = "-- Test Message -- Test Message -- Test Message -- Test Message -- Test Message ";
 
         /// <summary> The default timeout in milliseconds to use on receives. </summary>
-        private const long RECEIVE_WAIT = 500;
+        private const long RECEIVE_WAIT = 2000;
 
         /// <summary> The default AMQ connection URL to use for tests. </summary>
         public const string connectionUri = "amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672'";
@@ -55,6 +55,9 @@
         /// <summary> Holds an array of channels for building mutiple test end-points. </summary>
         protected IChannel[] testChannel = new IChannel[10];
 
+         /// <summary> Holds an array of queues for building mutiple test end-points. </summary>
+        protected String[] testQueue = new String[10];
+        
         /// <summary> Holds an array of producers for building mutiple test end-points. </summary>
         protected IMessagePublisher[] testProducer = new IMessagePublisher[10];
 
@@ -65,7 +68,7 @@
         private static int uniqueId = 0;
 
         /// <summary> Used to hold unique ids per test. </summary>
-        protected int testId;
+        protected Guid testId;
 
         /// <summary> Creates the test connection and channel. </summary>
         [SetUp]
@@ -74,7 +77,7 @@
             log.Debug("public virtual void Init(): called");
 
             // Set up a unique id for this test.
-            testId = uniqueId++;
+            testId = System.Guid.NewGuid();
         }
 
         /// <summary>
@@ -144,6 +147,10 @@
 
                     if (declareBind)
                     {
+                    	if (durable) 
+                    	{
+                    		testQueue[n] = queueName;
+                    	}
                         testChannel[n].DeclareQueue(queueName, durable, true, true);
                         testChannel[n].Bind(queueName, exchangeName, routingKey);
                     }
@@ -167,6 +174,10 @@
 
             if (testConsumer[n] != null)
             {
+            	if (testQueue[n] != null)
+            	{
+            		testChannel[n].DeleteQueue(testQueue[n], false, false, true);
+            	}
                 testConsumer[n].Close();
                 testConsumer[n].Dispose();
                 testConsumer[n] = null;

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs Wed May  7 07:09:16 2008
@@ -127,7 +127,7 @@
                 .WithRoutingKey(_routingKey)
                 .Create();
             _logger.Info("Publisher created...");
-            SendTestMessage("Message 1");
+            SendTestMessage("DeleteNonEmptyQueue Message 1");
 
             try
             {
@@ -165,8 +165,8 @@
                 .Create();
             _logger.Info("Publisher created...");
 
-            SendTestMessage("Message 1");
-            SendTestMessage("Message 2");
+            SendTestMessage("DeleteQueueWithResponse Message 1");
+            SendTestMessage("DeleteQueueWithResponse Message 2");
             
             // delete the queue, the server must respond
             _channel.DeleteQueue(_queueName, false, false, false);

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs Wed May  7 07:09:16 2008
@@ -94,11 +94,11 @@
         public void TestCommittedSendReceived() 
         {
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
             testChannel[0].Commit();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "B", testConsumer[1]);
             testChannel[1].Commit();
         }
 
@@ -107,11 +107,11 @@
         public void TestRolledBackSendNotReceived()
         {
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
             testChannel[0].Rollback();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(0, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(0, "B", testConsumer[1]);
             testChannel[1].Commit();
         }
 
@@ -124,17 +124,17 @@
                           true, false, null);
 
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
             testChannel[0].Commit();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "C", testConsumer[1]);
 
             // Close end-point 1 without committing the message, then re-open to consume again.
             CloseEndPoint(1);
 
             // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
-            ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+            ConsumeNMessagesOnly(1, "C", testConsumer[2]);
 
             CloseEndPoint(2);
         }
@@ -144,38 +144,33 @@
         public void TestCommittedReceiveNotRereceived() 
         {
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
             testChannel[0].Commit();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "D", testConsumer[1]);
             testChannel[1].Commit();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(0, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(0, "D", testConsumer[1]);
         }
 
         /// <summary> Check that a rolled back receive can be re-received. </summary>
         [Test]
         public void TestRolledBackReceiveCanBeRereceived() 
         {
-            // Create a third end-point as an alternative delivery route for the message.
-            SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
-                          true, false, null);
-
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("E"));
             testChannel[0].Commit();
             
             // Try to receive messages.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "E", testConsumer[1]);
 
             testChannel[1].Rollback();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(1, "A", testConsumer[2]);
-
-            CloseEndPoint(2);
+            ConsumeNMessagesOnly(1, "E", testConsumer[1]);
+            
         }
     }
-}
\ No newline at end of file
+}

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs Wed May  7 07:09:16 2008
@@ -99,7 +99,7 @@
                           true, "TestSubscription" + testId);
 
             ConsumeNMessagesOnly(1, "B", testConsumer[2]);
-
+			
             // Clean up any open consumers at the end of the test.
             CloseEndPoint(2);
             CloseEndPoint(1);
@@ -113,23 +113,23 @@
             SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
                           true, false, null);
             SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, 
-                          true, false, null);
+                          true, true, "foo"+testId);
 
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
             testChannel[0].Commit();
 
             // Try to receive messages, but don't commit them.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "C", testConsumer[1]);
 
             // Close end-point 1 without committing the message, then re-open the subscription to consume again.
             CloseEndPoint(1);
-            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
-                          true, false, null);
+            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, 
+                          true, true, "foo"+testId);
 
             // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
-
+            ConsumeNMessagesOnly(1, "C", testConsumer[1]);
+			testChannel[1].Commit();
             CloseEndPoint(1);
             CloseEndPoint(0);
         }
@@ -141,24 +141,24 @@
             SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, 
                           true, false, null);
             SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, 
-                          true, false, null);
+                          true, true, "foo"+testId);
 
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
             testChannel[0].Commit();
             
             // Try to receive messages, but roll them back.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "D", testConsumer[1]);
             testChannel[1].Rollback();
 
             // Close end-point 1 without committing the message, then re-open the subscription to consume again.
             CloseEndPoint(1);
-            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
-                          true, false, null);
+            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, 
+                          true, true, "foo"+testId);
 
             // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
-
+            ConsumeNMessagesOnly(1, "D", testConsumer[1]);
+			testChannel[1].Commit();
             CloseEndPoint(1);
             CloseEndPoint(0);
         }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs Wed May  7 07:09:16 2008
@@ -50,7 +50,7 @@
         private static ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest));
 
         /// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
-        private static readonly int TIMEOUT = 1000;
+        private static readonly int TIMEOUT = 2000;
 
         /// <summary> Holds the name of the headers exchange to create to send test messages on. </summary>
         private string _exchangeName = "ServiceQ1";

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs Wed May  7 07:09:16 2008
@@ -117,7 +117,7 @@
                 testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
             }
 
-            _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 10), false);
+            _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
 
             // Check that all messages really were received.
             Assert.IsTrue(allReceived, "All messages were not received, only got " + _messageReceivedCount + " but wanted " + expectedMessageCount);
@@ -139,14 +139,14 @@
             SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT,
                           true, false, null);
 
-            expectedMessageCount = MESSAGE_COUNT;
+            expectedMessageCount = (MESSAGE_COUNT * CONSUMER_COUNT);
 
             for (int i = 0; i < MESSAGE_COUNT; i++)
             {
                 testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
             }
 
-            _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 10), false);
+            _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
 
             // Check that all messages really were received.
             Assert.IsTrue(allReceived, "All messages were not received, only got: " + _messageReceivedCount + " but wanted " + expectedMessageCount);

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs?rev=654113&r1=654112&r2=654113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs Wed May  7 07:09:16 2008
@@ -39,7 +39,7 @@
         /// Make a test TLS connection to the broker
         /// without using client-certificates
         /// </summary>
-        [Test]
+        //[Test]
         public void DoSslConnection()
         {
             // because for tests we don't usually trust the server certificate