You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2012/04/11 00:25:14 UTC

svn commit: r1312026 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src: main/csharp/Connection.cs test/csharp/TempDestinationTest.cs

Author: jgomes
Date: Tue Apr 10 22:25:13 2012
New Revision: 1312026

URL: http://svn.apache.org/viewvc?rev=1312026&view=rev
Log:
Make of copy of the temporary destinations that are being cleaned up when closing a Connection to avoid a race condition of modifying the list of temp destinations while it is being enumerated. Found that the AdvisoryConsumer monitor was creating a race condition while populating the internal temp destination list associated with a Connection so that the Connection member reference was not being set on the temp destination, and subsequently the temp destination would not be cleaned up when the Connection was closed.
Resolves AMQNET-378.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/TempDestinationTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs?rev=1312026&r1=1312025&r2=1312026&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs Tue Apr 10 22:25:13 2012
@@ -46,8 +46,8 @@ namespace Apache.NMS.ActiveMQ
 		private bool sendAcksAsync = false;
 		private bool dispatchAsync = true;
 		private int producerWindowSize = 0;
-		private bool messagePrioritySupported=true;
-        private bool watchTopicAdviosires = true;
+		private bool messagePrioritySupported = true;
+		private bool watchTopicAdviosires = true;
 
 		private bool userSpecifiedClientID;
 		private readonly Uri brokerUri;
@@ -60,7 +60,7 @@ namespace Apache.NMS.ActiveMQ
 		private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
 		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
 		private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
-        private readonly IDictionary tempDests = Hashtable.Synchronized(new Hashtable());
+		private readonly IDictionary tempDests = Hashtable.Synchronized(new Hashtable());
 		private readonly object myLock = new object();
 		private readonly Atomic<bool> connected = new Atomic<bool>(false);
 		private readonly Atomic<bool> closed = new Atomic<bool>(false);
@@ -77,11 +77,11 @@ namespace Apache.NMS.ActiveMQ
 		private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
 		private ICompressionPolicy compressionPolicy = new CompressionPolicy();
 		private readonly IdGenerator clientIdGenerator;
-        private int consumerIdCounter = 0;
+		private int consumerIdCounter = 0;
 		private volatile CountDownLatch transportInterruptionProcessingComplete;
 		private readonly MessageTransformation messageTransformation;
 		private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
-        private AdvisoryConsumer advisoryConsumer = null;
+		private AdvisoryConsumer advisoryConsumer = null;
 
 		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
 		{
@@ -324,11 +324,11 @@ namespace Apache.NMS.ActiveMQ
 			set { this.dispatchAsync = value; }
 		}
 
