You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/05/09 15:53:51 UTC

svn commit: r654818 - in /incubator/qpid/branches/M2.1.x: ./ dotnet/Qpid.Client.Transport.Socket.Blocking/ dotnet/Qpid.Client/Client/ dotnet/Qpid.Client/Client/Protocol/ dotnet/Qpid.Integration.Tests/interactive/ dotnet/Qpid.Integration.Tests/testcases...

Author: aidan
Date: Fri May  9 06:53:51 2008
New Revision: 654818

URL: http://svn.apache.org/viewvc?rev=654818&view=rev
Log:
Merged revisions 652388-653415,653417-654109 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x

........
  r652388 | ritchiem | 2008-04-30 15:40:18 +0100 (Wed, 30 Apr 2008) | 2 lines
  
  QPID-889 : Removed _reapingStoreContext from CSDM replaced with local StoreContext()s so they are not reused by different threads.
........
  r652389 | ritchiem | 2008-04-30 15:40:45 +0100 (Wed, 30 Apr 2008) | 1 line
  
  QPID-887 : Renamed QueueHouseKeeping threads so they can be identified in thread dump. Named Queue-housekeeping-<virtualhost name>
........
  r652399 | ritchiem | 2008-04-30 16:32:42 +0100 (Wed, 30 Apr 2008) | 1 line
  
  QPID-888,QPID-886 : Fixed all management uses of _lock.lock / _lock.unlock so that they correctly call unlock from a finally block in the CSDM. There are two issues that cover that. QPID-888 - Fix the management ones and QPID-886 to fix the use in removeExpired.
........
  r652567 | aidan | 2008-05-01 17:32:20 +0100 (Thu, 01 May 2008) | 1 line
  
  QPID-994 Don't wait to attain the closed state, as the connection is already closed by the time we get CloseOk
........
  r652568 | aidan | 2008-05-01 17:35:09 +0100 (Thu, 01 May 2008) | 1 line
  
  QPID-1001 Don't set the expiration time if TTL is 0
........
  r653447 | aidan | 2008-05-05 13:26:29 +0100 (Mon, 05 May 2008) | 1 line
  
  Check if the consumer is closed and don't re-close it
........
  r653451 | aidan | 2008-05-05 13:29:15 +0100 (Mon, 05 May 2008) | 1 line
  
  QPID-1022 Use synchronous writes to fix race conditions
........
  r653452 | aidan | 2008-05-05 13:30:45 +0100 (Mon, 05 May 2008) | 1 line
  
  QPID-1023 increase some timeouts
........
  r653760 | aidan | 2008-05-06 13:40:34 +0100 (Tue, 06 May 2008) | 3 lines
  
  QPID-1029: Generate temporary queue names using GUIDs to ensure uniqueness.
........
  r654097 | aidan | 2008-05-07 14:25:38 +0100 (Wed, 07 May 2008) | 2 lines
  
  QPID-952, QPID-951, QPID-1032 Fix failover: ensure that it is properly detected, that frames are replayed appropriately, and that failover does not time out.
........
  r654104 | aidan | 2008-05-07 14:46:51 +0100 (Wed, 07 May 2008) | 1 line
  
  QPID-952 should have been part of previous commit
........
  r654109 | aidan | 2008-05-07 14:56:09 +0100 (Wed, 07 May 2008) | 2 lines
  
  QPID-1036 increase timeouts to more reasonable levels, ensure that durable queues are deleted when no longer needed
........

Added:
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj
      - copied unchanged from r654109, incubator/qpid/branches/M2.x/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj
Modified:
    incubator/qpid/branches/M2.1.x/   (props changed)
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/AMQConnection.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
    incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java

Propchange: incubator/qpid/branches/M2.1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs Fri May  9 06:53:51 2008
@@ -53,6 +53,11 @@
         {
             _socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 
+	    /// For future note TCP Set NoDelay options may help, though with the blocking io not sure
+	    /// The Don't linger may help with detecting disconnect but that hasn't been the case in testing.
+	    /// _socket.SetSocketOption (SocketOptionLevel.Socket, SocketOptionName.NoDelay, 0);
+	    /// _socket.SetSocketOption (SocketOptionLevel.Socket, SocketOptionName.DontLinger, 0);
+	    
             IPHostEntry ipHostInfo = Dns.Resolve(_host); // Note: don't fix this warning. We do this for .NET 1.1 compatibility.
             IPAddress ipAddress = ipHostInfo.AddressList[0];
 
@@ -77,6 +82,8 @@
             {
                 _log.Error("Write caused exception", e);
                 _protocolListener.OnException(e);
+                // We should provide the error synchronously as we are doing blocking io.
+                throw e;
             }
         }
 
@@ -87,6 +94,17 @@
             
             int numOctets = _networkStream.Read(bytes, 0, bytes.Length);
 
+	    /// Read only returns 0 if the socket has been gracefully shutdown.
+	    /// http://msdn2.microsoft.com/en-us/library/system.net.sockets.networkstream.read(VS.71).aspx            
+	    /// We can use this to block Send so the next Read will force an exception forcing failover.
+	    /// Otherwise we need to wait ~20 seconds for the NetworkStream/Socket code to realise that
+	    /// the socket has been closed.
+	    if (numOctets == 0)
+	    {              
+		_socket.Shutdown(SocketShutdown.Send);
+		_socket.Close();
+	    }
+             
             ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
             byteBuffer.limit(numOctets);
             

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/AMQConnection.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/AMQConnection.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/AMQConnection.cs Fri May  9 06:53:51 2008
@@ -69,7 +69,7 @@
 
         internal bool IsFailoverAllowed
         {
-            get { return _failoverPolicy.FailoverAllowed(); }
+            get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); }
         }
 
         /// <summary>
@@ -151,34 +151,22 @@
                     _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e);
                     // XXX: Should perhaps break out of the do/while here if not a SocketException...
                 }
-            } while (_failoverPolicy.FailoverAllowed());
+            } while (!_connected && _failoverPolicy.FailoverAllowed());
 
             _log.Debug("Are we connected:" + _connected);
-            
-            if (!_failoverPolicy.FailoverAllowed())
-            {
-                if ( lastException is AMQException )
-                   throw lastException;
-                else
-                   throw new AMQConnectionException("Unable to connect", lastException);
-            }
 
