You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2011/06/18 01:58:06 UTC
svn commit: r1137086 - in
/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp:
Connection.cs State/ConnectionState.cs State/ConnectionStateTracker.cs
State/SynchronizedObjects.cs Transport/MutexTransport.cs
Author: jgomes
Date: Fri Jun 17 23:58:06 2011
New Revision: 1137086
URL: http://svn.apache.org/viewvc?rev=1137086&view=rev
Log:
Refactor the CheckConnected function to handle multiple threads attempting to check connection status against an offline broker. Guard against unwanted exceptions being thrown when indexing into a connection state array that has not been fully set up because the broker is offline.
Fixes [AMQNET-331]. (See https://issues.apache.org/jira/browse/AMQNET-331)
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionState.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Connection.cs?rev=1137086&r1=1137085&r2=1137086&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Connection.cs Fri Jun 17 23:58:06 2011
@@ -501,14 +501,7 @@ namespace Apache.NMS.Stomp
public Response SyncRequest(Command command)
{
- try
- {
- return SyncRequest(command, this.RequestTimeout);
- }
- catch(Exception ex)
- {
- throw NMSExceptionSupport.Create(ex);
- }
+ return SyncRequest(command, this.RequestTimeout);
}
public Response SyncRequest(Command command, TimeSpan requestTimeout)
@@ -546,7 +539,13 @@ namespace Apache.NMS.Stomp
}
}
- protected void CheckConnected()
+ private object checkConnectedLock = new object();
+
+ /// <summary>
+ /// Check and ensure that the connection objcet is connected. If it is not
+ /// connected or is closed, a ConnectionClosedException is thrown.
+ /// </summary>
+ internal void CheckConnected()
{
if(closed.Value)
{
@@ -555,17 +554,57 @@ namespace Apache.NMS.Stomp
if(!connected.Value)
{
- if(!this.userSpecifiedClientID)
+ DateTime timeoutTime = DateTime.Now + this.RequestTimeout;
+ int waitCount = 1;
+
+ while(true)
{
- this.info.ClientId = this.clientIdGenerator.GenerateId();
+ if(Monitor.TryEnter(checkConnectedLock))
+ {
+ try
+ {
+ if(!connected.Value)
+ {
+ if(!this.userSpecifiedClientID)
+ {
+ this.info.ClientId = this.clientIdGenerator.GenerateId();
+ }
+
+ try
+ {
+ if(null != transport)
+ {
+ // Send the connection and see if an ack/nak is returned.
+ Response response = transport.Request(this.info, this.RequestTimeout);
+ if(!(response is ExceptionResponse))
+ {
+ connected.Value = true;
+ }
+ }
+ }
+ catch
+ {
+ }
+ }
+ }
+ finally
+ {
+ Monitor.Exit(checkConnectedLock);
+ }
+ }
+
+ if(connected.Value || DateTime.Now > timeoutTime)
+ {
+ break;
+ }
+
+ // Back off from being overly aggressive. Having too many threads
+ // aggressively trying to connect to a down broker pegs the CPU.
+ Thread.Sleep(5 * (waitCount++));
}
- connected.Value = true;
- // now lets send the connection and see if we get an ack/nak
- if(null == SyncRequest(info))
+ if(!connected.Value)
{
- closed.Value = true;
- connected.Value = false;
throw new ConnectionClosedException();
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionState.cs?rev=1137086&r1=1137085&r2=1137086&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionState.cs Fri Jun 17 23:58:06 2011
@@ -25,8 +25,7 @@ namespace Apache.NMS.Stomp.State
{
ConnectionInfo info;
- private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers =
- new AtomicDictionary<ConsumerId, ConsumerState>();
+ private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers = new AtomicDictionary<ConsumerId, ConsumerState>();
private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
public ConnectionState(ConnectionInfo info)
@@ -49,26 +48,25 @@ namespace Apache.NMS.Stomp.State
{
get
{
- #if DEBUG
- try
+ ConsumerState consumerState;
+
+ if(consumers.TryGetValue(id, out consumerState))
{
- #endif
- return consumers[id];
- #if DEBUG
+ return consumerState;
}
- catch(System.Collections.Generic.KeyNotFoundException ex)
+
+#if DEBUG
+ // Useful for dignosing missing consumer ids
+ string consumerList = string.Empty;
+ foreach(ConsumerId consumerId in consumers.Keys)
{
- // Useful for dignosing missing consumer ids
- string consumerList = string.Empty;
- foreach(ConsumerId consumerId in consumers.Keys)
- {
- consumerList += consumerId.ToString() + "\n";
- }
- System.Diagnostics.Debug.Assert(false,
- string.Format("Consumer '{0}' did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList));
- throw ex;
+ consumerList += consumerId.ToString() + "\n";
}
- #endif
+
+ System.Diagnostics.Debug.Assert(false,
+ string.Format("Consumer '{0}' did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList));
+#endif
+ return null;
}
}
@@ -80,7 +78,9 @@ namespace Apache.NMS.Stomp.State
public ConsumerState removeConsumer(ConsumerId id)
{
- ConsumerState ret = consumers[id];
+ ConsumerState ret = null;
+
+ consumers.TryGetValue(id, out ret);
consumers.Remove(id);
return ret;
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs?rev=1137086&r1=1137085&r2=1137086&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs Fri Jun 17 23:58:06 2011
@@ -31,8 +31,7 @@ namespace Apache.NMS.Stomp.State
{
private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
- protected Dictionary<ConnectionId, ConnectionState> connectionStates =
- new Dictionary<ConnectionId, ConnectionState>();
+ protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
private bool _restoreConsumers = true;
@@ -67,10 +66,10 @@ namespace Apache.NMS.Stomp.State
{
transport.Oneway(connectionState.Info);
- if(RestoreConsumers)
- {
- DoRestoreConsumers(transport, connectionState);
- }
+ if(RestoreConsumers)
+ {
+ DoRestoreConsumers(transport, connectionState);
+ }
}
}
@@ -97,10 +96,11 @@ namespace Apache.NMS.Stomp.State
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
+ ConnectionState cs = null;
+
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
- cs.addConsumer(info);
+ cs.addConsumer(info);
}
}
}
@@ -118,8 +118,9 @@ namespace Apache.NMS.Stomp.State
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
+ ConnectionState cs = null;
+
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
cs.removeConsumer(id);
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs?rev=1137086&r1=1137085&r2=1137086&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs Fri Jun 17 23:58:06 2011
@@ -177,6 +177,14 @@ namespace Apache.NMS.Stomp.State
}
}
+ public bool TryGetValue(TKey key, out TValue val)
+ {
+ lock(((ICollection) _dictionary).SyncRoot)
+ {
+ return _dictionary.TryGetValue(key, out val);
+ }
+ }
+
public AtomicCollection<TKey> Keys
{
get
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs?rev=1137086&r1=1137085&r2=1137086&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs Fri Jun 17 23:58:06 2011
@@ -20,6 +20,7 @@ using Apache.NMS.Stomp.Commands;
namespace Apache.NMS.Stomp.Transport
{
+ /// <summary>
/// A Transport which guards access to the next transport using a mutex.
/// </summary>
public class MutexTransport : TransportFilter
@@ -31,6 +32,7 @@ namespace Apache.NMS.Stomp.Transport
if(timeout > 0)
{
DateTime timeoutTime = DateTime.Now + TimeSpan.FromMilliseconds(timeout);
+ int waitCount = 1;
while(true)
{
@@ -44,7 +46,9 @@ namespace Apache.NMS.Stomp.Transport
throw new IOException(string.Format("Oneway timed out after {0} milliseconds.", timeout));
}
- Thread.Sleep(10);
+ // Back off from being overly aggressive. Having too many threads
+ // aggressively trying to get the lock pegs the CPU.
+ Thread.Sleep(3 * (waitCount++));
}
}
else