-        public bool WatchTopicAdvisories
-        {
-            get { return this.watchTopicAdviosires; }
-            set { this.watchTopicAdviosires = value; }
-        }
+		public bool WatchTopicAdvisories
+		{
+			get { return this.watchTopicAdviosires; }
+			set { this.watchTopicAdviosires = value; }
+		}
 
 		public string ClientId
 		{
@@ -545,11 +545,11 @@ namespace Apache.NMS.ActiveMQ
 					Tracer.Info("Connection.Close(): Closing Connection Now.");
 					this.closing.Value = true;
 
-                    if(this.advisoryConsumer != null)
-                    {
-                        this.advisoryConsumer.Dispose();
-                        this.advisoryConsumer = null;
-                    }
+					if(this.advisoryConsumer != null)
+					{
+						this.advisoryConsumer.Dispose();
+						this.advisoryConsumer = null;
+					}
 
 					lock(sessions.SyncRoot)
 					{
@@ -560,10 +560,18 @@ namespace Apache.NMS.ActiveMQ
 					}
 					sessions.Clear();
 
-                    foreach(ActiveMQTempDestination dest in this.tempDests.Values)
-                    {
-                        dest.Delete();
-                    }
+					if(this.tempDests.Count > 0)
+					{
+						// Make a copy of the destinations to delete, because the act of deleting
+						// them will modify the collection.
+						ActiveMQTempDestination[] tempDestsToDelete = new ActiveMQTempDestination[this.tempDests.Count];
+
+						this.tempDests.Values.CopyTo(tempDestsToDelete, 0);
+						foreach(ActiveMQTempDestination dest in tempDestsToDelete)
+						{
+							dest.Delete();
+						}
+					}
 
 					// Connected is true only when we've successfully sent our ConnectionInfo
 					// to the broker, so if we haven't announced ourselves there's no need to
@@ -655,7 +663,7 @@ namespace Apache.NMS.ActiveMQ
 				Response response = transport.Request(command, requestTimeout);
 				if(response is ExceptionResponse)
 				{
-					ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+					ExceptionResponse exceptionResponse = (ExceptionResponse)response;
 					BrokerError brokerError = exceptionResponse.Exception;
 					throw new BrokerException(brokerError);
 				}
@@ -752,12 +760,12 @@ namespace Apache.NMS.ActiveMQ
 										if(!(response is ExceptionResponse))
 										{
 											connected.Value = true;
-                                            if(this.watchTopicAdviosires)
-                                            {
-                                                ConsumerId id = new ConsumerId(
-                                                    new SessionId(info.ConnectionId, -1),
-                                                    Interlocked.Increment(ref this.consumerIdCounter));
-                                                this.advisoryConsumer = new AdvisoryConsumer(this, id);
+											if(this.watchTopicAdviosires)
+											{
+												ConsumerId id = new ConsumerId(
+													new SessionId(info.ConnectionId, -1),
+													Interlocked.Increment(ref this.consumerIdCounter));
+												this.advisoryConsumer = new AdvisoryConsumer(this, id);
 											}
 										}
 									}
@@ -800,19 +808,19 @@ namespace Apache.NMS.ActiveMQ
 			if(command.IsMessageDispatch)
 			{
 				WaitForTransportInterruptionProcessingToComplete();
-				DispatchMessage((MessageDispatch) command);
+				DispatchMessage((MessageDispatch)command);
 			}
 			else if(command.IsKeepAliveInfo)
 			{
-				OnKeepAliveCommand(commandTransport, (KeepAliveInfo) command);
+				OnKeepAliveCommand(commandTransport, (KeepAliveInfo)command);
 			}
 			else if(command.IsWireFormatInfo)
 			{
-				this.brokerWireFormatInfo = (WireFormatInfo) command;
+				this.brokerWireFormatInfo = (WireFormatInfo)command;
 			}
 			else if(command.IsBrokerInfo)
 			{
-				this.brokerInfo = (BrokerInfo) command;
+				this.brokerInfo = (BrokerInfo)command;
 				this.brokerInfoReceived.countDown();
 			}
 			else if(command.IsShutdownInfo)
@@ -824,7 +832,7 @@ namespace Apache.NMS.ActiveMQ
 			}
 			else if(command.IsProducerAck)
 			{
-				ProducerAck ack = (ProducerAck) command as ProducerAck;
+				ProducerAck ack = (ProducerAck)command as ProducerAck;
 				if(ack.ProducerId != null)
 				{
 					MessageProducer producer = producers[ack.ProducerId] as MessageProducer;
@@ -843,7 +851,7 @@ namespace Apache.NMS.ActiveMQ
 			{
 				if(!closing.Value && !closed.Value)
 				{
-					ConnectionError connectionError = (ConnectionError) command;
+					ConnectionError connectionError = (ConnectionError)command;
 					BrokerError brokerError = connectionError.Exception;
 					string message = "Broker connection error.";
 					string cause = "";
@@ -873,7 +881,7 @@ namespace Apache.NMS.ActiveMQ
 			{
 				if(dispatchers.Contains(dispatch.ConsumerId))
 				{
-					IDispatcher dispatcher = (IDispatcher) dispatchers[dispatch.ConsumerId];
+					IDispatcher dispatcher = (IDispatcher)dispatchers[dispatch.ConsumerId];
 
 					// Can be null when a consumer has sent a MessagePull and there was
 					// no available message at the broker to dispatch or when signalled
@@ -926,7 +934,7 @@ namespace Apache.NMS.ActiveMQ
 					{
 						error = NMSExceptionSupport.Create(error);
 					}
-					NMSException e = (NMSException) error;
+					NMSException e = (NMSException)error;
 
 					// Called in another thread so that processing can continue
 					// here, ensures no lock contention.
@@ -1014,12 +1022,12 @@ namespace Apache.NMS.ActiveMQ
 		{
 			Tracer.Debug("Connection: Transport has been Interrupted.");
 
-            // Ensure that if there's an advisory consumer we don't add it to the
-            // set of consumers that need interruption processing.
+			// Ensure that if there's an advisory consumer we don't add it to the
+			// set of consumers that need interruption processing.
 			this.transportInterruptionProcessingComplete =
-                new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0));
+				new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0));
 
-            if(Tracer.IsDebugEnabled)
+			if(Tracer.IsDebugEnabled)
 			{
 				Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
 			}
@@ -1120,8 +1128,8 @@ namespace Apache.NMS.ActiveMQ
 
 			this.SyncRequest(command);
 
+			destination = this.AddTempDestination(destination);
 			destination.Connection = this;
-            this.AddTempDestination(destination);
 
 			return destination;
 		}
@@ -1132,19 +1140,19 @@ namespace Apache.NMS.ActiveMQ
 
 		public void DeleteTemporaryDestination(IDestination destination)
 		{
-            CheckClosedOrFailed();
+			CheckClosedOrFailed();
 
-            ActiveMQTempDestination temp = destination as ActiveMQTempDestination;
+			ActiveMQTempDestination temp = destination as ActiveMQTempDestination;
 
-            foreach(Session session in this.sessions)
-            {
-                if(session.IsInUse(temp))
-                {
-                    throw new NMSException("A consumer is consuming from the temporary destination");
-                }
-            }
+			foreach(Session session in this.sessions)
+			{
+				if(session.IsInUse(temp))
+				{
+					throw new NMSException("A consumer is consuming from the temporary destination");
+				}
+			}
 
-            this.tempDests.Remove(destination as ActiveMQTempDestination);
+			this.tempDests.Remove(destination as ActiveMQTempDestination);
 			this.DeleteDestination(destination);
 		}
 
@@ -1153,7 +1161,7 @@ namespace Apache.NMS.ActiveMQ
 			DestinationInfo command = new DestinationInfo();
 			command.ConnectionId = this.ConnectionId;
 			command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
-			command.Destination = (ActiveMQDestination) destination;
+			command.Destination = (ActiveMQDestination)destination;
 
 			this.Oneway(command);
 		}
@@ -1230,48 +1238,56 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
-        internal void AddTempDestination(ActiveMQTempDestination dest)
-        {
-            // .NET lacks a putIfAbsent operation for Maps.
-            lock(tempDests.SyncRoot)
-            {
-                if(!this.tempDests.Contains(dest))
-                {
-                    this.tempDests.Add(dest, dest);
-                }
-            }
-        }
-
-        internal void RemoveTempDestination(ActiveMQTempDestination dest)
-        {
-            this.tempDests.Remove(dest);
-        }
-
-        internal bool IsTempDestinationActive(ActiveMQTempDestination dest)
-        {
-            if(this.advisoryConsumer == null)
-            {
-                return true;
-            }
-
-            return this.tempDests.Contains(dest);
-        }
-
-        protected void CheckClosedOrFailed()
-        {
-            CheckClosed();
-            if (transportFailed.Value)
-            {
-                throw new ConnectionFailedException(firstFailureError.Message);
-            }
-        }
-
-        protected void CheckClosed()
-        {
-            if(closed.Value)
-            {
-                throw new ConnectionClosedException();
-            }
-        }
+		internal ActiveMQTempDestination AddTempDestination(ActiveMQTempDestination dest)
+		{
+			ActiveMQTempDestination addedDest = dest;
+
+			// .NET lacks a putIfAbsent operation for Maps.
+			lock(tempDests.SyncRoot)
+			{
+				if(!this.tempDests.Contains(dest))
+				{
+					this.tempDests.Add(dest, dest);
+				}
+				else
+				{
+					addedDest = this.tempDests[dest] as ActiveMQTempDestination;
+				}
+			}
+
+			return addedDest;
+		}
+
+		internal void RemoveTempDestination(ActiveMQTempDestination dest)
+		{
+			this.tempDests.Remove(dest);
+		}
+
+		internal bool IsTempDestinationActive(ActiveMQTempDestination dest)
+		{
+			if(this.advisoryConsumer == null)
+			{
+				return true;
+			}
+
+			return this.tempDests.Contains(dest);
+		}
+
+		protected void CheckClosedOrFailed()
+		{
+			CheckClosed();
+			if(transportFailed.Value)
+			{
+				throw new ConnectionFailedException(firstFailureError.Message);
+			}
+		}
+
+		protected void CheckClosed()
+		{
+			if(closed.Value)
+			{
+				throw new ConnectionClosedException();
+			}
+		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/TempDestinationTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/TempDestinationTest.cs?rev=1312026&r1=1312025&r2=1312026&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/TempDestinationTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/TempDestinationTest.cs Tue Apr 10 22:25:13 2012
@@ -236,7 +236,7 @@ namespace Apache.NMS.ActiveMQ.Test
         /// Make sure you cannot publish to a temp destination that does not exist anymore.
         /// </summary>
         [Test]
-        public void TestPublishFailsForDestoryedTempDestination()
+        public void TestPublishFailsForDestroyedTempDestination()
         {
 			Connection connection = GetNewConnection();
 			Connection tempConnection = GetNewConnection();
@@ -310,7 +310,7 @@ namespace Apache.NMS.ActiveMQ.Test
 
 				connections.Remove(producerConnection);
 				producerConnection.Close();
-				//Thread.Sleep(2000); // Wait a little bit to let the delete take effect.
+				Thread.Sleep(1000); // Wait a little bit to let the delete take effect.
 
 				// This message delivery NOT should work since the temp destination was removed by closing the connection.
 				try
@@ -351,6 +351,25 @@ namespace Apache.NMS.ActiveMQ.Test
             }
         }
 
-    }
+		/// <summary>
+		/// Test clean up of multiple temp destinations
+		/// </summary>
+		[Test]
+		public void TestCloseConnectionWithTempQueues()
+		{
+			List<ITemporaryQueue> listTempQueues = new List<ITemporaryQueue>();
+			IConnection connection = CreateConnection();
+			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+			connection.Start();
+
+			for(int index = 0; index < 25; index++)
+			{
+				listTempQueues.Add(session.CreateTemporaryQueue());
+			}
+
+			connection.Close();
+		}
+	}
 }