-            // TODO: this needs to be redone so that we are not spinning.
-            // A suitable object should be set that is then waited on
-            // and only notified when a connection is made or when
-            // the AMQConnection gets closed.
-            while (!_connected && !Closed)
+            if (!_connected)
             {
-                _log.Debug("Sleeping.");
-                Thread.Sleep(100);
-            }
-            if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null)
-            {
-                if (_lastAMQException != null)
-                {
-                    throw _lastAMQException;
-                }
+            	if ( lastException is AMQException )
+            	{
+            		throw lastException;
+            	}
+            	else
+            	{
+            		throw new AMQConnectionException("Unable to connect", lastException);
+            	}
             }
+            
         }
 
         /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType)
@@ -263,7 +251,6 @@
 
             _log.Debug("Blocking for connection close ok frame");
 
-            _stateManager.AttainState(AMQState.CONNECTION_CLOSED);
             Disconnect();
         }
 

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/AmqChannel.cs Fri May  9 06:53:51 2008
@@ -888,10 +888,14 @@
         /// <param name="consumer"></param>
         private void RegisterConsumer(BasicMessageConsumer consumer)
         {
+            // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+            String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+            consumer.ConsumerTag = tag;
+            _consumers.Add(tag, consumer);
+            
             String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
-                                                  consumer.Exclusive, consumer.AcknowledgeMode);
-            consumer.ConsumerTag = consumerTag;
-            _consumers.Add(consumerTag, consumer);
+                                                  consumer.Exclusive, consumer.AcknowledgeMode, tag);
+            
         }
 
         internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
@@ -902,19 +906,21 @@
 
             AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
                                                               queueName, exchangeName,
-                                                              routingKey, true, args);
-            _replayFrames.Add(queueBind);
+                                                              routingKey, false, args);
+            
 
             lock (_connection.FailoverMutex)
             {
-                _connection.ProtocolWriter.Write(queueBind);
+                _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
             }
+            // AS FIXME: wasnae me
+            _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0,
+                                                           queueName, exchangeName,
+                                                           routingKey, true, args));
         }
 
-        private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+        private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag)
         {
-            // Need to generate a consumer tag on the client so we can exploit the nowait flag.
-            String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
             
             AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
                                                                     queueName, tag, noLocal,
@@ -934,9 +940,7 @@
                 _logger.Debug(string.Format("DeleteQueue name={0}", queueName));
                 
                 AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait);
-
-                _replayFrames.Add(queueDelete);
-
+                
                 if (noWait)
                 {
                     _connection.ProtocolWriter.Write(queueDelete);
@@ -945,6 +949,8 @@
                 {
                     _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
                 }
+                // AS FIXME: wasnae me
+                _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true));
             }
             catch (AMQException)
             {
@@ -958,14 +964,16 @@
                                         queueName, isDurable, isExclusive, isAutoDelete));
 
             AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
-                                                                    isAutoDelete, true, null);
+                                                                    isAutoDelete, false, null);
 
-            _replayFrames.Add(queueDeclare);
 
             lock (_connection.FailoverMutex)
             {
-                _connection.ProtocolWriter.Write(queueDeclare);
+                _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
             }
+            // AS FIXME: wasnae me
+            _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
+                                                                    isAutoDelete, true, null));
         }
 
         // AMQP-level method.
@@ -978,8 +986,6 @@
 
             AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, 
                                                                           durable, autoDelete, xinternal, noWait, args);
