You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2011/06/18 01:58:06 UTC

svn commit: r1137086 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp: Connection.cs State/ConnectionState.cs State/ConnectionStateTracker.cs State/SynchronizedObjects.cs Transport/MutexTransport.cs

Author: jgomes
Date: Fri Jun 17 23:58:06 2011
New Revision: 1137086

URL: http://svn.apache.org/viewvc?rev=1137086&view=rev
Log:
Refactor the CheckConnected function to handle multiple threads attempting to check connection status against an offline broker.  Guard against unwanted exceptions being thrown when indexing into a connection state array that has not been fully set up because the broker is offline.

Fixes [AMQNET-331]. (See https://issues.apache.org/jira/browse/AMQNET-331)

Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionState.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Connection.cs?rev=1137086&r1=1137085&r2=1137086&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Connection.cs Fri Jun 17 23:58:06 2011
@@ -501,14 +501,7 @@ namespace Apache.NMS.Stomp
 
         public Response SyncRequest(Command command)
         {
-            try
-            {
-                return SyncRequest(command, this.RequestTimeout);
-            }
-            catch(Exception ex)
-            {
-                throw NMSExceptionSupport.Create(ex);
-            }
+            return SyncRequest(command, this.RequestTimeout);
         }
 
         public Response SyncRequest(Command command, TimeSpan requestTimeout)
@@ -546,7 +539,13 @@ namespace Apache.NMS.Stomp
             }
         }
 