-
-            _replayFrames.Add(declareExchange);
             
             if (noWait)
             {
@@ -987,6 +993,8 @@
                 {
                     _connection.ProtocolWriter.Write(declareExchange);
                 }
+                // AS FIXME: wasnae me
+            	_replayFrames.Add(declareExchange);
             }
             else
             {

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Fri May  9 06:53:51 2008
@@ -266,7 +266,11 @@
 
         public override void Close()
         {
-            // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
+        	if (_closed == CLOSED) 
+        	{
+        		return;        		
+        	}
+        	// FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
             lock (_channel.Connection.FailoverMutex)
             {
                 lock (_closingLock)

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageProducer.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageProducer.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageProducer.cs Fri May  9 06:53:51 2008
@@ -306,7 +306,10 @@
          if ( !_disableTimestamps )
          {
             message.Timestamp = DateTime.UtcNow.Ticks;
-            message.Expiration = message.Timestamp + timeToLive;
+            if (timeToLive != 0)
+            {
+                message.Expiration = message.Timestamp + timeToLive;
+            }
          } else
          {
             message.Expiration = 0;

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs Fri May  9 06:53:51 2008
@@ -35,12 +35,6 @@
         private readonly IProtocolWriter _protocolWriter;
         private readonly IConnectionCloser _connectionCloser;
 
-        /**
-         * Counter to ensure unique queue names
-         */
-        private int _queueId = 1;
-        private readonly Object _queueIdLock = new Object();
-
         /// <summary>
         /// Maps from the channel id to the AmqChannel that it represents.
         /// </summary>
@@ -267,13 +261,7 @@
 
         internal string GenerateQueueName()
         {
-            int id;
-            lock(_queueIdLock)
-            {
-                id = _queueId++;
-            }
-
-            return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id;          
+        	return "ntmp_" + System.Guid.NewGuid();
         }
     }
 }

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs Fri May  9 06:53:51 2008
@@ -1,320 +1,397 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Runtime.InteropServices;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-using Apache.Qpid.Messaging;
-
-namespace Apache.Qpid.Integration.Tests.interactive
-{
-    [TestFixture, Category("Interactive")]
-    public class FailoverTest : IConnectionListener
-    {
-        private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
-
-        /// <summary>Specifies the number of times to run the test cycle.</summary>
-        const int NUM_MESSAGES = 10;
-
-        /// <summary>Determines how many messages to send within each commit.</summary>
-        const int COMMIT_BATCH_SIZE = 1;
-
-        /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
-        //const int SLEEP_MILLIS = 1;
-
-        /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary>
-        const int TIMEOUT = 10000;
-
-        /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
-        const int FAIL_POINT = 5;
-
-        /// <summary>Specified the ack mode to use for the test.</summary>
-        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
-
-        /// <summary>Determines whether this test runs transactionally or not. </summary>
-        bool transacted = false;
-
-        /// <summary>Holds the connection to run the test over.</summary>
-        AMQConnection _connection;
-
-        /// <summary>Holds the channel for the test message publisher. </summary>
-        IChannel publishingChannel;
-
-        /// <summary>Holds the test message publisher. </summary>
-        IMessagePublisher publisher;
-
-        /// <summary>Used to keep count of the number of messages sent. </summary>
-        int messagesSent;
-
-        /// <summary>Used to keep count of the number of messages received. </summary>
-        int messagesReceived;
-
-        /// <summary>Used to wait for test completion on. </summary>
-        private static object testComplete = new Object();
-
-        /// <summary>
-        /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
-        /// </summary>
-        /// [SetUp]
-        public void Init(IConnectionInfo connectionInfo)
-        {
-            // Reset all counts.
-            messagesSent = 0;
-            messagesReceived = 0;
-
-            // Create a connection for the test.
-            _connection = new AMQConnection(connectionInfo);
-            _connection.ConnectionListener = this;
-
-            // Create a consumer to receive the test messages.
-            IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
-
-            string queueName = receivingChannel.GenerateUniqueName();
-            receivingChannel.DeclareQueue(queueName, false, true, true);
-            receivingChannel.Bind(queueName, "amq.direct", queueName);
-
-            IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
-                .WithPrefetchLow(30)
-                .WithPrefetchHigh(60).Create();
-
-            consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
-            _connection.Start();
-
-            // Create a publisher to send the test messages.
-            publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
-            publisher = publishingChannel.CreatePublisherBuilder()
-                .WithRoutingKey(queueName)
-                .Create();
-
-            _log.Debug("connection = " + _connection);
-            _log.Debug("connectionInfo = " + connectionInfo);
-            _log.Debug("connection.AsUrl = " + _connection.toURL());
-            _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
-        }
-
-        /// <summary>
-        /// Clean up the test connection.
-        /// </summary>
-        [TearDown]
-        public virtual void Shutdown()
-        {
-            Thread.Sleep(2000);
-            _connection.Close();
-        }
-
-        /// <summary>
-        /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
-        /// to fail between are seperately added into the connection info.
-        /// </summary>
-        /*[Test]
-        public void TestWithBasicInfo()
-        {
-            _log.Debug("public void TestWithBasicInfo(): called");
-
-            // Manually create the connection parameters.
-            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
-            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
-            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
-
-            Init(connectionInfo);
-            DoFailoverTest();
-        }*/
-
-        /// <summary>
-        /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
-        /// </summary>
-        [Test]
-        public void TestWithUrl()
-        {
-            _log.Debug("public void runTestWithUrl(): called");
-
-            // Parse the connection parameters from a URL.
-            String clientId = "failover" + DateTime.Now.Ticks;
-            string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
-                "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";            
-            IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
-            
-            Init(connectionInfo);
-            DoFailoverTest();
-        }
-
-        /// <summary>
-        /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
-        /// are received within the test time limit.
-        /// </summary>
-        ///
-        /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
-        void DoFailoverTest()
-        {
-            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
-
-            for (int i = 1; i <= NUM_MESSAGES; ++i)
-            {
-                ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
-                //_log.Debug("sending message = " + msg.Text);
-                publisher.Send(msg);
-                messagesSent++;
-
-                _log.Debug("messagesSent = " + messagesSent);
-
-                if (transacted)
-                {
-                    publishingChannel.Commit();
-                }
-
-                // Prompt the user to cause a failure if at the fail point.
-                if (i == FAIL_POINT)
-                {
-                    PromptAndWait("Cause a broker failure now, then press return...");
-                }
-
-                //Thread.Sleep(SLEEP_MILLIS);               
-            }
-
-            // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
-            bool withinTimeout;
-
-            lock(testComplete)
-            {
-                withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
-            }            
-
-            if (!withinTimeout)
-            {
-                Assert.Fail("Test timed out, before all messages received.");
-            }
-
-            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
-        }
-
-        /// <summary>
-        /// Receives all of the test messages.
-        /// </summary>
-        ///
-        /// <param name="message">The newly arrived test message.</param>
-        public void OnMessage(IMessage message)
-        {
-            try
-            {
-                if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
-                {
-                    message.Acknowledge();
-                }
-
-                messagesReceived++;
-
-                _log.Debug("messagesReceived = " + messagesReceived);
-
-                // Check if all of the messages in the test have been received, in which case notify the message producer that the test has 
-                // succesfully completed.
-                if (messagesReceived == NUM_MESSAGES)
-                {
-                    lock (testComplete)
-                    {
-                        Monitor.Pulse(testComplete);
-                    }
-                }
-            }
-            catch (QpidException e)
-            {
-                _log.Fatal("Exception received. About to stop.", e);
-                Stop();
-            }
-        }
-
-        /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary>
-        ///
-        /// <param name="message">The message to prompt the user with.</param>
-        private void PromptAndWait(string message)
-        {
-            Console.WriteLine("\n" + message);
-            Console.ReadLine();
-        }
-
-        // <summary>Closes the test connection.</summary>
-        private void Stop()
-        {
-            _log.Debug("Stopping...");
-            try
-            {
-                _connection.Close();
-            }
-            catch (QpidException e)
-            {
-                _log.Debug("Failed to shutdown: ", e);
-            }
-        }
-
-        /// <summary>
-        /// Called when bytes have been transmitted to the server
-        /// </summary>
-        ///
-        /// <param>count the number of bytes sent in total since the connection was opened</param>     
-        public void BytesSent(long count) {}
-
-        /// <summary>
-        /// Called when some bytes have been received on a connection
-        /// </summary>
-        ///
-        /// <param>count the number of bytes received in total since the connection was opened</param>         
-        public void BytesReceived(long count) {}
-
-        /// <summary>
-        /// Called after the infrastructure has detected that failover is required but before attempting failover.
-        /// </summary>
-        ///
-        /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param>
-        ///
-        /// <return>true to continue failing over, false to veto failover and raise a connection exception</return>         
-        public bool PreFailover(bool redirect) 
-        {
-            _log.Debug("public bool PreFailover(bool redirect): called");
-            return true; 
-        }
-
-        /// <summary>
-        /// Called after connection has been made to another broker after failover has been started but before
-        /// any resubscription has been done.
-        /// </summary>
-        ///
-        /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in
-        /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
-        /// and consumers are invalidated.
-        /// </return>
-        public bool PreResubscribe() 
-        {
-            _log.Debug("public bool PreResubscribe(): called");
-            return true; 
-        }
-
-        /// <summary>
-        /// Called once failover has completed successfully. This is called irrespective of whether the client has
-        /// vetoed automatic resubscription.
-        /// </summary>
-        public void FailoverComplete() 
-        {
-            _log.Debug("public void FailoverComplete(): called");
-        }
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.interactive
+{
+    [TestFixture, Category("Interactive")]
+    public class FailoverTest : IConnectionListener
+    {
+        private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
+
+        /// <summary>Specifies the number of test messages sent by the main send loop of the test.</summary>
+        const int NUM_MESSAGES = 10;
+
+        /// <summary>Determines how many messages to send within each commit.</summary>
+        const int COMMIT_BATCH_SIZE = 1;
+
+        /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
+        //const int SLEEP_MILLIS = 1;
+
+        /// <summary>Specifies the maximum time in milliseconds to wait for the test to complete.</summary>
+        const int TIMEOUT = 10000;
+
+        /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
+        const int FAIL_POINT = 5;
+
+        /// <summary>Specifies the acknowledge mode to use for the test.</summary>
+        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
+
+        /// <summary>Determines whether this test runs transactionally or not. </summary>
+        bool transacted = false;
+
+        /// <summary>Holds the connection to run the test over.</summary>
+        AMQConnection _connection;
+
+        /// <summary>Holds the channel for the test message publisher. </summary>
+        IChannel publishingChannel;
+
+        /// <summary>Holds the test message publisher. </summary>
+        IMessagePublisher publisher;
+
+        /// <summary>Used to keep count of the number of messages sent. </summary>
+        int messagesSent;
+
+        /// <summary>Used to keep count of the number of messages received. </summary>
+        int messagesReceived;
+
+        /// <summary>Used to wait for test completion on. </summary>
+        private static object testComplete = new Object();
+
+        /// <summary>Used to wait for failover completion on. </summary>
+	private static object failoverComplete = new Object();
+	
+        bool failedOver=false;
+
+        /// <summary>Used to record the extra message count (1) if the message sent right after failover actually made it to the new broker.</summary>
+        int _extraMessage = 0;
+	
+        /// <summary>
+        /// Resets the per-test state, prompts the operator to start both brokers, then opens the test connection and
+        /// builds a consumer and a publisher on it that share a freshly named queue.
+        /// </summary>
+        ///
+        /// <remarks>This is not an NUnit [SetUp] method; each test calls it explicitly with its own connection parameters.</remarks>
+        ///
+        /// <param name="connectionInfo">The connection parameters, including the brokers to fail over between.</param>
+        public void Init(IConnectionInfo connectionInfo)
+        {
+            // Uncomment to route log4net output to the console:
+            //log4net.Config.BasicConfigurator.Configure();
+
+            // Start every run from a clean slate.
+            _extraMessage = 0;
+            failedOver = false;
+            messagesReceived = 0;
+            messagesSent = 0;
+
+            PromptAndWait("Ensure both brokers are running, then press Enter");
+
+            // Open the connection and register this fixture for its fail-over callbacks.
+            _connection = new AMQConnection(connectionInfo);
+            _connection.ConnectionListener = this;
+
+            // Consumer side: a non-transacted channel with a uniquely named queue bound to the direct exchange.
+            IChannel consumerChannel = _connection.CreateChannel(false, _acknowledgeMode);
+            string testQueueName = consumerChannel.GenerateUniqueName();
+            consumerChannel.DeclareQueue(testQueueName, false, true, true);
+            consumerChannel.Bind(testQueueName, "amq.direct", testQueueName);
+
+            IMessageConsumer testConsumer = consumerChannel.CreateConsumerBuilder(testQueueName)
+                .WithPrefetchLow(30)
+                .WithPrefetchHigh(60)
+                .Create();
+            testConsumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+
+            _connection.Start();
+
+            // Publisher side: its own channel, routing straight to the consumer's queue.
+            publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
+            publisher = publishingChannel.CreatePublisherBuilder().WithRoutingKey(testQueueName).Create();
+
+            _log.Debug("connection = " + _connection);
+            _log.Debug("connectionInfo = " + connectionInfo);
+            _log.Debug("connection.AsUrl = " + _connection.toURL());
+            _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
+        }
+
+        /// <summary>
+        /// Checks that the fail-over callback was seen during the test, then cleans up the test connection.
+        /// </summary>
+        ///
+        /// <remarks>
+        /// The connection is closed in a finally block so that a failed fail-over check does not leave it open, and the
+        /// close is skipped when no connection was ever created (for example if Init failed before opening one).
+        /// </remarks>
+        [TearDown]
+        public virtual void Shutdown()
+        {
+            try
+            {
+                if (!failedOver)
+                {
+                    Assert.Fail("The failover callback never occured.");
+                }
+            }
+            finally
+            {
+                Console.WriteLine("Test done shutting down");
+
+                // Short pause before closing the connection, kept from the original tear-down.
+                Thread.Sleep(2000);
+
+                if (_connection != null)
+                {
+                    _connection.Close();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
+        /// to fail between are separately added into the connection info.
+        /// </summary>
+        /*[Test]
+        public void TestWithBasicInfo()
+        {
+            _log.Debug("public void TestWithBasicInfo(): called");
+
+            // Manually create the connection parameters.
+            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
+            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
+
+            Init(connectionInfo);
+            DoFailoverTest();
+        }*/
+
+        /// <summary>
+        /// Runs the fail-over test with no extra delay at the fail point, taking the connection parameters from a Qpid
+        /// connection URL whose broker list names two local brokers with round-robin fail-over.
+        /// </summary>
+        [Test]
+        public void TestWithUrl()
+        {
+            _log.Debug("public void runTestWithUrl(): called");
+
+            // The client id embeds the current tick count, so it differs from run to run.
+            string uniqueClientId = "failover" + DateTime.Now.Ticks;
+
+            const string brokerOptions = "?brokerlist='tcp://localhost:9672;tcp://localhost:9673'&failover='roundrobin'";
+            string url = "amqp://guest:guest@" + uniqueClientId + "/test" + brokerOptions;
+
+            Init(QpidConnectionInfo.FromUrl(url));
+            DoFailoverTest(0);
+        }
+
+        /// <summary>
+        /// Sends the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all
+        /// expected messages are received within the test time limit.
+        /// </summary>
+        ///
+        /// <param name="delay">The number of minutes to pause at the fail point before prompting for the broker failure.</param>
+        void DoFailoverTest(int delay)
+        {
+            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
+
+            bool withinTimeout = false;
+
+            for (int i = 1; i <= NUM_MESSAGES; ++i)
+            {
+                SendMessage();
+
+                // Prompt the user to cause a failure if at the fail point.
+                if (i == FAIL_POINT)
+                {
+                    for (int min = delay; min > 0; min--)
+                    {
+                        Console.WriteLine("Waiting for "+min+" minutes to test connection time bug.");
+                        Thread.Sleep(60*1000);
+                    }
+
+                    PromptAndWait("Cause a broker failure now, then press return.");
+                    Console.WriteLine("NOTE: ensure that the delay between killing the broker and continuing here is less than 20 second");
+
+                    Console.WriteLine("Sending a message to ensure send right after works");
+
+                    SendMessage();
+
+                    Console.WriteLine("Waiting for fail-over to complete before continuing...");
+
+                    // Only wait if the fail-over callback has not already fired; a pulse sent before the wait starts is lost.
+                    lock (failoverComplete)
+                    {
+                        if (!failedOver)
+                        {
+                            withinTimeout = Monitor.Wait(failoverComplete, TIMEOUT);
+                        }
+                        else
+                        {
+                            withinTimeout = true;
+                        }
+                    }
+
+                    if (!withinTimeout)
+                    {
+                        PromptAndWait("Failover has not yet occured. Press enter to give up waiting.");
+                    }
+                }
+            }
+
+            // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
+            // The receiver pulses exactly once, when the expected count is reached, and that can happen before this
+            // thread gets here. Check the count under the lock first so that an early pulse is not mistaken for a timeout.
+            lock (testComplete)
+            {
+                if (messagesReceived < NUM_MESSAGES + _extraMessage)
+                {
+                    withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
+                }
+                else
+                {
+                    withinTimeout = true;
+                }
+            }
+
+            if (!withinTimeout)
+            {
+                Assert.Fail("Test timed out, before all messages received.");
+            }
+
+            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
+        }
+
+        /// <summary>
+        /// Runs the fail-over test with a five minute pause at the fail point, building the connection parameters
+        /// programmatically rather than parsing them from a URL.
+        /// </summary>
+        [Test]
+        public void Test5MinuteWait()
+        {
+            QpidConnectionInfo info = new QpidConnectionInfo();
+            info.Username = "guest";
+            info.Password = "guest";
+            info.ClientName = "failover" + DateTime.Now.Ticks;
+            info.VirtualHost = "/test";
+
+            // The two local brokers to fail over between, in order.
+            foreach (int port in new int[] { 9672, 9673 })
+            {
+                info.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", port, false));
+            }
+
+            Init(info);
+            DoFailoverTest(5);
+        }
+
+        /// <summary>
+        /// Publishes one text message whose body carries the running sent count, increments that count, commits the
+        /// publishing channel when the test is transacted, and echoes the new count to the console.
+        /// </summary>
+        void SendMessage()
+        {
+            // The body is numbered with the count as it stands before this send.
+            string body = "message=" + messagesSent;
+            ITextMessage outgoing = publishingChannel.CreateTextMessage(body);
+
+            publisher.Send(outgoing);
+            messagesSent = messagesSent + 1;
+
+            if (transacted)
+            {
+                publishingChannel.Commit();
+            }
+
+            Console.WriteLine("messagesSent = " + messagesSent);
+        }
+
+	
+        /// <summary>
+        /// Message callback for the test consumer. Counts each arrival and, once the expected total has been reached,
+        /// pulses the test completion monitor.
+        /// </summary>
+        ///
+        /// <param name="message">The newly arrived test message.</param>
+        public void OnMessage(IMessage message)
+        {
+            try
+            {
+                // Only client-acknowledge mode is acknowledged explicitly here.
+                bool mustAcknowledge = (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge);
+                if (mustAcknowledge)
+                {
+                    message.Acknowledge();
+                }
+
+                messagesReceived = messagesReceived + 1;
+                _log.Debug("messagesReceived = " + messagesReceived);
+
+                // Nothing more to do unless this arrival brings the count to exactly the expected total.
+                if (messagesReceived != NUM_MESSAGES + _extraMessage)
+                {
+                    return;
+                }
+
+                // All expected messages are in: signal successful completion.
+                lock (testComplete)
+                {
+                    // NOTE(review): this also marks fail-over as having happened, which lets the tear-down check pass
+                    // even if FailoverComplete() was never called - confirm that this is intended.
+                    failedOver = true;
+                    Monitor.Pulse(testComplete);
+                }
+            }
+            catch (QpidException e)
+            {
+                _log.Fatal("Exception received. About to stop.", e);
+                Stop();
+            }
+        }
+
+        /// <summary>Writes the prompt to stdout, preceded by a newline, then blocks until a line has been read from stdin.</summary>
+        ///
+        /// <param name="message">The text to show the user.</param>
+        private void PromptAndWait(string message)
+        {
+            string prompt = "\n" + message;
+            Console.WriteLine(prompt);
+
+            // The reply itself is discarded; reading it is only the signal to continue.
+            Console.ReadLine();
+        }
+
+        /// <summary>Closes the test connection, logging rather than propagating any Qpid exception raised by the close.</summary>
+        private void Stop()
+        {
+            _log.Debug("Stopping...");
+
+            try
+            {
+                _connection.Close();
+            }
+            catch (QpidException closeFailure)
+            {
+                // A failed close is only recorded, not rethrown.
+                _log.Debug("Failed to shutdown: ", closeFailure);
+            }
+        }
+
+        /// <summary>
+        /// Connection listener callback for bytes transmitted to the server. This test takes no action on it.
+        /// </summary>
+        ///
+        /// <param name="count">The number of bytes sent in total since the connection was opened.</param>
+        public void BytesSent(long count)
+        {
+            // Deliberately empty.
+        }
+
+        /// <summary>
+        /// Connection listener callback for bytes received on the connection. This test takes no action on it.
+        /// </summary>
+        ///
+        /// <param name="count">The number of bytes received in total since the connection was opened.</param>
+        public void BytesReceived(long count)
+        {
+            // Deliberately empty.
+        }
+
+        /// <summary>
+        /// Connection listener callback made once the need to fail over has been detected, before fail-over is attempted.
+        /// This test logs the call and always lets fail-over go ahead.
+        /// </summary>
+        ///
+        /// <param name="redirect">True if the broker requested a redirect; false if fail-over is due to a connection error.</param>
+        ///
+        /// <returns>True to continue failing over, false to veto fail-over and raise a connection exception.</returns>
+        public bool PreFailover(bool redirect)
+        {
+            _log.Debug("public bool PreFailover(bool redirect): called");
+
+            // Never veto fail-over.
+            const bool proceed = true;
+            return proceed;
+        }
+
+        /// <summary>
+        /// Connection listener callback made after fail-over has connected to another broker, but before any
+        /// resubscription has been done. This test logs the call and always allows automatic resubscription.
+        /// </summary>
+        ///
+        /// <returns>
+        /// True to continue with resubscription, false to prevent automatic resubscription. Returning false is useful
+        /// where the application wants to handle resubscription itself; in that case all sessions, producers and
+        /// consumers are invalidated.
+        /// </returns>
+        public bool PreResubscribe()
+        {
+            _log.Debug("public bool PreResubscribe(): called");
+
+            // Always let automatic resubscription proceed.
+            const bool resubscribe = true;
+            return resubscribe;
+        }
+
+        /// <summary>
+        /// Connection listener callback made once fail-over has completed successfully, irrespective of whether the
+        /// client vetoed automatic resubscription. Records that fail-over happened and pulses the fail-over monitor.
+        /// </summary>
+        public void FailoverComplete()
+        {
+            // Raise the flag before signalling, so that a waiter which has not started waiting yet sees it and skips the wait.
+            failedOver = true;
+
+            const string trace = "public void FailoverComplete(): called";
+            _log.Debug(trace);
+            Console.WriteLine(trace);
+
+            lock (failoverComplete)
+            {
+                Monitor.Pulse(failoverComplete);
+            }
+        }
+    }
+}

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs Fri May  9 06:53:51 2008
@@ -41,7 +41,7 @@
         private const string MESSAGE_DATA_BYTES = "-- Test Message -- Test Message -- Test Message -- Test Message -- Test Message ";
 
         /// <summary> The default timeout in milliseconds to use on receives. </summary>
-        private const long RECEIVE_WAIT = 500;
+        private const long RECEIVE_WAIT = 2000;
 
         /// <summary> The default AMQ connection URL to use for tests. </summary>
         public const string connectionUri = "amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672'";
@@ -55,6 +55,9 @@
         /// <summary> Holds an array of channels for building mutiple test end-points. </summary>
         protected IChannel[] testChannel = new IChannel[10];
 
+        /// <summary> Holds the names of the durable queues declared for the test end-points, so they can be deleted when an end-point is closed. </summary>
+        protected String[] testQueue = new String[10];
+        
         /// <summary> Holds an array of producers for building mutiple test end-points. </summary>
         protected IMessagePublisher[] testProducer = new IMessagePublisher[10];
 
@@ -65,7 +68,7 @@
         private static int uniqueId = 0;
 
         /// <summary> Used to hold unique ids per test. </summary>
-        protected int testId;
+        protected Guid testId;
 
         /// <summary> Creates the test connection and channel. </summary>
         [SetUp]
@@ -74,7 +77,7 @@
             log.Debug("public virtual void Init(): called");
 
             // Set up a unique id for this test.
-            testId = uniqueId++;
+            testId = System.Guid.NewGuid();
         }
 
         /// <summary>
@@ -144,6 +147,10 @@
 
                     if (declareBind)
                     {
+                    	if (durable) 
+                    	{
+                    		testQueue[n] = queueName;
+                    	}
                         testChannel[n].DeclareQueue(queueName, durable, true, true);
                         testChannel[n].Bind(queueName, exchangeName, routingKey);
                     }
@@ -167,6 +174,10 @@
 
             if (testConsumer[n] != null)
             {
+            	if (testQueue[n] != null)
+            	{
+            		testChannel[n].DeleteQueue(testQueue[n], false, false, true);
+            	}
                 testConsumer[n].Close();
                 testConsumer[n].Dispose();
                 testConsumer[n] = null;

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs Fri May  9 06:53:51 2008
@@ -127,7 +127,7 @@
                 .WithRoutingKey(_routingKey)
                 .Create();
             _logger.Info("Publisher created...");
-            SendTestMessage("Message 1");
+            SendTestMessage("DeleteNonEmptyQueue Message 1");
 
             try
             {
@@ -165,8 +165,8 @@
                 .Create();
             _logger.Info("Publisher created...");
 
-            SendTestMessage("Message 1");
-            SendTestMessage("Message 2");
+            SendTestMessage("DeleteQueueWithResponse Message 1");
+            SendTestMessage("DeleteQueueWithResponse Message 2");
             
             // delete the queue, the server must respond
             _channel.DeleteQueue(_queueName, false, false, false);

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs Fri May  9 06:53:51 2008
@@ -94,11 +94,11 @@
         public void TestCommittedSendReceived() 
         {
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
             testChannel[0].Commit();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "B", testConsumer[1]);
             testChannel[1].Commit();
         }
 
@@ -107,11 +107,11 @@
         public void TestRolledBackSendNotReceived()
         {
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
             testChannel[0].Rollback();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(0, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(0, "B", testConsumer[1]);
             testChannel[1].Commit();
         }
 
@@ -124,17 +124,17 @@
                           true, false, null);
 
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
             testChannel[0].Commit();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "C", testConsumer[1]);
 
             // Close end-point 1 without committing the message, then re-open to consume again.
             CloseEndPoint(1);
 
             // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
-            ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+            ConsumeNMessagesOnly(1, "C", testConsumer[2]);
 
             CloseEndPoint(2);
         }
@@ -144,38 +144,33 @@
         public void TestCommittedReceiveNotRereceived() 
         {
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
             testChannel[0].Commit();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "D", testConsumer[1]);
             testChannel[1].Commit();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(0, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(0, "D", testConsumer[1]);
         }
 
         /// <summary> Check that a rolled back receive can be re-received. </summary>
         [Test]
         public void TestRolledBackReceiveCanBeRereceived() 
         {
-            // Create a third end-point as an alternative delivery route for the message.
-            SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
-                          true, false, null);
-
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("E"));
             testChannel[0].Commit();
             
             // Try to receive messages.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "E", testConsumer[1]);
 
             testChannel[1].Rollback();
 
             // Try to receive messages.
-            ConsumeNMessagesOnly(1, "A", testConsumer[2]);
-
-            CloseEndPoint(2);
+            ConsumeNMessagesOnly(1, "E", testConsumer[1]);
+            
         }
     }
-}
\ No newline at end of file
+}

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs Fri May  9 06:53:51 2008
@@ -99,7 +99,7 @@
                           true, "TestSubscription" + testId);
 
             ConsumeNMessagesOnly(1, "B", testConsumer[2]);
-
+			
             // Clean up any open consumers at the end of the test.
             CloseEndPoint(2);
             CloseEndPoint(1);
@@ -113,23 +113,23 @@
             SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
                           true, false, null);
             SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, 
-                          true, false, null);
+                          true, true, "foo"+testId);
 
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
             testChannel[0].Commit();
 
             // Try to receive messages, but don't commit them.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "C", testConsumer[1]);
 
             // Close end-point 1 without committing the message, then re-open the subscription to consume again.
             CloseEndPoint(1);
-            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
-                          true, false, null);
+            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, 
+                          true, true, "foo"+testId);
 
             // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
-
+            ConsumeNMessagesOnly(1, "C", testConsumer[1]);
+			testChannel[1].Commit();
             CloseEndPoint(1);
             CloseEndPoint(0);
         }
@@ -141,24 +141,24 @@
             SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, 
                           true, false, null);
             SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, 
-                          true, false, null);
+                          true, true, "foo"+testId);
 
             // Send messages.
-            testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+            testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
             testChannel[0].Commit();
             
             // Try to receive messages, but roll them back.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+            ConsumeNMessagesOnly(1, "D", testConsumer[1]);
             testChannel[1].Rollback();
 
             // Close end-point 1 without committing the message, then re-open the subscription to consume again.
             CloseEndPoint(1);