-        protected void CheckConnected()
+        private object checkConnectedLock = new object();
+
+        /// <summary>
+        /// Check and ensure that the connection objcet is connected.  If it is not
+        /// connected or is closed, a ConnectionClosedException is thrown.
+        /// </summary>
+        internal void CheckConnected()
         {
             if(closed.Value)
             {
@@ -555,17 +554,57 @@ namespace Apache.NMS.Stomp
 
             if(!connected.Value)
             {
-                if(!this.userSpecifiedClientID)
+                DateTime timeoutTime = DateTime.Now + this.RequestTimeout;
+                int waitCount = 1;
+
+                while(true)
                 {
-                    this.info.ClientId = this.clientIdGenerator.GenerateId();
+                    if(Monitor.TryEnter(checkConnectedLock))
+                    {
+                        try
+                        {
+                            if(!connected.Value)
+                            {
+                                if(!this.userSpecifiedClientID)
+                                {
+                                    this.info.ClientId = this.clientIdGenerator.GenerateId();
+                                }
+
+                                try
+                                {
+                                    if(null != transport)
+                                    {
+                                        // Send the connection and see if an ack/nak is returned.
+                                        Response response = transport.Request(this.info, this.RequestTimeout);
+                                        if(!(response is ExceptionResponse))
+                                        {
+                                            connected.Value = true;
+                                        }
+                                    }
+                                }
+                                catch
+                                {
+                                }
+                            }
+                        }
+                        finally
+                        {
+                            Monitor.Exit(checkConnectedLock);
+                        }
+                    }
+
+                    if(connected.Value || DateTime.Now > timeoutTime)
+                    {
+                        break;
+                    }
+
+                    // Back off from being overly aggressive.  Having too many threads
+                    // aggressively trying to connect to a down broker pegs the CPU.
+                    Thread.Sleep(5 * (waitCount++));
                 }
 
-                connected.Value = true;
-                // now lets send the connection and see if we get an ack/nak
-                if(null == SyncRequest(info))
+                if(!connected.Value)
                 {
-                    closed.Value = true;
-                    connected.Value = false;
                     throw new ConnectionClosedException();
                 }
             }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionState.cs?rev=1137086&r1=1137085&r2=1137086&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionState.cs Fri Jun 17 23:58:06 2011
@@ -25,8 +25,7 @@ namespace Apache.NMS.Stomp.State
 	{
 
 		ConnectionInfo info;
-		private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers =
-            new AtomicDictionary<ConsumerId, ConsumerState>();
+		private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers = new AtomicDictionary<ConsumerId, ConsumerState>();
 		private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
 
 		public ConnectionState(ConnectionInfo info)
@@ -49,26 +48,25 @@ namespace Apache.NMS.Stomp.State
 		{
 			get
 			{
-				#if DEBUG
-				try
+				ConsumerState consumerState;
+				
+				if(consumers.TryGetValue(id, out consumerState))
 				{
-				#endif
-					return consumers[id];
-				#if DEBUG
+					return consumerState;
 				}
-				catch(System.Collections.Generic.KeyNotFoundException ex)
+				
+#if DEBUG
+				// Useful for dignosing missing consumer ids
+				string consumerList = string.Empty;
+				foreach(ConsumerId consumerId in consumers.Keys)
 				{
-					// Useful for dignosing missing consumer ids
-					string consumerList = string.Empty;
-					foreach(ConsumerId consumerId in consumers.Keys)
-					{
-						consumerList += consumerId.ToString() + "\n";
-					}
-					System.Diagnostics.Debug.Assert(false,
-						string.Format("Consumer '{0}' did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList));
-					throw ex;
+					consumerList += consumerId.ToString() + "\n";
 				}
-				#endif
+				
+				System.Diagnostics.Debug.Assert(false,
+					string.Format("Consumer '{0}' did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList));
+#endif
+				return null;
 			}
 		}
 
@@ -80,7 +78,9 @@ namespace Apache.NMS.Stomp.State
 
 		public ConsumerState removeConsumer(ConsumerId id)
 		{
-			ConsumerState ret = consumers[id];
+			ConsumerState ret = null;
+			
+			consumers.TryGetValue(id, out ret);
 			consumers.Remove(id);
 			return ret;
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs?rev=1137086&r1=1137085&r2=1137086&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs Fri Jun 17 23:58:06 2011
@@ -31,8 +31,7 @@ namespace Apache.NMS.Stomp.State
 	{
 		private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
 
-		protected Dictionary<ConnectionId, ConnectionState> connectionStates =
-            new Dictionary<ConnectionId, ConnectionState>();
+		protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
 
 		private bool _restoreConsumers = true;
 
@@ -67,10 +66,10 @@ namespace Apache.NMS.Stomp.State
 			{
 				transport.Oneway(connectionState.Info);
 
-                if(RestoreConsumers)
-                {
-                    DoRestoreConsumers(transport, connectionState);
-                }
+				if(RestoreConsumers)
+				{
+					DoRestoreConsumers(transport, connectionState);
+				}
 			}
 		}
 
@@ -97,10 +96,11 @@ namespace Apache.NMS.Stomp.State
 					ConnectionId connectionId = sessionId.ParentId;
 					if(connectionId != null)
 					{
-						ConnectionState cs = connectionStates[connectionId];
-						if(cs != null)
+						ConnectionState cs = null;
+						
+						if(connectionStates.TryGetValue(connectionId, out cs))
 						{
-						    cs.addConsumer(info);
+							cs.addConsumer(info);
 						}
 					}
 				}
@@ -118,8 +118,9 @@ namespace Apache.NMS.Stomp.State
 					ConnectionId connectionId = sessionId.ParentId;
 					if(connectionId != null)
 					{
-						ConnectionState cs = connectionStates[connectionId];
-						if(cs != null)
+						ConnectionState cs = null;
+						
+						if(connectionStates.TryGetValue(connectionId, out cs))
 						{
 							cs.removeConsumer(id);
 						}

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs?rev=1137086&r1=1137085&r2=1137086&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs Fri Jun 17 23:58:06 2011
@@ -177,6 +177,14 @@ namespace Apache.NMS.Stomp.State
 			}
 		}
 
+		public bool TryGetValue(TKey key, out TValue val)
+		{
+			lock(((ICollection) _dictionary).SyncRoot)
+			{
+				return _dictionary.TryGetValue(key, out val);
+			}
+		}
+
 		public AtomicCollection<TKey> Keys
 		{
 			get

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs?rev=1137086&r1=1137085&r2=1137086&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs Fri Jun 17 23:58:06 2011
@@ -20,6 +20,7 @@ using Apache.NMS.Stomp.Commands;
 
 namespace Apache.NMS.Stomp.Transport
 {
+	/// <summary>
 	/// A Transport which guards access to the next transport using a mutex.
 	/// </summary>
 	public class MutexTransport : TransportFilter
@@ -31,6 +32,7 @@ namespace Apache.NMS.Stomp.Transport
 			if(timeout > 0)
 			{
 				DateTime timeoutTime = DateTime.Now + TimeSpan.FromMilliseconds(timeout);
+				int waitCount = 1;
 
 				while(true)
 				{
@@ -44,7 +46,9 @@ namespace Apache.NMS.Stomp.Transport
 						throw new IOException(string.Format("Oneway timed out after {0} milliseconds.", timeout));
 					}
 
-					Thread.Sleep(10);
+					// Back off from being overly aggressive.  Having too many threads
+					// aggressively trying to get the lock pegs the CPU.
+					Thread.Sleep(3 * (waitCount++));
 				}
 			}
 			else