-            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
-                          true, false, null);
+            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, 
+                          true, true, "foo"+testId);
 
             // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
-            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
-
+            ConsumeNMessagesOnly(1, "D", testConsumer[1]);
+			testChannel[1].Commit();
             CloseEndPoint(1);
             CloseEndPoint(0);
         }

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs Fri May  9 06:53:51 2008
@@ -50,7 +50,7 @@
         private static ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest));
 
         /// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
-        private static readonly int TIMEOUT = 1000;
+        private static readonly int TIMEOUT = 2000;
 
         /// <summary> Holds the name of the headers exchange to create to send test messages on. </summary>
         private string _exchangeName = "ServiceQ1";

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs Fri May  9 06:53:51 2008
@@ -117,7 +117,7 @@
                 testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
             }
 
-            _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 10), false);
+            _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
 
             // Check that all messages really were received.
             Assert.IsTrue(allReceived, "All messages were not received, only got " + _messageReceivedCount + " but wanted " + expectedMessageCount);
@@ -139,14 +139,14 @@
             SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT,
                           true, false, null);
 
-            expectedMessageCount = MESSAGE_COUNT;
+            expectedMessageCount = (MESSAGE_COUNT * CONSUMER_COUNT);
 
             for (int i = 0; i < MESSAGE_COUNT; i++)
             {
                 testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
             }
 
-            _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 10), false);
+            _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
 
             // Check that all messages really were received.
             Assert.IsTrue(allReceived, "All messages were not received, only got: " + _messageReceivedCount + " but wanted " + expectedMessageCount);

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs Fri May  9 06:53:51 2008
@@ -39,7 +39,7 @@
         /// Make a test TLS connection to the broker
         /// without using client-certificates
         /// </summary>
-        [Test]
+        //[Test]
         public void DoSslConnection()
         {
             // because for tests we don't usually trust the server certificate

Modified: incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri May  9 06:53:51 2008
@@ -87,10 +87,6 @@
     private final Object _queueHeadLock = new Object();
     private String _processingThreadName = "";
 
-
-    /** Used by any reaping thread to purge messages */
-    private StoreContext _reapingStoreContext = new StoreContext();
-
     ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
     {
 
@@ -218,22 +214,32 @@
     public void removeExpired() throws AMQException
     {
         _lock.lock();
-
-
-	    for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
+        try
         {
-            QueueEntry entry = iter.next();
-            if(entry.expired())
+            // New Context for dealing with the MessageStore.
+            StoreContext context = new StoreContext();
+
+            for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
             {
-                // fixme: Currently we have to update the total byte size here for the data in the queue  
-                _totalMessageSize.addAndGet(-entry.getSize());
-                _queue.dequeue(_reapingStoreContext,entry);
-                iter.remove();
-            }
-	    }
+                QueueEntry entry = iter.next();
+                if(entry.expired())
+                {
+                    // fixme: Currently we have to update the total byte size here for the data in the queue
+                    _totalMessageSize.addAndGet(-entry.getSize());
 
+                    // Remove the message from the queue in the MessageStore
+                    _queue.dequeue(context,entry);
 
-        _lock.unlock();
+                    // This queue no longer needs a reference to this message
+                    entry.getMessage().decrementReference(context);
+                    iter.remove();
+                }
+            }
+        }
+        finally
+        {
+            _lock.unlock();
+        }
     }
 
     /** @return the state of the async processor. */
@@ -249,14 +255,20 @@
      */
     public List<QueueEntry> getMessages()
     {
-        _lock.lock();
-        List<QueueEntry> list = new ArrayList<QueueEntry>();
+         List<QueueEntry> list = new ArrayList<QueueEntry>();
 
-        for (QueueEntry entry : _messages)
+        _lock.lock();
+        try
         {
-            list.add(entry);
+            for (QueueEntry entry : _messages)
+            {
+                list.add(entry);
+            }
+        }
+        finally
+        {
+            _lock.unlock();
         }
-        _lock.unlock();
 
         return list;
     }
@@ -278,24 +290,28 @@
 
         long maxMessageCount = toMessageId - fromMessageId + 1;
 
-        _lock.lock();
-
         List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
-
-        for (QueueEntry entry : _messages)
+        _lock.lock();
+        try
         {
-            long msgId = entry.getMessage().getMessageId();
-            if (msgId >= fromMessageId && msgId <= toMessageId)
+            for (QueueEntry entry : _messages)
             {
-                foundMessagesList.add(entry);
-            }
-            // break if the no of messages are found
-            if (foundMessagesList.size() == maxMessageCount)
-            {
-                break;
+                long msgId = entry.getMessage().getMessageId();
+                if (msgId >= fromMessageId && msgId <= toMessageId)
+                {
+                    foundMessagesList.add(entry);
+                }
+                // break if the no of messages are found
+                if (foundMessagesList.size() == maxMessageCount)
+                {
+                    break;
+                }
             }
         }
-        _lock.unlock();
+        finally
+        {
+            _lock.unlock();
+        }
 
         return foundMessagesList;
     }
@@ -445,45 +461,62 @@
     {
         _lock.lock();
 
-        QueueEntry entry = _messages.poll();
-
-        if (entry != null)
+        try
         {
-            queue.dequeue(storeContext, entry);
+            QueueEntry entry = _messages.poll();
 
-            _totalMessageSize.addAndGet(-entry.getSize());
+            if (entry != null)
+            {
+                queue.dequeue(storeContext, entry);
 
-            //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
-            entry.getMessage().decrementReference(storeContext);
+                _totalMessageSize.addAndGet(-entry.getSize());
 
-        }
+                //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
+                entry.getMessage().decrementReference(storeContext);
 
-        _lock.unlock();
+            }
+        }
+        finally
+        {
+            _lock.unlock();
+        }
     }
 
     public long clearAllMessages(StoreContext storeContext) throws AMQException
     {
         long count = 0;
-        _lock.lock();
 
-        synchronized (_queueHeadLock)
+        _lock.lock();
+        try
         {
-            QueueEntry entry = getNextMessage();
-            while (entry != null)
+            synchronized (_queueHeadLock)
             {
-                //and remove it
-                _messages.poll();
+                QueueEntry entry = getNextMessage();
 
-                _queue.dequeue(storeContext, entry);
+                // todo: note: why do we need this? Why not reuse the passed 'storeContext'
+                //Create a new StoreContext for decrementing the References
+                StoreContext context = new StoreContext();
+
+                while (entry != null)
+                {
+                    //and remove it
+                    _messages.poll();
 
-                entry.getMessage().decrementReference(_reapingStoreContext);
+                    // todo: NOTE: Why is this a different context to the new local 'context'?
+                    _queue.dequeue(storeContext, entry);
 
-                entry = getNextMessage();
-                count++;
+                    entry.getMessage().decrementReference(context);
+
+                    entry = getNextMessage();
+                    count++;
+                }
+                _totalMessageSize.set(0L);
             }
-            _totalMessageSize.set(0L);
         }
-        _lock.unlock();
+        finally
+        {
+            _lock.unlock();
+        }
         return count;
     }
 
@@ -518,10 +551,13 @@
             {
                 _totalMessageSize.addAndGet(-entry.getSize());
 
+                // New Store Context for removing expired messages
+                StoreContext storeContext = new StoreContext();
+
                 // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
-                _queue.dequeue(_reapingStoreContext, entry);
+                _queue.dequeue(storeContext, entry);
 
-                message.decrementReference(_reapingStoreContext);
+                message.decrementReference(storeContext);
 
                 if (_log.isInfoEnabled())
                 {
@@ -760,24 +796,30 @@
     public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList)
     {
         _lock.lock();
-        for (QueueEntry entry : movedMessageList)
-        {
-            addMessageToQueue(entry, false);
-        }
-
-        // enqueue on the pre delivery queues
-        for (Subscription sub : _subscriptions.getSubscriptions())
+        try
         {
             for (QueueEntry entry : movedMessageList)
             {
-                // Only give the message to those that want them.
-                if (sub.hasInterest(entry))
+                addMessageToQueue(entry, false);
+            }
+
+            // enqueue on the pre delivery queues
+            for (Subscription sub : _subscriptions.getSubscriptions())
+            {
+                for (QueueEntry entry : movedMessageList)
                 {
-                    sub.enqueueForPreDelivery(entry, true);
+                    // Only give the message to those that want them.
+                    if (sub.hasInterest(entry))
+                    {
+                        sub.enqueueForPreDelivery(entry, true);
+                    }
                 }
             }
         }
-        _lock.unlock();
+        finally
+        {
+            _lock.unlock();
+        }
     }
 
     /**

Modified: incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=654818&r1=654817&r2=654818&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri May  9 06:53:51 2008
@@ -71,7 +71,7 @@
 
     private ACLPlugin _accessManager;
 
-    private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true);
+    private final Timer _houseKeepingTimer;
      
     private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
     
@@ -172,41 +172,53 @@
 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();
+        _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true);
         initialiseHouseKeeping(hostConfig);
     }
 
     private void initialiseHouseKeeping(final Configuration hostConfig)
     {
-     
-    	long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
-    
-    	/* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
-    	if(period != 0L)
-    	{
-    		class RemoveExpiredMessagesTask extends TimerTask
-    		{
-    			public void run()
-    			{
-    				for(AMQQueue q : _queueRegistry.getQueues())
-    				{
-
-    					try
-    					{
-    						q.removeExpiredIfNoSubscribers();
-    					}
-    					catch (AMQException e)
-    					{
-    						_logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
-    						throw new RuntimeException(e);
-    					}
-    				}
-    			}
-    		}
-    		
-    		_houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
-    				period/2,
-    				period);
-    	}
+
+        long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
+
+        /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
+        if (period != 0L)
+        {
+            class RemoveExpiredMessagesTask extends TimerTask
+            {
+                public void run()
+                {
+                    try
+                    {
+                        _logger.info("Start Run");
+                        for (AMQQueue q : _queueRegistry.getQueues())
+                        {
+
+                            try
+                            {
+                                q.removeExpiredIfNoSubscribers();
+                            }
+                            catch (AMQException e)
+                            {
+                                _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e);
+                                throw new RuntimeException(e);
+                            }
+                        }
+                        _logger.info("Stop Run");
+                    }
+                    catch (Exception fatal)
+                    {
+                        System.err.println(Thread.currentThread().getName()+" Exception in housekeeping "+fatal);
+                        fatal.printStackTrace();
+                        System.exit(-1);
+                    }
+                }
+            }
+
+            _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
+                                                   period / 2,
+                                                   period);
+        }
     }
     
     private void initialiseMessageStore(Configuration config) throws Exception