You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2011/06/18 01:43:34 UTC

svn commit: r1137081 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp: Connection.cs State/ConnectionState.cs State/ConnectionStateTracker.cs State/SynchronizedObjects.cs Transport/MutexTransport.cs

Author: jgomes
Date: Fri Jun 17 23:43:33 2011
New Revision: 1137081

URL: http://svn.apache.org/viewvc?rev=1137081&view=rev
Log:
Refactor the CheckConnected function to handle multiple threads attempting to check connection status against an offline broker.  Guard against unwanted exceptions being thrown when indexing into a connection state array that has not been fully set up because the broker is offline.
Fixes [AMQNET-331]. (See https://issues.apache.org/jira/browse/AMQNET-331)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs?rev=1137081&r1=1137080&r2=1137081&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs Fri Jun 17 23:43:33 2011
@@ -28,1115 +28,1159 @@ using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ
 {
-    /// <summary>
-    /// Represents a connection with a message broker
-    /// </summary>
-    public class Connection : IConnection
-    {
-        private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
-
-        // Uri configurable options.
-        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
-        private bool asyncSend = false;
-        private bool alwaysSyncSend = false;
-        private bool asyncClose = true;
-        private bool useCompression = false;
-        private bool copyMessageOnSend = true;
-        private bool sendAcksAsync = false;
-        private bool dispatchAsync = true;
-        private int producerWindowSize = 0;
-        private bool messagePrioritySupported=true;
-
-        private bool userSpecifiedClientID;
-        private readonly Uri brokerUri;
-        private ITransport transport;
-        private readonly ConnectionInfo info;
-        private TimeSpan requestTimeout; // from connection factory
-        private BrokerInfo brokerInfo; // from broker
-        private readonly CountDownLatch brokerInfoReceived = new CountDownLatch(1);
-        private WireFormatInfo brokerWireFormatInfo; // from broker
-        private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
-        private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
-        private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
-        private readonly object myLock = new object();
-        private readonly Atomic<bool> connected = new Atomic<bool>(false);
-        private readonly Atomic<bool> closed = new Atomic<bool>(false);
-        private readonly Atomic<bool> closing = new Atomic<bool>(false);
-        private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
-        private Exception firstFailureError = null;
-        private int sessionCounter = 0;
-        private int temporaryDestinationCounter = 0;
-        private int localTransactionCounter;
-        private readonly Atomic<bool> started = new Atomic<bool>(false);
-        private ConnectionMetaData metaData = null;
-        private bool disposed = false;
-        private IRedeliveryPolicy redeliveryPolicy;
-        private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
-        private ICompressionPolicy compressionPolicy = new CompressionPolicy();
-        private readonly IdGenerator clientIdGenerator;
-        private volatile CountDownLatch transportInterruptionProcessingComplete;
-        private readonly MessageTransformation messageTransformation;
-        private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
-
-        public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
-        {
-            this.brokerUri = connectionUri;
-            this.clientIdGenerator = clientIdGenerator;
-
-            this.transport = transport;
-            this.transport.Command = new CommandHandler(OnCommand);
-            this.transport.Exception = new ExceptionHandler(OnTransportException);
-            this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
-            this.transport.Resumed = new ResumedHandler(OnTransportResumed);
-
-            ConnectionId id = new ConnectionId();
-            id.Value = CONNECTION_ID_GENERATOR.GenerateId();
-
-            this.info = new ConnectionInfo();
-            this.info.ConnectionId = id;
-            this.info.FaultTolerant = transport.IsFaultTolerant;
-
-            this.messageTransformation = new ActiveMQMessageTransformation(this);
-        }
-
-        ~Connection()
-        {
-            Dispose(false);
-        }
-
-        /// <summary>
-        /// A delegate that can receive transport level exceptions.
-        /// </summary>
-        public event ExceptionListener ExceptionListener;
-
-        /// <summary>
-        /// An asynchronous listener that is notified when a Fault tolerant connection
-        /// has been interrupted.
-        /// </summary>
-        public event ConnectionInterruptedListener ConnectionInterruptedListener;
-
-        /// <summary>
-        /// An asynchronous listener that is notified when a Fault tolerant connection
-        /// has been resumed.
-        /// </summary>
-        public event ConnectionResumedListener ConnectionResumedListener;
-
-        private ConsumerTransformerDelegate consumerTransformer;
-        public ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get { return this.consumerTransformer; }
-            set { this.consumerTransformer = value; }
-        }
-
-        private ProducerTransformerDelegate producerTransformer;
-        public ProducerTransformerDelegate ProducerTransformer
-        {
-            get { return this.producerTransformer; }
-            set { this.producerTransformer = value; }
-        }
-
-        #region Properties
-
-        public String UserName
-        {
-            get { return this.info.UserName; }
-            set { this.info.UserName = value; }
-        }
-
-        public String Password
-        {
-            get { return this.info.Password; }
-            set { this.info.Password = value; }
-        }
-
-        /// <summary>
-        /// This property indicates what version of the Protocol we are using to
-        /// communicate with the Broker, if not set we return the lowest version
-        /// number to indicate we support only the basic command set.
-        /// </summary>
-        public int ProtocolVersion
-        {
-            get
-            {
-                if(brokerWireFormatInfo != null)
-                {
-                    return brokerWireFormatInfo.Version;
-                }
-
-                return 1;
-            }
-        }
-
-        /// <summary>
-        /// This property indicates whether or not async send is enabled.
-        /// </summary>
-        public bool AsyncSend
-        {
-            get { return asyncSend; }
-            set { asyncSend = value; }
-        }
-
-        /// <summary>
-        /// This property indicates whether or not async close is enabled.
-        /// When the connection is closed, it will either send a synchronous
-        /// DisposeOf command to the broker and wait for confirmation (if true),
-        /// or it will send the DisposeOf command asynchronously.
-        /// </summary>
-        public bool AsyncClose
-        {
-            get { return asyncClose; }
-            set { asyncClose = value; }
-        }
-
-        /// <summary>
-        /// This property indicates whether or not async sends are used for
-        /// message acknowledgement messages.  Sending Acks async can improve
-        /// performance but may decrease reliability.
-        /// </summary>
-        public bool SendAcksAsync
-        {
-            get { return sendAcksAsync; }
-            set { sendAcksAsync = value; }
-        }
-
-        /// <summary>
-        /// This property sets the acknowledgment mode for the connection.
-        /// The URI parameter connection.ackmode can be set to a string value
-        /// that maps to the enumeration value.
-        /// </summary>
-        public string AckMode
-        {
-            set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
-        }
-
-        /// <summary>
-        /// This property is the maximum number of bytes in memory that a producer will transmit
-        /// to a broker before waiting for acknowledgement messages from the broker that it has
-        /// accepted the previously sent messages. In other words, this how you configure the
-        /// producer flow control window that is used for async sends where the client is responsible
-        /// for managing memory usage. The default value of 0 means no flow control at the client
-        /// </summary>
-        public int ProducerWindowSize
-        {
-            get { return producerWindowSize; }
-            set { producerWindowSize = value; }
-        }
-
-        /// <summary>
-        /// This property forces all messages that are sent to be sent synchronously overriding
-        /// any usage of the AsyncSend flag. This can reduce performance in some cases since the
-        /// only messages we normally send synchronously are Persistent messages not sent in a
-        /// transaction. This options guarantees that no send will return until the broker has
-        /// acknowledge receipt of the message
-        /// </summary>
-        public bool AlwaysSyncSend
-        {
-            get { return alwaysSyncSend; }
-            set { alwaysSyncSend = value; }
-        }
-
-        /// <summary>
-        /// This property indicates whether Message's should be copied before being sent via
-        /// one of the Connection's send methods.  Copying the Message object allows the user
-        /// to resuse the Object over for another send.  If the message isn't copied performance
-        /// can improve but the user must not reuse the Object as it may not have been sent
-        /// before they reset its payload.
-        /// </summary>
-        public bool CopyMessageOnSend
-        {
-            get { return copyMessageOnSend; }
-            set { copyMessageOnSend = value; }
-        }
-
-        /// <summary>
-        /// Enable or Disable the use of Compression on Message bodies.  When enabled all
-        /// messages have their body compressed using the Deflate compression algorithm.
-        /// The recipient of the message must support the use of message compression as well
-        /// otherwise the receiving client will receive a message whose body appears in the
-        /// compressed form.
-        /// </summary>
-        public bool UseCompression
-        {
-            get { return this.useCompression; }
-            set { this.useCompression = value; }
-        }
-
-        /// <summary>
-        /// Indicate whether or not the resources of this Connection should support the
-        /// Message Priority value of incoming messages and dispatch them accordingly.
-        /// When disabled Message are always dispatched to Consumers in FIFO order.
-        /// </summary>
-        public bool MessagePrioritySupported
-        {
-            get { return this.messagePrioritySupported; }
-            set { this.messagePrioritySupported = value; }
-        }
-
-        public IConnectionMetaData MetaData
-        {
-            get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
-        }
-
-        public Uri BrokerUri
-        {
-            get { return brokerUri; }
-        }
-
-        public ITransport ITransport
-        {
-            get { return transport; }
-            set { this.transport = value; }
-        }
-
-        public bool TransportFailed
-        {
-            get { return this.transportFailed.Value; }
-        }
-
-        public Exception FirstFailureError
-        {
-            get { return this.firstFailureError; }
-        }
-
-        public TimeSpan RequestTimeout
-        {
-            get { return this.requestTimeout; }
-            set { this.requestTimeout = value; }
-        }
-
-        public AcknowledgementMode AcknowledgementMode
-        {
-            get { return acknowledgementMode; }
-            set { this.acknowledgementMode = value; }
-        }
-
-        /// <summary>
-        /// synchronously or asynchronously by the broker.
-        /// </summary>
-        public bool DispatchAsync
-        {
-            get { return this.dispatchAsync; }
-            set { this.dispatchAsync = value; }
-        }
-
-        public string ClientId
-        {
-            get { return info.ClientId; }
-            set
-            {
-                if(this.connected.Value)
-                {
-                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
-                }
-
-                this.info.ClientId = value;
-                this.userSpecifiedClientID = true;
-                CheckConnected();
-            }
-        }
-
-        /// <summary>
-        /// The Default Client Id used if the ClientId property is not set explicity.
-        /// </summary>
-        public string DefaultClientId
-        {
-            set
-            {
-                this.info.ClientId = value;
-                this.userSpecifiedClientID = true;
-            }
-        }
-
-        public ConnectionId ConnectionId
-        {
-            get { return info.ConnectionId; }
-        }
-
-        public BrokerInfo BrokerInfo
-        {
-            get { return brokerInfo; }
-        }
-
-        public WireFormatInfo BrokerWireFormat
-        {
-            get { return brokerWireFormatInfo; }
-        }
-
-        public String ResourceManagerId
-        {
-            get
-            {
-                this.brokerInfoReceived.await();
-                return brokerInfo.BrokerId.Value;
-            }
-        }
-
-        /// <summary>
-        /// Get/or set the redelivery policy for this connection.
-        /// </summary>
-        public IRedeliveryPolicy RedeliveryPolicy
-        {
-            get { return this.redeliveryPolicy; }
-            set { this.redeliveryPolicy = value; }
-        }
-
-        public PrefetchPolicy PrefetchPolicy
-        {
-            get { return this.prefetchPolicy; }
-            set { this.prefetchPolicy = value; }
-        }
-
-        public ICompressionPolicy CompressionPolicy
-        {
-            get { return this.compressionPolicy; }
-            set { this.compressionPolicy = value; }
-        }
-
-        internal MessageTransformation MessageTransformation
-        {
-            get { return this.messageTransformation; }
-        }
-
-        #endregion
-
-        /// <summary>
-        /// Starts asynchronous message delivery of incoming messages for this connection.
-        /// Synchronous delivery is unaffected.
-        /// </summary>
-        public void Start()
-        {
-            CheckConnected();
-            if(started.CompareAndSet(false, true))
-            {
-                lock(sessions.SyncRoot)
-                {
-                    foreach(Session session in sessions)
-                    {
-                        session.Start();
-                    }
-                }
-            }
-        }
-
-        /// <summary>
-        /// This property determines if the asynchronous message delivery of incoming
-        /// messages has been started for this connection.
-        /// </summary>
-        public bool IsStarted
-        {
-            get { return started.Value; }
-        }
-
-        /// <summary>
-        /// Temporarily stop asynchronous delivery of inbound messages for this connection.
-        /// The sending of outbound messages is unaffected.
-        /// </summary>
-        public void Stop()
-        {
-            CheckConnected();
-            if(started.CompareAndSet(true, false))
-            {
-                lock(sessions.SyncRoot)
-                {
-                    foreach(Session session in sessions)
-                    {
-                        session.Stop();
-                    }
-                }
-            }
-        }
-
-        /// <summary>
-        /// Creates a new session to work on this connection
-        /// </summary>
-        public ISession CreateSession()
-        {
-            return CreateAtiveMQSession(acknowledgementMode);
-        }
-
-        /// <summary>
-        /// Creates a new session to work on this connection
-        /// </summary>
-        public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
-        {
-            return CreateAtiveMQSession(sessionAcknowledgementMode);
-        }
-
-        protected virtual Session CreateAtiveMQSession(AcknowledgementMode ackMode)
-        {
-            CheckConnected();
-            return new Session(this, NextSessionId, ackMode);
-        }
-
-        internal void AddSession(Session session)
-        {
-            if(!this.closing.Value)
-            {
-                sessions.Add(session);
-            }
-        }
-
-        internal void RemoveSession(Session session)
-        {
-            if(!this.closing.Value)
-            {
-                sessions.Remove(session);
-            }
-        }
-
-        internal void addDispatcher( ConsumerId id, IDispatcher dispatcher )
-        {
-            if(!this.closing.Value)
-            {
-                this.dispatchers.Add( id, dispatcher );
-            }
-        }
-
-        internal void removeDispatcher( ConsumerId id )
-        {
-            if(!this.closing.Value)
-            {
-                this.dispatchers.Remove( id );
-            }
-        }
-
-        internal void addProducer( ProducerId id, MessageProducer producer )
-        {
-            if(!this.closing.Value)
-            {
-                this.producers.Add( id, producer );
-            }
-        }
-
-        internal void removeProducer( ProducerId id )
-        {
-            if(!this.closing.Value)
-            {
-                this.producers.Remove( id );
-            }
-        }
+	/// <summary>
+	/// Represents a connection with a message broker
+	/// </summary>
+	public class Connection : IConnection
+	{
+		private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
+		// Uri configurable options.
+		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+		private bool asyncSend = false;
+		private bool alwaysSyncSend = false;
+		private bool asyncClose = true;
+		private bool useCompression = false;
+		private bool copyMessageOnSend = true;
+		private bool sendAcksAsync = false;
+		private bool dispatchAsync = true;
+		private int producerWindowSize = 0;
+		private bool messagePrioritySupported=true;
+
+		private bool userSpecifiedClientID;
+		private readonly Uri brokerUri;
+		private ITransport transport;
+		private readonly ConnectionInfo info;
+		private TimeSpan requestTimeout; // from connection factory
+		private BrokerInfo brokerInfo; // from broker
+		private readonly CountDownLatch brokerInfoReceived = new CountDownLatch(1);
+		private WireFormatInfo brokerWireFormatInfo; // from broker
+		private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+		private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
+		private readonly object myLock = new object();
+		private readonly Atomic<bool> connected = new Atomic<bool>(false);
+		private readonly Atomic<bool> closed = new Atomic<bool>(false);
+		private readonly Atomic<bool> closing = new Atomic<bool>(false);
+		private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
+		private Exception firstFailureError = null;
+		private int sessionCounter = 0;
+		private int temporaryDestinationCounter = 0;
+		private int localTransactionCounter;
+		private readonly Atomic<bool> started = new Atomic<bool>(false);
+		private ConnectionMetaData metaData = null;
+		private bool disposed = false;
+		private IRedeliveryPolicy redeliveryPolicy;
+		private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+		private ICompressionPolicy compressionPolicy = new CompressionPolicy();
+		private readonly IdGenerator clientIdGenerator;
+		private volatile CountDownLatch transportInterruptionProcessingComplete;
+		private readonly MessageTransformation messageTransformation;
+		private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
+
+		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
+		{
+			this.brokerUri = connectionUri;
+			this.clientIdGenerator = clientIdGenerator;
+
+			this.transport = transport;
+			this.transport.Command = new CommandHandler(OnCommand);
+			this.transport.Exception = new ExceptionHandler(OnTransportException);
+			this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
+			this.transport.Resumed = new ResumedHandler(OnTransportResumed);
+
+			ConnectionId id = new ConnectionId();
+			id.Value = CONNECTION_ID_GENERATOR.GenerateId();
+
+			this.info = new ConnectionInfo();
+			this.info.ConnectionId = id;
+			this.info.FaultTolerant = transport.IsFaultTolerant;
+
+			this.messageTransformation = new ActiveMQMessageTransformation(this);
+		}
+
+		~Connection()
+		{
+			Dispose(false);
+		}
+
+		/// <summary>
+		/// A delegate that can receive transport level exceptions.
+		/// </summary>
+		public event ExceptionListener ExceptionListener;
+
+		/// <summary>
+		/// An asynchronous listener that is notified when a Fault tolerant connection
+		/// has been interrupted.
+		/// </summary>
+		public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+		/// <summary>
+		/// An asynchronous listener that is notified when a Fault tolerant connection
+		/// has been resumed.
+		/// </summary>
+		public event ConnectionResumedListener ConnectionResumedListener;
+
+		private ConsumerTransformerDelegate consumerTransformer;
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
+		private ProducerTransformerDelegate producerTransformer;
+		public ProducerTransformerDelegate ProducerTransformer
+		{
+			get { return this.producerTransformer; }
+			set { this.producerTransformer = value; }
+		}
+
+		#region Properties
+
+		public String UserName
+		{
+			get { return this.info.UserName; }
+			set { this.info.UserName = value; }
+		}
+
+		public String Password
+		{
+			get { return this.info.Password; }
+			set { this.info.Password = value; }
+		}
+
+		/// <summary>
+		/// This property indicates what version of the Protocol we are using to
+		/// communicate with the Broker, if not set we return the lowest version
+		/// number to indicate we support only the basic command set.
+		/// </summary>
+		public int ProtocolVersion
+		{
+			get
+			{
+				if(brokerWireFormatInfo != null)
+				{
+					return brokerWireFormatInfo.Version;
+				}
+
+				return 1;
+			}
+		}
+
+		/// <summary>
+		/// This property indicates whether or not async send is enabled.
+		/// </summary>
+		public bool AsyncSend
+		{
+			get { return asyncSend; }
+			set { asyncSend = value; }
+		}
+
+		/// <summary>
+		/// This property indicates whether or not async close is enabled.
+		/// When the connection is closed, it will either send a synchronous
+		/// DisposeOf command to the broker and wait for confirmation (if true),
+		/// or it will send the DisposeOf command asynchronously.
+		/// </summary>
+		public bool AsyncClose
+		{
+			get { return asyncClose; }
+			set { asyncClose = value; }
+		}
+
+		/// <summary>
+		/// This property indicates whether or not async sends are used for
+		/// message acknowledgement messages.  Sending Acks async can improve
+		/// performance but may decrease reliability.
+		/// </summary>
+		public bool SendAcksAsync
+		{
+			get { return sendAcksAsync; }
+			set { sendAcksAsync = value; }
+		}
+
+		/// <summary>
+		/// This property sets the acknowledgment mode for the connection.
+		/// The URI parameter connection.ackmode can be set to a string value
+		/// that maps to the enumeration value.
+		/// </summary>
+		public string AckMode
+		{
+			set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+		}
+
+		/// <summary>
+		/// This property is the maximum number of bytes in memory that a producer will transmit
+		/// to a broker before waiting for acknowledgement messages from the broker that it has
+		/// accepted the previously sent messages. In other words, this is how you configure the
+		/// producer flow control window that is used for async sends where the client is responsible
+		/// for managing memory usage. The default value of 0 means no flow control at the client
+		/// </summary>
+		public int ProducerWindowSize
+		{
+			get { return producerWindowSize; }
+			set { producerWindowSize = value; }
+		}
+
+		/// <summary>
+		/// This property forces all messages that are sent to be sent synchronously overriding
+		/// any usage of the AsyncSend flag. This can reduce performance in some cases since the
+		/// only messages we normally send synchronously are Persistent messages not sent in a
+		/// transaction. This option guarantees that no send will return until the broker has
+		/// acknowledged receipt of the message.
+		/// </summary>
+		public bool AlwaysSyncSend
+		{
+			get { return alwaysSyncSend; }
+			set { alwaysSyncSend = value; }
+		}
+
+		/// <summary>
+		/// This property indicates whether Messages should be copied before being sent via
+		/// one of the Connection's send methods.  Copying the Message object allows the user
+		/// to reuse the Object for another send.  If the message isn't copied performance
+		/// can improve but the user must not reuse the Object as it may not have been sent
+		/// before they reset its payload.
+		/// </summary>
+		public bool CopyMessageOnSend
+		{
+			get { return copyMessageOnSend; }
+			set { copyMessageOnSend = value; }
+		}
+
+		/// <summary>
+		/// Enable or Disable the use of Compression on Message bodies.  When enabled all
+		/// messages have their body compressed using the Deflate compression algorithm.
+		/// The recipient of the message must support the use of message compression as well
+		/// otherwise the receiving client will receive a message whose body appears in the
+		/// compressed form.
+		/// </summary>
+		public bool UseCompression
+		{
+			get { return this.useCompression; }
+			set { this.useCompression = value; }
+		}
+
+		/// <summary>
+		/// Indicate whether or not the resources of this Connection should support the
+		/// Message Priority value of incoming messages and dispatch them accordingly.
+		/// When disabled, Messages are always dispatched to Consumers in FIFO order.
+		/// </summary>
+		public bool MessagePrioritySupported
+		{
+			get { return this.messagePrioritySupported; }
+			set { this.messagePrioritySupported = value; }
+		}
+
+		public IConnectionMetaData MetaData
+		{
+			get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+		}
+
+		public Uri BrokerUri
+		{
+			get { return brokerUri; }
+		}
+
+		public ITransport ITransport
+		{
+			get { return transport; }
+			set { this.transport = value; }
+		}
+
+		public bool TransportFailed
+		{
+			get { return this.transportFailed.Value; }
+		}
+
+		public Exception FirstFailureError
+		{
+			get { return this.firstFailureError; }
+		}
+
+		public TimeSpan RequestTimeout
+		{
+			get { return this.requestTimeout; }
+			set { this.requestTimeout = value; }
+		}
+
+		public AcknowledgementMode AcknowledgementMode
+		{
+			get { return acknowledgementMode; }
+			set { this.acknowledgementMode = value; }
+		}
+
+		/// <summary>
+		/// Indicates whether messages are dispatched synchronously or asynchronously by the broker.
+		/// </summary>
+		public bool DispatchAsync
+		{
+			get { return this.dispatchAsync; }
+			set { this.dispatchAsync = value; }
+		}
+
+		public string ClientId
+		{
+			get { return info.ClientId; }
+			set
+			{
+				if(this.connected.Value)
+				{
+					throw new NMSException("You cannot change the ClientId once the Connection is connected");
+				}
+
+				this.info.ClientId = value;
+				this.userSpecifiedClientID = true;
+				CheckConnected();
+			}
+		}
+
+		/// <summary>
+		/// The Default Client Id used if the ClientId property is not set explicitly.
+		/// </summary>
+		public string DefaultClientId
+		{
+			set
+			{
+				this.info.ClientId = value;
+				this.userSpecifiedClientID = true;
+			}
+		}
+
+		public ConnectionId ConnectionId
+		{
+			get { return info.ConnectionId; }
+		}
+
+		public BrokerInfo BrokerInfo
+		{
+			get { return brokerInfo; }
+		}
+
+		public WireFormatInfo BrokerWireFormat
+		{
+			get { return brokerWireFormatInfo; }
+		}
+
+		public String ResourceManagerId
+		{
+			get
+			{
+				this.brokerInfoReceived.await();
+				return brokerInfo.BrokerId.Value;
+			}
+		}
+
+		/// <summary>
+		/// Get or set the redelivery policy for this connection.
+		/// </summary>
+		public IRedeliveryPolicy RedeliveryPolicy
+		{
+			get { return this.redeliveryPolicy; }
+			set { this.redeliveryPolicy = value; }
+		}
+
+		public PrefetchPolicy PrefetchPolicy
+		{
+			get { return this.prefetchPolicy; }
+			set { this.prefetchPolicy = value; }
+		}
+
+		public ICompressionPolicy CompressionPolicy
+		{
+			get { return this.compressionPolicy; }
+			set { this.compressionPolicy = value; }
+		}
+
+		internal MessageTransformation MessageTransformation
+		{
+			get { return this.messageTransformation; }
+		}
+
+		#endregion
+
+		/// <summary>
+		/// Starts asynchronous message delivery of incoming messages for this connection.
+		/// Synchronous delivery is unaffected.
+		/// </summary>
+		public void Start()
+		{
+			CheckConnected();
+			if(started.CompareAndSet(false, true))
+			{
+				lock(sessions.SyncRoot)
+				{
+					foreach(Session session in sessions)
+					{
+						session.Start();
+					}
+				}
+			}
+		}
 
-        public void Close()
-        {
+		/// <summary>
+		/// This property determines if the asynchronous message delivery of incoming
+		/// messages has been started for this connection.
+		/// </summary>
+		public bool IsStarted
+		{
+			get { return started.Value; }
+		}
+
+		/// <summary>
+		/// Temporarily stops asynchronous delivery of inbound messages for this connection.
+		/// The sending of outbound messages is unaffected.  The connection is verified
+		/// through CheckConnected first, and the sessions are stopped only by the caller
+		/// that flips the started flag from true to false.
+		/// </summary>
+		public void Stop()
+		{
+			CheckConnected();
+
+			// Not started: nothing to do.
+			if(!started.CompareAndSet(true, false))
+			{
+				return;
+			}
+
+			lock(sessions.SyncRoot)
+			{
+				foreach(Session current in sessions)
+				{
+					current.Stop();
+				}
+			}
+		}
+
+		/// <summary>
+		/// Creates a new session on this connection using the connection's
+		/// configured acknowledgement mode.
+		/// </summary>
+		public ISession CreateSession()
+		{
+			return this.CreateAtiveMQSession(this.acknowledgementMode);
+		}
+
+		/// <summary>
+		/// Creates a new session on this connection using the supplied
+		/// acknowledgement mode.
+		/// </summary>
+		/// <param name="sessionAcknowledgementMode">Acknowledgement mode for the new session.</param>
+		public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
+		{
+			return this.CreateAtiveMQSession(sessionAcknowledgementMode);
+		}
+
+		/// <summary>
+		/// Verifies the connection through CheckConnected and then builds a Session
+		/// bound to this connection with the next session id.
+		/// </summary>
+		/// <param name="ackMode">Acknowledgement mode for the new session.</param>
+		protected virtual Session CreateAtiveMQSession(AcknowledgementMode ackMode)
+		{
+			CheckConnected();
+
+			SessionId sessionId = NextSessionId;
+			return new Session(this, sessionId, ackMode);
+		}
+
+		/// <summary>
+		/// Adds the session to this connection's session list.  Does nothing while
+		/// the closing flag is set.
+		/// </summary>
+		internal void AddSession(Session session)
+		{
+			if(this.closing.Value)
+			{
+				return;
+			}
+
+			sessions.Add(session);
+		}
+
+		/// <summary>
+		/// Removes the session from this connection's session list.  Does nothing
+		/// while the closing flag is set.
+		/// </summary>
+		internal void RemoveSession(Session session)
+		{
+			if(this.closing.Value)
+			{
+				return;
+			}
+
+			sessions.Remove(session);
+		}
+
+		/// <summary>
+		/// Registers the dispatcher under the given consumer id.  Does nothing
+		/// while the closing flag is set.
+		/// </summary>
+		internal void addDispatcher(ConsumerId id, IDispatcher dispatcher)
+		{
+			if(this.closing.Value)
+			{
+				return;
+			}
+
+			this.dispatchers.Add(id, dispatcher);
+		}
+
+		/// <summary>
+		/// Removes the dispatcher registered under the given consumer id.  Does
+		/// nothing while the closing flag is set.
+		/// </summary>
+		internal void removeDispatcher(ConsumerId id)
+		{
+			if(this.closing.Value)
+			{
+				return;
+			}
+
+			this.dispatchers.Remove(id);
+		}
+
+		/// <summary>
+		/// Registers the producer under the given producer id.  Does nothing
+		/// while the closing flag is set.
+		/// </summary>
+		internal void addProducer(ProducerId id, MessageProducer producer)
+		{
+			if(this.closing.Value)
+			{
+				return;
+			}
+
+			this.producers.Add(id, producer);
+		}
+
+		/// <summary>
+		/// Removes the producer registered under the given producer id.  Does
+		/// nothing while the closing flag is set.
+		/// </summary>
+		internal void removeProducer(ProducerId id)
+		{
+			if(this.closing.Value)
+			{
+				return;
+			}
+
+			this.producers.Remove(id);
+		}
+
+		public void Close()
+		{
 			if(!this.closed.Value && !transportFailed.Value)
 			{
 				this.Stop();
 			}
-						
-            lock(myLock)
-            {
-                if(this.closed.Value)
-                {
-                    return;
-                }
-				
-                try
-                {
-                    Tracer.Info("Connection.Close(): Closing Connection Now.");
-                    this.closing.Value = true;
-
-                    lock(sessions.SyncRoot)
-                    {
-                        foreach(Session session in sessions)
-                        {
-                            session.Shutdown();
-                        }
-                    }
-                    sessions.Clear();
-
-                    // Connected is true only when we've successfully sent our ConnectionInfo
-                    // to the broker, so if we haven't announced ourselves there's no need to
-                    // inform the broker of a remove, and if the transport is failed, why bother.
-                    if(connected.Value && !transportFailed.Value)
-                    {
-                        DisposeOf(ConnectionId);
-                        ShutdownInfo shutdowninfo = new ShutdownInfo();
-                        transport.Oneway(shutdowninfo);
-                    }
-
-                    executor.Shutdown();
-
-                    Tracer.Info("Connection: Disposing of the Transport.");
-                    transport.Dispose();
-                }
-                catch(Exception ex)
-                {
-                    Tracer.ErrorFormat("Error during connection close: {0}", ex);
-                }
-                finally
-                {
+
+			lock(myLock)
+			{
+				if(this.closed.Value)
+				{
+					return;
+				}
+
+				try
+				{
+					Tracer.Info("Connection.Close(): Closing Connection Now.");
+					this.closing.Value = true;
+
+					lock(sessions.SyncRoot)
+					{
+						foreach(Session session in sessions)
+						{
+							session.Shutdown();
+						}
+					}
+					sessions.Clear();
+
+					// Connected is true only when we've successfully sent our ConnectionInfo
+					// to the broker, so if we haven't announced ourselves there's no need to
+					// inform the broker of a remove, and if the transport is failed, why bother.
+					if(connected.Value && !transportFailed.Value)
+					{
+						DisposeOf(ConnectionId);
+						ShutdownInfo shutdowninfo = new ShutdownInfo();
+						transport.Oneway(shutdowninfo);
+					}
+
+					executor.Shutdown();
+
+					Tracer.Info("Connection: Disposing of the Transport.");
+					transport.Dispose();
+				}
+				catch(Exception ex)
+				{
+					Tracer.ErrorFormat("Error during connection close: {0}", ex);
+				}
+				finally
+				{
 					if(executor != null)
 					{
-                    	executor.Shutdown();
+						executor.Shutdown();
+					}
+
+					this.transport = null;
+					this.closed.Value = true;
+					this.connected.Value = false;
+					this.closing.Value = false;
+				}
+			}
+		}
+
+		/// <summary>
+		/// Disposes this connection and tells the garbage collector that
+		/// finalization is no longer required.
+		/// </summary>
+		public void Dispose()
+		{
+			this.Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		/// <summary>
+		/// Closes the connection once; later calls return immediately.  Any
+		/// exception raised by Close() is swallowed.
+		/// </summary>
+		/// <param name="disposing">True when invoked from Dispose().</param>
+		protected void Dispose(bool disposing)
+		{
+			if(!disposed)
+			{
+				if(disposing)
+				{
+					// Dispose managed code here.
+				}
+
+				try
+				{
+					Close();
+				}
+				catch
+				{
+					// Ignore network errors.
+				}
+
+				disposed = true;
+			}
+		}
+
+		// Implementation methods
+
+		/// <summary>
+		/// Performs a synchronous request-response with the broker, using this
+		/// connection's RequestTimeout.
+		/// </summary>
+		/// <param name="command">The command to send.</param>
+		/// <returns>The response returned for the command.</returns>
+		public Response SyncRequest(Command command)
+		{
+			TimeSpan timeout = this.RequestTimeout;
+			return SyncRequest(command, timeout);
+		}
+
+		/// <summary>
+		/// Performs a synchronous request-response with the broker for the requested
+		/// timeout duration.  The connection is verified through CheckConnected before
+		/// the request is sent.
+		/// </summary>
+		/// <param name="command">The command to send through the transport.</param>
+		/// <param name="requestTimeout">Timeout handed to the transport request.</param>
+		/// <returns>The response returned by the transport.</returns>
+		public Response SyncRequest(Command command, TimeSpan requestTimeout)
+		{
+			CheckConnected();
+
+			try
+			{
+				Response reply = transport.Request(command, requestTimeout);
+
+				// An ExceptionResponse carries a broker-side error; surface it as an exception.
+				ExceptionResponse failure = reply as ExceptionResponse;
+				if(failure != null)
+				{
+					throw new BrokerException(failure.Exception);
+				}
+
+				return reply;
+			}
+			catch(Exception ex)
+			{
+				// Every failure, including the BrokerException above, goes through
+				// NMSExceptionSupport before reaching the caller.
+				throw NMSExceptionSupport.Create(ex);
+			}
+		}
+
+		/// <summary>
+		/// Sends the command through the transport without waiting for a response.
+		/// The connection is verified through CheckConnected first; transport
+		/// failures are rethrown after conversion by NMSExceptionSupport.
+		/// </summary>
+		/// <param name="command">The command to send.</param>
+		public void Oneway(Command command)
+		{
+			this.CheckConnected();
+
+			try
+			{
+				this.transport.Oneway(command);
+			}
+			catch(Exception sendError)
+			{
+				throw NMSExceptionSupport.Create(sendError);
+			}
+		}
+
+		private void DisposeOf(DataStructure objectId)
+		{
+			try
+			{
+				RemoveInfo command = new RemoveInfo();
+				command.ObjectId = objectId;
+				if(asyncClose)
+				{
+					Tracer.Info("Asynchronously disposing of Connection.");
+					if(connected.Value)
+					{
+						transport.Oneway(command);
 					}
-					
-                    this.transport = null;
-                    this.closed.Value = true;
-                    this.connected.Value = false;
-                    this.closing.Value = false;
-                }
-            }
-        }
-
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        protected void Dispose(bool disposing)
-        {
-            if(disposed)
-            {
-                return;
-            }
-
-            if(disposing)
-            {
-                // Dispose managed code here.
-            }
-
-            try
-            {
-                Close();
-            }
-            catch
-            {
-                // Ignore network errors.
-            }
-
-            disposed = true;
-        }
-
-        // Implementation methods
-
-        /// <summary>
-        /// Performs a synchronous request-response with the broker
-        /// </summary>
-        ///
-
-        public Response SyncRequest(Command command)
-        {
-            try
-            {
-                return SyncRequest(command, this.RequestTimeout);
-            }
-            catch(Exception ex)
-            {
-                throw NMSExceptionSupport.Create(ex);
-            }
-        }
-
-        public Response SyncRequest(Command command, TimeSpan requestTimeout)
-        {
-            CheckConnected();
-
-            try
-            {
-                Response response = transport.Request(command, requestTimeout);
-                if(response is ExceptionResponse)
-                {
-                    ExceptionResponse exceptionResponse = (ExceptionResponse) response;
-                    BrokerError brokerError = exceptionResponse.Exception;
-                    throw new BrokerException(brokerError);
-                }
-                return response;
-            }
-            catch(Exception ex)
-            {
-                throw NMSExceptionSupport.Create(ex);
-            }
-        }
-
-        public void Oneway(Command command)
-        {
-            CheckConnected();
-
-            try
-            {
-                transport.Oneway(command);
-            }
-            catch(Exception ex)
-            {
-                throw NMSExceptionSupport.Create(ex);
-            }
-        }
-
-        private void DisposeOf(DataStructure objectId)
-        {
-            try
-            {
-                RemoveInfo command = new RemoveInfo();
-                command.ObjectId = objectId;
-                if(asyncClose)
-                {
-                    Tracer.Info("Asynchronously disposing of Connection.");
-                    if(connected.Value)
-                    {
-                        transport.Oneway(command);
-                    }
-                    Tracer.Info("Oneway command sent to broker.");
-                }
-                else
-                {
-                    // Ensure that the object is disposed to avoid potential race-conditions
-                    // of trying to re-create the same object in the broker faster than
-                    // the broker can dispose of the object.  Allow up to 5 seconds to process.
-                    Tracer.Info("Synchronously disposing of Connection.");
-                    SyncRequest(command, TimeSpan.FromSeconds(5));
-                    Tracer.Info("Synchronously closed Connection.");
-                }
-            }
-            catch // (BrokerException)
-            {
-                // Ignore exceptions while shutting down.
-            }
-        }
-
-        internal void CheckConnected()
-        {
-            if(closed.Value)
-            {
-                throw new ConnectionClosedException();
-            }
-
-            if(!connected.Value)
-            {
-                if(!this.userSpecifiedClientID)
-                {
-                    this.info.ClientId = this.clientIdGenerator.GenerateId();
-                }
-
-                connected.Value = true;
-                // now lets send the connection and see if we get an ack/nak
-                if(null == SyncRequest(info))
-                {
-                    closed.Value = true;
-                    connected.Value = false;
-                    throw new ConnectionClosedException();
-                }
-            }
-        }
-
-        /// <summary>
-        /// Handle incoming commands
-        /// </summary>
-        /// <param name="commandTransport">An ITransport</param>
-        /// <param name="command">A  Command</param>
-        protected void OnCommand(ITransport commandTransport, Command command)
-        {
-            if(command.IsMessageDispatch)
-            {
-                WaitForTransportInterruptionProcessingToComplete();
-                DispatchMessage((MessageDispatch) command);
-            }
-            else if(command.IsKeepAliveInfo)
-            {
-                OnKeepAliveCommand(commandTransport, (KeepAliveInfo) command);
-            }
-            else if(command.IsWireFormatInfo)
-            {
-                this.brokerWireFormatInfo = (WireFormatInfo) command;
-            }
-            else if(command.IsBrokerInfo)
-            {
-                this.brokerInfo = (BrokerInfo) command;
-                this.brokerInfoReceived.countDown();
-            }
-            else if(command.IsShutdownInfo)
-            {
-                if(!closing.Value && !closed.Value)
-                {
-                    OnException(new NMSException("Broker closed this connection."));
-                }
-            }
-            else if(command.IsProducerAck)
-            {
-                ProducerAck ack = (ProducerAck) command as ProducerAck;
-                if(ack.ProducerId != null)
-                {
-                    MessageProducer producer = producers[ack.ProducerId] as MessageProducer;
-                    if(producer != null)
-                    {
-                        if(Tracer.IsDebugEnabled)
-                        {
-                            Tracer.Debug("Connection: Received a new ProducerAck -> " + ack);
-                        }
-
-                        producer.OnProducerAck(ack);
-                    }
-                }
-            }
-            else if(command.IsConnectionError)
-            {
-                if(!closing.Value && !closed.Value)
-                {
-                    ConnectionError connectionError = (ConnectionError) command;
-                    BrokerError brokerError = connectionError.Exception;
-                    string message = "Broker connection error.";
-                    string cause = "";
-
-                    if(null != brokerError)
-                    {
-                        message = brokerError.Message;
-                        if(null != brokerError.Cause)
-                        {
-                            cause = brokerError.Cause.Message;
-                        }
-                    }
-
-                    OnException(new NMSConnectionException(message, cause));
-                }
-            }
-            else
-            {
-                Tracer.Error("Unknown command: " + command);
-            }
-        }
-
-        protected void DispatchMessage(MessageDispatch dispatch)
-        {
-            lock(dispatchers.SyncRoot)
-            {
-                if(dispatchers.Contains(dispatch.ConsumerId))
-                {
-                    IDispatcher dispatcher = (IDispatcher) dispatchers[dispatch.ConsumerId];
-
-                    // Can be null when a consumer has sent a MessagePull and there was
-                    // no available message at the broker to dispatch or when signalled
-                    // that the end of a Queue browse has been reached.
-                    if(dispatch.Message != null)
-                    {
-                        dispatch.Message.ReadOnlyBody = true;
-                        dispatch.Message.ReadOnlyProperties = true;
-                        dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
-                    }
-
-                    dispatcher.Dispatch(dispatch);
-
-                    return;
-                }
-            }
-
-            Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
-        }
-
-        protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info)
-        {
-            Tracer.Info("Keep alive message received.");
-
-            try
-            {
-                if(connected.Value)
-                {
-                    Tracer.Info("Returning KeepAliveInfo Response.");
-                    info.ResponseRequired = false;
-                    transport.Oneway(info);
-                }
-            }
-            catch(Exception ex)
-            {
-                if(!closing.Value && !closed.Value)
-                {
-                    OnException(ex);
-                }
-            }
-        }
-
-        internal void OnAsyncException(Exception error)
-        {
-            if(!this.closed.Value && !this.closing.Value)
-            {
-                if(this.ExceptionListener != null)
-                {
-                    if(!(error is NMSException))
-                    {
-                        error = NMSExceptionSupport.Create(error);
-                    }
-                    NMSException e = (NMSException)error;
-
-                    // Called in another thread so that processing can continue
-                    // here, ensures no lock contention.
-                    executor.QueueUserWorkItem(AsyncCallExceptionListener, e);
-                }
-                else
-                {
-                    Tracer.Debug("Async exception with no exception listener: " + error);
-                }
-            }
-        }
-
-        private void AsyncCallExceptionListener(object error)
-        {
-            NMSException exception = error as NMSException;
-            this.ExceptionListener(exception);
-        }
-
-        internal void OnTransportException(ITransport source, Exception cause)
-        {
-            this.OnException(cause);
-        }
-
-        internal void OnException(Exception error)
-        {
-            // Will fire an exception listener callback if there's any set.
-            OnAsyncException(error);
-
-            if(!this.closing.Value && !this.closed.Value)
-            {
-                // Perform the actual work in another thread to avoid lock contention
-                // and allow the caller to continue on in its error cleanup.
-                executor.QueueUserWorkItem(AsyncOnExceptionHandler, error);
-            }
-        }
-
-        private void AsyncOnExceptionHandler(object error)
-        {
-            Exception cause = error as Exception;
-
-            MarkTransportFailed(cause);
-
-            try
-            {
-                this.transport.Dispose();
-            }
-            catch(Exception ex)
-            {
-                Tracer.Debug("Caught Exception While disposing of Transport: " + ex);
-            }
-
-            this.brokerInfoReceived.countDown();
-
-            IList sessionsCopy = null;
-            lock(this.sessions.SyncRoot)
-            {
-                sessionsCopy = new ArrayList(this.sessions);
-            }
-
-            // Use a copy so we don't concurrently modify the Sessions list if the
-            // client is closing at the same time.
-            foreach(Session session in sessionsCopy)
-            {
-                try
-                {
-                    session.Shutdown();
-                }
-                catch(Exception ex)
-                {
-                    Tracer.Debug("Caught Exception While disposing of Sessions: " + ex);
-                }
-            }
-        }
-
-        private void MarkTransportFailed(Exception error)
-        {
-            this.transportFailed.Value = true;
-            if(this.firstFailureError == null)
-            {
-                this.firstFailureError = error;
-            }
-        }
-
-        protected void OnTransportInterrupted(ITransport sender)
-        {
-            Tracer.Debug("Connection: Transport has been Interrupted.");
-
-            this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count);
-            if(Tracer.IsDebugEnabled)
-            {
-                Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
-            }
-
-            SignalInterruptionProcessingNeeded();
-
-            foreach(Session session in this.sessions)
-            {
-                try
-                {
-                    session.ClearMessagesInProgress();
-                }
-                catch(Exception ex)
-                {
-                    Tracer.Warn("Exception while clearing messages: " + ex.Message);
-                    Tracer.Warn(ex.StackTrace);
-                }
-            }
-
-            if(this.ConnectionInterruptedListener != null && !this.closing.Value)
-            {
-                try
-                {
-                    this.ConnectionInterruptedListener();
-                }
-                catch
-                {
-                }
-            }
-        }
-
-        protected void OnTransportResumed(ITransport sender)
-        {
-            Tracer.Debug("Transport has resumed normal operation.");
-
-            if(this.ConnectionResumedListener != null && !this.closing.Value)
-            {
-                try
-                {
-                    this.ConnectionResumedListener();
-                }
-                catch
-                {
-                }
-            }
-        }
-
-        internal void OnSessionException(Session sender, Exception exception)
-        {
-            if(ExceptionListener != null)
-            {
-                try
-                {
-                    ExceptionListener(exception);
-                }
-                catch
-                {
-                    sender.Close();
-                }
-            }
-        }
-
-        /// <summary>
-        /// Creates a new local transaction ID
-        /// </summary>
-        public LocalTransactionId CreateLocalTransactionId()
-        {
-            LocalTransactionId id = new LocalTransactionId();
-            id.ConnectionId = ConnectionId;
-            id.Value = Interlocked.Increment(ref localTransactionCounter);
-            return id;
-        }
-
-        protected SessionId NextSessionId
-        {
-            get { return new SessionId(this.info.ConnectionId, Interlocked.Increment(ref this.sessionCounter)); }
-        }
-
-        public ActiveMQTempDestination CreateTemporaryDestination(bool topic)
-        {
-            ActiveMQTempDestination destination = null;
-
-            if(topic)
-            {
-                destination = new ActiveMQTempTopic(
-                    info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter));
-            }
-            else
-            {
-                destination = new ActiveMQTempQueue(
-                    info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter));
-            }
-
-            DestinationInfo command = new DestinationInfo();
-            command.ConnectionId = ConnectionId;
-            command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
-            command.Destination = destination;
-
-            this.SyncRequest(command);
-
-            destination.Connection = this;
-
-            return destination;
-        }
-
-        protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
-        {
-        }
-
-        public void DeleteTemporaryDestination(IDestination destination)
-        {
-            this.DeleteDestination(destination);
-        }
-
-        public void DeleteDestination(IDestination destination)
-        {
-            DestinationInfo command = new DestinationInfo();
-            command.ConnectionId = this.ConnectionId;
-            command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
-            command.Destination = (ActiveMQDestination) destination;
-
-            this.Oneway(command);
-        }
-
-        private void WaitForTransportInterruptionProcessingToComplete()
-        {
-            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-            if(cdl != null)
-            {
-                if(!closed.Value && cdl.Remaining > 0)
-                {
-                    Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
-                                "processing (" + cdl.Remaining + ") to complete..");
-                    cdl.await(TimeSpan.FromSeconds(10));
-                }
-
-                SignalInterruptionProcessingComplete();
-            }
-        }
-
-        internal void TransportInterruptionProcessingComplete()
-        {
-            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-            if(cdl != null)
-            {
-                cdl.countDown();
-                try
-                {
-                    SignalInterruptionProcessingComplete();
-                }
-                catch
-                {
-                }
-            }
-        }
-
-        private void SignalInterruptionProcessingComplete()
-        {
-            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-            if(cdl.Remaining == 0)
-            {
-                if(Tracer.IsDebugEnabled)
-                {
-                    Tracer.Debug("transportInterruptionProcessingComplete for: " + this.info.ConnectionId);
-                }
-
-                this.transportInterruptionProcessingComplete = null;
-
-                FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
-                if(failoverTransport != null)
-                {
-                    failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
-                    if(Tracer.IsDebugEnabled)
-                    {
-                        Tracer.Debug("notified failover transport (" + failoverTransport +
-                                     ") of interruption completion for: " + this.info.ConnectionId);
-                    }
-                }
-            }
-        }
-
-        private void SignalInterruptionProcessingNeeded()
-        {
-            FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
-
-            if(failoverTransport != null)
-            {
-                failoverTransport.StateTracker.TransportInterrupted(this.info.ConnectionId);
-                if(Tracer.IsDebugEnabled)
-                {
-                    Tracer.Debug("notified failover transport (" + failoverTransport +
-                                 ") of pending interruption processing for: " + this.info.ConnectionId);
-                }
-            }
-        }
-    }
+					Tracer.Info("Oneway command sent to broker.");
+				}
+				else
+				{
+					// Ensure that the object is disposed to avoid potential race-conditions
+					// of trying to re-create the same object in the broker faster than
+					// the broker can dispose of the object.  Allow up to 5 seconds to process.
+					Tracer.Info("Synchronously disposing of Connection.");
+					SyncRequest(command, TimeSpan.FromSeconds(5));
+					Tracer.Info("Synchronously closed Connection.");
+				}
+			}
+			catch // (BrokerException)
+			{
+				// Ignore exceptions while shutting down.
+			}
+		}
+
+		// Guards the connection handshake so that only one thread at a time sends
+		// the ConnectionInfo to the broker.
+		private readonly object checkConnectedLock = new object();
+
+		/// <summary>
+		/// Check and ensure that the connection object is connected.  When it is not yet
+		/// connected, the ConnectionInfo is sent to the broker and the attempt is retried,
+		/// with a growing pause between attempts, until RequestTimeout has elapsed.
+		/// </summary>
+		/// <exception cref="ConnectionClosedException">
+		/// Thrown when the connection is closed, or when it could not be connected
+		/// before the timeout expired.
+		/// </exception>
+		internal void CheckConnected()
+		{
+			if(closed.Value)
+			{
+				throw new ConnectionClosedException();
+			}
+
+			if(connected.Value)
+			{
+				return;
+			}
+
+			// Use UTC for the deadline: local time can jump on daylight-saving or
+			// time-zone changes, which would stretch or cut short the wait.
+			DateTime timeoutTime = DateTime.UtcNow + this.RequestTimeout;
+			int waitCount = 1;
+
+			while(true)
+			{
+				// TryEnter rather than lock: a thread that does not get the lock falls
+				// through to the back-off below instead of queueing behind the handshake.
+				if(Monitor.TryEnter(checkConnectedLock))
+				{
+					try
+					{
+						// Re-check under the lock; another thread may have connected already.
+						if(!connected.Value)
+						{
+							if(!this.userSpecifiedClientID)
+							{
+								this.info.ClientId = this.clientIdGenerator.GenerateId();
+							}
+
+							try
+							{
+								if(null != transport)
+								{
+									// Send the connection and see if an ack/nak is returned.
+									Response response = transport.Request(this.info, this.RequestTimeout);
+									if(!(response is ExceptionResponse))
+									{
+										connected.Value = true;
+									}
+								}
+							}
+							catch(Exception ex)
+							{
+								// A failed attempt is retried until the deadline; record why it
+								// failed instead of discarding the cause silently.
+								if(Tracer.IsDebugEnabled)
+								{
+									Tracer.Debug("Connection attempt failed, will retry: " + ex.Message);
+								}
+							}
+						}
+					}
+					finally
+					{
+						Monitor.Exit(checkConnectedLock);
+					}
+				}
+
+				// Stop once connected, once the connection has been closed in the
+				// meantime, or once the deadline has passed.
+				if(connected.Value || closed.Value || DateTime.UtcNow > timeoutTime)
+				{
+					break;
+				}
+
+				// Back off from being overly aggressive.  Having too many threads
+				// aggressively trying to connect to a down broker pegs the CPU.
+				Thread.Sleep(5 * (waitCount++));
+			}
+
+			if(!connected.Value)
+			{
+				throw new ConnectionClosedException();
+			}
+		}
+
+		/// <summary>
+		/// Handles a command received from the transport by routing it according to
+		/// its kind.  Commands of an unrecognised kind are logged as errors.
+		/// </summary>
+		/// <param name="commandTransport">The transport the command arrived on.</param>
+		/// <param name="command">The received command.</param>
+		protected void OnCommand(ITransport commandTransport, Command command)
+		{
+			if(command.IsMessageDispatch)
+			{
+				WaitForTransportInterruptionProcessingToComplete();
+				DispatchMessage((MessageDispatch) command);
+				return;
+			}
+
+			if(command.IsKeepAliveInfo)
+			{
+				OnKeepAliveCommand(commandTransport, (KeepAliveInfo) command);
+				return;
+			}
+
+			if(command.IsWireFormatInfo)
+			{
+				this.brokerWireFormatInfo = (WireFormatInfo) command;
+				return;
+			}
+
+			if(command.IsBrokerInfo)
+			{
+				// Store the info, then release anyone waiting on the latch.
+				this.brokerInfo = (BrokerInfo) command;
+				this.brokerInfoReceived.countDown();
+				return;
+			}
+
+			if(command.IsShutdownInfo)
+			{
+				if(!closing.Value && !closed.Value)
+				{
+					OnException(new NMSException("Broker closed this connection."));
+				}
+				return;
+			}
+
+			if(command.IsProducerAck)
+			{
+				ProducerAck producerAck = (ProducerAck) command;
+				if(producerAck.ProducerId != null)
+				{
+					MessageProducer target = producers[producerAck.ProducerId] as MessageProducer;
+					if(target != null)
+					{
+						if(Tracer.IsDebugEnabled)
+						{
+							Tracer.Debug("Connection: Received a new ProducerAck -> " + producerAck);
+						}
+
+						target.OnProducerAck(producerAck);
+					}
+				}
+				return;
+			}
+
+			if(command.IsConnectionError)
+			{
+				if(!closing.Value && !closed.Value)
+				{
+					BrokerError brokerError = ((ConnectionError) command).Exception;
+
+					// Defaults used when the broker supplied no error details.
+					string message = "Broker connection error.";
+					string cause = "";
+
+					if(null != brokerError)
+					{
+						message = brokerError.Message;
+						if(null != brokerError.Cause)
+						{
+							cause = brokerError.Cause.Message;
+						}
+					}
+
+					OnException(new NMSConnectionException(message, cause));
+				}
+				return;
+			}
+
+			Tracer.Error("Unknown command: " + command);
+		}
+
+		/// <summary>
+		/// Hands the dispatch to the dispatcher registered for its consumer id.  When
+		/// no dispatcher is registered for that id, an error is logged instead.
+		/// </summary>
+		/// <param name="dispatch">The message dispatch to deliver.</param>
+		protected void DispatchMessage(MessageDispatch dispatch)
+		{
+			lock(dispatchers.SyncRoot)
+			{
+				if(dispatchers.Contains(dispatch.ConsumerId))
+				{
+					IDispatcher target = (IDispatcher) dispatchers[dispatch.ConsumerId];
+
+					// Can be null when a consumer has sent a MessagePull and there was
+					// no available message at the broker to dispatch or when signalled
+					// that the end of a Queue browse has been reached.
+					if(null != dispatch.Message)
+					{
+						// Mark the message read-only and copy over the redelivery count
+						// before it is handed to the dispatcher.
+						dispatch.Message.ReadOnlyBody = true;
+						dispatch.Message.ReadOnlyProperties = true;
+						dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
+					}
+
+					target.Dispatch(dispatch);
+
+					return;
+				}
+			}
+
+			Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
+		}
+
+		/// <summary>
+		/// Answers a KeepAliveInfo from the broker.  While connected, the same command
+		/// is sent back with ResponseRequired cleared.  A failure is passed to
+		/// OnException unless the connection is closing or closed.
+		/// </summary>
+		/// <param name="commandTransport">The transport the command arrived on.</param>
+		/// <param name="info">The keep-alive command received.</param>
+		protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info)
+		{
+			Tracer.Info("Keep alive message received.");
+
+			try
+			{
+				if(!connected.Value)
+				{
+					return;
+				}
+
+				Tracer.Info("Returning KeepAliveInfo Response.");
+				info.ResponseRequired = false;
+				transport.Oneway(info);
+			}
+			catch(Exception sendError)
+			{
+				if(!closing.Value && !closed.Value)
+				{
+					OnException(sendError);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Queues a call to the exception listener for the given error.  Nothing is
+		/// done while the connection is closed or closing.  When no listener is set,
+		/// the error is only written to the debug trace.
+		/// </summary>
+		/// <param name="error">The error to report; wrapped into an NMSException when it is not one.</param>
+		internal void OnAsyncException(Exception error)
+		{
+			if(this.closed.Value || this.closing.Value)
+			{
+				return;
+			}
+
+			if(this.ExceptionListener == null)
+			{
+				Tracer.Debug("Async exception with no exception listener: " + error);
+				return;
+			}
+
+			NMSException nmsError = error as NMSException;
+			if(nmsError == null)
+			{
+				nmsError = (NMSException) NMSExceptionSupport.Create(error);
+			}
+
+			// Called in another thread so that processing can continue
+			// here, ensures no lock contention.
+			executor.QueueUserWorkItem(AsyncCallExceptionListener, nmsError);
+		}
+
+		/// <summary>
+		/// Executor callback that passes the queued error to the exception listener.
+		/// </summary>
+		/// <param name="error">The queued NMSException, passed as an object.</param>
+		private void AsyncCallExceptionListener(object error)
+		{
+			NMSException exception = error as NMSException;
+
+			// This runs later on the executor, so the listener that was present when the
+			// work item was queued may have been removed since.  Take a snapshot and
+			// skip the call when none is left, instead of failing on a null delegate.
+			ExceptionListener listener = this.ExceptionListener;
+			if(listener != null)
+			{
+				listener(exception);
+			}
+		}
+
+		/// <summary>
+		/// Transport exception callback; forwards the cause to OnException.
+		/// </summary>
+		/// <param name="source">The transport reporting the failure (not used here).</param>
+		/// <param name="cause">The reported exception.</param>
+		internal void OnTransportException(ITransport source, Exception cause)
+		{
+			OnException(cause);
+		}
+
+		/// <summary>
+		/// Reports the error to the exception listener and, unless the connection is
+		/// closing or closed, queues the failure handling on the executor.
+		/// </summary>
+		/// <param name="error">The error that occurred.</param>
+		internal void OnException(Exception error)
+		{
+			// Will fire an exception listener callback if there's any set.
+			OnAsyncException(error);
+
+			if(this.closing.Value || this.closed.Value)
+			{
+				return;
+			}
+
+			// Perform the actual work in another thread to avoid lock contention
+			// and allow the caller to continue on in its error cleanup.
+			executor.QueueUserWorkItem(AsyncOnExceptionHandler, error);
+		}
+
+		/// <summary>
+		/// Executor callback that performs the failure cleanup: records the
+		/// failure, disposes the transport, counts down the broker-info latch and
+		/// shuts down every session.
+		/// </summary>
+		/// <param name="error">The Exception queued by OnException.</param>
+		private void AsyncOnExceptionHandler(object error)
+		{
+			MarkTransportFailed(error as Exception);
+
+			try
+			{
+				this.transport.Dispose();
+			}
+			catch(Exception disposeError)
+			{
+				Tracer.Debug("Caught Exception While disposing of Transport: " + disposeError);
+			}
+
+			this.brokerInfoReceived.countDown();
+
+			// Work on a snapshot taken under the lock so the sessions list is not
+			// enumerated while the client is closing sessions at the same time.
+			IList snapshot;
+			lock(this.sessions.SyncRoot)
+			{
+				snapshot = new ArrayList(this.sessions);
+			}
+
+			foreach(Session active in snapshot)
+			{
+				try
+				{
+					active.Shutdown();
+				}
+				catch(Exception shutdownError)
+				{
+					Tracer.Debug("Caught Exception While disposing of Sessions: " + shutdownError);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Flags the transport as failed and remembers the first error that
+		/// caused a failure; later errors do not overwrite it.
+		/// </summary>
+		/// <param name="error">The error associated with this failure.</param>
+		private void MarkTransportFailed(Exception error)
+		{
+			this.transportFailed.Value = true;
+
+			if(this.firstFailureError != null)
+			{
+				return;
+			}
+
+			this.firstFailureError = error;
+		}
+
+		/// <summary>
+		/// Transport interruption callback. Creates a latch sized to the number
+		/// of dispatchers, signals that interruption processing is needed, clears
+		/// in-progress messages on every session and then notifies the
+		/// ConnectionInterruptedListener (unless the connection is closing).
+		/// </summary>
+		/// <param name="sender">Transport that was interrupted (not used here).</param>
+		protected void OnTransportInterrupted(ITransport sender)
+		{
+			Tracer.Debug("Connection: Transport has been Interrupted.");
+
+			this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count);
+			if(Tracer.IsDebugEnabled)
+			{
+				Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
+			}
+
+			SignalInterruptionProcessingNeeded();
+
+			// Enumerate a snapshot taken under the lock, as AsyncOnExceptionHandler
+			// does. Enumerating the live list can throw if a session is added or
+			// removed concurrently, which would skip the remaining sessions and the
+			// listener notification below.
+			IList sessionsCopy = null;
+			lock(this.sessions.SyncRoot)
+			{
+				sessionsCopy = new ArrayList(this.sessions);
+			}
+
+			foreach(Session session in sessionsCopy)
+			{
+				try
+				{
+					session.ClearMessagesInProgress();
+				}
+				catch(Exception ex)
+				{
+					Tracer.Warn("Exception while clearing messages: " + ex.Message);
+					Tracer.Warn(ex.StackTrace);
+				}
+			}
+
+			if(this.ConnectionInterruptedListener != null && !this.closing.Value)
+			{
+				try
+				{
+					this.ConnectionInterruptedListener();
+				}
+				catch
+				{
+					// Best effort: a failing listener must not disturb the transport thread.
+				}
+			}
+		}
+
+		/// <summary>
+		/// Transport resumed callback; notifies the ConnectionResumedListener
+		/// when one is set and the connection is not closing.
+		/// </summary>
+		/// <param name="sender">Transport that resumed (not used here).</param>
+		protected void OnTransportResumed(ITransport sender)
+		{
+			Tracer.Debug("Transport has resumed normal operation.");
+
+			if(this.ConnectionResumedListener == null || this.closing.Value)
+			{
+				return;
+			}
+
+			try
+			{
+				this.ConnectionResumedListener();
+			}
+			catch
+			{
+				// Exceptions thrown by the listener are deliberately ignored.
+			}
+		}
+
+		/// <summary>
+		/// Passes a session error to the exception listener; if the listener
+		/// itself throws, the originating session is closed.
+		/// </summary>
+		/// <param name="sender">Session that reported the error.</param>
+		/// <param name="exception">The reported error.</param>
+		internal void OnSessionException(Session sender, Exception exception)
+		{
+			if(ExceptionListener == null)
+			{
+				return;
+			}
+
+			try
+			{
+				ExceptionListener(exception);
+			}
+			catch
+			{
+				sender.Close();
+			}
+		}
+
+		/// <summary>
+		/// Creates a new local transaction ID for this connection.
+		/// </summary>
+		/// <returns>
+		/// An id carrying this connection's ConnectionId and the next value of
+		/// the atomically incremented local transaction counter.
+		/// </returns>
+		public LocalTransactionId CreateLocalTransactionId()
+		{
+			LocalTransactionId transactionId = new LocalTransactionId();
+
+			transactionId.ConnectionId = ConnectionId;
+			transactionId.Value = Interlocked.Increment(ref localTransactionCounter);
+
+			return transactionId;
+		}
+
+		/// <summary>
+		/// Produces a fresh SessionId for this connection on every read, using
+		/// the atomically incremented session counter.
+		/// </summary>
+		protected SessionId NextSessionId
+		{
+			get
+			{
+				return new SessionId(this.info.ConnectionId, Interlocked.Increment(ref this.sessionCounter));
+			}
+		}
+
+		/// <summary>
+		/// Creates a temporary topic or queue named "connectionId:counter",
+		/// registers it through a synchronous DestinationInfo add request and
+		/// binds it to this connection.
+		/// </summary>
+		/// <param name="topic">true for a temporary topic, false for a temporary queue.</param>
+		/// <returns>The newly created temporary destination.</returns>
+		public ActiveMQTempDestination CreateTemporaryDestination(bool topic)
+		{
+			string physicalName =
+				info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
+
+			ActiveMQTempDestination destination = topic
+				? (ActiveMQTempDestination) new ActiveMQTempTopic(physicalName)
+				: (ActiveMQTempDestination) new ActiveMQTempQueue(physicalName);
+
+			DestinationInfo addCommand = new DestinationInfo();
+			addCommand.ConnectionId = ConnectionId;
+			addCommand.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
+			addCommand.Destination = destination;
+
+			this.SyncRequest(addCommand);
+
+			// Only bound to the connection once the request has gone through.
+			destination.Connection = this;
+
+			return destination;
+		}
+
+		protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
+		{ // Empty body: this overload currently performs no work.
+		}
+
+		/// <summary>
+		/// Removes a temporary destination; simply delegates to DeleteDestination.
+		/// </summary>
+		/// <param name="destination">The destination to remove.</param>
+		public void DeleteTemporaryDestination(IDestination destination)
+		{
+			DeleteDestination(destination);
+		}
+
+		/// <summary>
+		/// Sends a one-way DestinationInfo remove command for the given destination.
+		/// </summary>
+		/// <param name="destination">The destination to remove; it is cast to ActiveMQDestination.</param>
+		public void DeleteDestination(IDestination destination)
+		{
+			DestinationInfo removeCommand = new DestinationInfo();
+
+			removeCommand.ConnectionId = this.ConnectionId;
+			removeCommand.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
+			removeCommand.Destination = (ActiveMQDestination) destination;
+
+			Oneway(removeCommand);
+		}
+
+		/// <summary>
+		/// When an interruption latch is pending, waits up to ten seconds for the
+		/// outstanding interruption processing to finish (unless the connection
+		/// is closed) and then signals completion.
+		/// </summary>
+		private void WaitForTransportInterruptionProcessingToComplete()
+		{
+			CountDownLatch latch = this.transportInterruptionProcessingComplete;
+			if(latch == null)
+			{
+				return;
+			}
+
+			if(!closed.Value && latch.Remaining > 0)
+			{
+				Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
+							"processing (" + latch.Remaining + ") to complete..");
+				latch.await(TimeSpan.FromSeconds(10));
+			}
+
+			SignalInterruptionProcessingComplete();
+		}
+
+		/// <summary>
+		/// Counts down the pending interruption latch, if there is one, and then
+		/// attempts to signal completion. Errors raised while signalling are
+		/// swallowed.
+		/// </summary>
+		internal void TransportInterruptionProcessingComplete()
+		{
+			CountDownLatch latch = this.transportInterruptionProcessingComplete;
+			if(latch == null)
+			{
+				return;
+			}
+
+			latch.countDown();
+
+			try
+			{
+				SignalInterruptionProcessingComplete();
+			}
+			catch
+			{
+				// Deliberately ignored.
+			}
+		}
+
+		/// <summary>
+		/// Once the interruption latch has reached zero, clears it and tells the
+		/// failover transport (when the transport narrows to one) that
+		/// interruption processing for this connection is complete.
+		/// </summary>
+		private void SignalInterruptionProcessingComplete()
+		{
+			CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+
+			// The field is re-read here, and another thread may already have
+			// completed the processing and set it to null since the caller's own
+			// check. Treat that as nothing left to signal.
+			if(cdl == null || cdl.Remaining != 0)
+			{
+				return;
+			}
+
+			if(Tracer.IsDebugEnabled)
+			{
+				Tracer.Debug("transportInterruptionProcessingComplete for: " + this.info.ConnectionId);
+			}
+
+			this.transportInterruptionProcessingComplete = null;
+
+			FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+			if(failoverTransport != null)
+			{
+				failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
+				if(Tracer.IsDebugEnabled)
+				{
+					Tracer.Debug("notified failover transport (" + failoverTransport +
+								 ") of interruption completion for: " + this.info.ConnectionId);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Tells the failover transport's state tracker that this connection has
+		/// been interrupted. Does nothing when the transport does not narrow to a
+		/// FailoverTransport.
+		/// </summary>
+		private void SignalInterruptionProcessingNeeded()
+		{
+			FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+			if(failover == null)
+			{
+				return;
+			}
+
+			failover.StateTracker.TransportInterrupted(this.info.ConnectionId);
+
+			if(Tracer.IsDebugEnabled)
+			{
+				Tracer.Debug("notified failover transport (" + failover +
+							 ") of pending interruption processing for: " + this.info.ConnectionId);
+			}
+		}
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs?rev=1137081&r1=1137080&r2=1137081&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs Fri Jun 17 23:43:33 2011
@@ -83,7 +83,14 @@ namespace Apache.NMS.ActiveMQ.State
 		{
 			get
 			{
-				return transactions[id];
+				TransactionState state;
+
+				if(transactions.TryGetValue(id, out state))
+				{
+					return state;
+				}
+
+				return null;
 			}
 		}
 
@@ -99,32 +106,33 @@ namespace Apache.NMS.ActiveMQ.State
 		{
 			get
 			{
-				#if DEBUG
-				try
+				SessionState sessionState;
+
+				if(sessions.TryGetValue(id, out sessionState))
 				{
-				#endif
-					return sessions[id];
-				#if DEBUG
+					return sessionState;
 				}
-				catch(System.Collections.Generic.KeyNotFoundException)
+
+#if DEBUG
+				// Useful for diagnosing missing session ids
+				string sessionList = string.Empty;
+				foreach(SessionId sessionId in sessions.Keys)
 				{
-					// Useful for dignosing missing session ids
-					string sessionList = string.Empty;
-					foreach(SessionId sessionId in sessions.Keys)
-					{
-						sessionList += sessionId.ToString() + "\n";
-					}
-					System.Diagnostics.Debug.Assert(false,
-						string.Format("Session '{0}' did not exist in the sessions collection.\n\nSessions:-\n{1}", id, sessionList));
-					throw;
+					sessionList += sessionId.ToString() + "\n";
 				}
-				#endif
+
+				System.Diagnostics.Debug.Assert(false,
+					string.Format("Session '{0}' did not exist in the sessions collection.\n\nSessions:-\n{1}", id, sessionList));
+#endif
+				return null;
 			}
 		}
 
 		public TransactionState removeTransactionState(TransactionId id)
 		{
-			TransactionState ret = transactions[id];
+			TransactionState ret = null;
+
+			transactions.TryGetValue(id, out ret);
 			transactions.Remove(id);
 			return ret;
 		}
@@ -137,41 +145,31 @@ namespace Apache.NMS.ActiveMQ.State
 
 		public SessionState removeSession(SessionId id)
 		{
-			SessionState ret = sessions[id];
+			SessionState ret = null;
+
+			sessions.TryGetValue(id, out ret);
 			sessions.Remove(id);
 			return ret;
 		}
 
 		public ConnectionInfo Info
 		{
-			get
-			{
-				return info;
-			}
+			get { return info; }
 		}
 
 		public AtomicCollection<SessionId> SessionIds
 		{
-			get
-			{
-				return sessions.Keys;
-			}
+			get { return sessions.Keys; }
 		}
 
 		public AtomicCollection<DestinationInfo> TempDestinations
 		{
-			get
-			{
-				return tempDestinations;
-			}
+			get { return tempDestinations; }
 		}
 
 		public AtomicCollection<SessionState> SessionStates
 		{
-			get
-			{
-				return sessions.Values;
-			}
+			get { return sessions.Values; }
 		}
 
 		private void checkShutdown()

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs?rev=1137081&r1=1137080&r2=1137081&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs Fri Jun 17 23:43:33 2011
@@ -17,7 +17,6 @@
 
 using System;
 using System.Collections.Generic;
-
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Transport;
 
@@ -70,9 +69,12 @@ namespace Apache.NMS.ActiveMQ.State
 
             public override void Run()
             {
-                ConnectionId connectionId = info.ConnectionId;
-                ConnectionState cs = cst.connectionStates[connectionId];
-                cs.removeTransactionState(info.TransactionId);
+                ConnectionState cs;
+
+				if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs))
+				{
+					cs.removeTransactionState(info.TransactionId);
+				}
             }
         }
 
@@ -180,11 +182,15 @@ namespace Apache.NMS.ActiveMQ.State
             // Restore the session's consumers but possibly in pull only (prefetch 0 state) till
             // recovery completes.
 
-            ConnectionState connectionState = connectionStates[sessionState.Info.SessionId.ParentId];
-            bool connectionInterruptionProcessingComplete =
-                connectionState.ConnectionInterruptProcessingComplete;
+			ConnectionState connectionState = null;
+			bool connectionInterruptionProcessingComplete = false;
 
-            // Restore the session's consumers
+			if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState))
+			{
+				connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete;
+			}
+			
+			// Restore the session's consumers
             foreach(ConsumerState consumerState in sessionState.ConsumerStates)
             {
                 ConsumerInfo infoToSend = consumerState.Info;
@@ -192,7 +198,7 @@ namespace Apache.NMS.ActiveMQ.State
                 if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
                 {
                     infoToSend = consumerState.Info.Clone() as ConsumerInfo;
-                    connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+					connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
                     infoToSend.PrefetchSize = 0;
                     if(Tracer.IsDebugEnabled)
                     {
@@ -239,24 +245,25 @@ namespace Apache.NMS.ActiveMQ.State
 
         public override Response processAddDestination(DestinationInfo info)
         {
-            if(info != null)
+            if(info != null && info.Destination.IsTemporary)
             {
-                ConnectionState cs = connectionStates[info.ConnectionId];
-                if(cs != null && info.Destination.IsTemporary)
-                {
-                    cs.addTempDestination(info);
-                }
+                ConnectionState cs;
+
+				if(connectionStates.TryGetValue(info.ConnectionId, out cs))
+				{
+					cs.addTempDestination(info);
+				}
             }
             return TRACKED_RESPONSE_MARKER;
         }
 
         public override Response processRemoveDestination(DestinationInfo info)
         {
-            if(info != null)
+            if(info != null && info.Destination.IsTemporary)
             {
-                ConnectionState cs = connectionStates[info.ConnectionId];
-                if(cs != null && info.Destination.IsTemporary)
-                {
+                ConnectionState cs;
+				if(connectionStates.TryGetValue(info.ConnectionId, out cs))
+				{
                     cs.removeTempDestination(info.Destination);
                 }
             }
@@ -273,8 +280,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = sessionId.ParentId;
                     if(connectionId != null)
                     {
-                        ConnectionState cs = connectionStates[connectionId];
-                        if(cs != null)
+                        ConnectionState cs;
+						
+						if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -298,8 +306,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = sessionId.ParentId;
                     if(connectionId != null)
                     {
-                        ConnectionState cs = connectionStates[connectionId];
-                        if(cs != null)
+						ConnectionState cs = null;
+						
+						if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -323,8 +332,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = sessionId.ParentId;
                     if(connectionId != null)
                     {
-                        ConnectionState cs = connectionStates[connectionId];
-                        if(cs != null)
+						ConnectionState cs = null;
+
+						if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -348,8 +358,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = sessionId.ParentId;
                     if(connectionId != null)
                     {
-                        ConnectionState cs = connectionStates[connectionId];
-                        if(cs != null)
+						ConnectionState cs = null;
+
+						if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -370,8 +381,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.SessionId.ParentId;
                 if(connectionId != null)
                 {
-                    ConnectionState cs = connectionStates[connectionId];
-                    if(cs != null)
+					ConnectionState cs = null;
+
+					if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         cs.addSession(info);
                     }
@@ -387,8 +399,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = id.ParentId;
                 if(connectionId != null)
                 {
-                    ConnectionState cs = connectionStates[connectionId];
-                    if(cs != null)
+					ConnectionState cs = null;
+
+					if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         cs.removeSession(id);
                     }
@@ -424,8 +437,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = send.ProducerId.ParentId.ParentId;
                     if(connectionId != null)
                     {
-                        ConnectionState cs = connectionStates[connectionId];
-                        if(cs != null)
+						ConnectionState cs = null;
+
+						if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             TransactionState transactionState = cs[send.TransactionId];
                             if(transactionState != null)
@@ -452,8 +466,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
                 if(connectionId != null)
                 {
-                    ConnectionState cs = connectionStates[connectionId];
-                    if(cs != null)
+					ConnectionState cs = null;
+
+					if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[ack.TransactionId];
                         if(transactionState != null)
@@ -474,8 +489,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                    ConnectionState cs = connectionStates[connectionId];
-                    if(cs != null)
+					ConnectionState cs = null;
+
+					if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         cs.addTransactionState(info.TransactionId);
                         TransactionState state = cs[info.TransactionId];
@@ -494,8 +510,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                    ConnectionState cs = connectionStates[connectionId];
-                    if(cs != null)
+					ConnectionState cs = null;
+
+					if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[info.TransactionId];
                         if(transactionState != null)
@@ -516,8 +533,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                    ConnectionState cs = connectionStates[connectionId];
-                    if(cs != null)
+					ConnectionState cs = null;
+
+					if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[info.TransactionId];
                         if(transactionState != null)
@@ -538,7 +556,8 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                    ConnectionState cs = connectionStates[connectionId];
-                    if(cs != null)
+					ConnectionState cs = null;
+
+					if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[info.TransactionId];
@@ -560,8 +579,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                    ConnectionState cs = connectionStates[connectionId];
-                    if(cs != null)
+					ConnectionState cs = null;
+
+					if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[info.TransactionId];
                         if(transactionState != null)
@@ -582,8 +602,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                    ConnectionState cs = connectionStates[connectionId];
-                    if(cs != null)
+					ConnectionState cs = null;
+
+					if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[info.TransactionId];
                         if(transactionState != null)
@@ -683,8 +704,9 @@ namespace Apache.NMS.ActiveMQ.State
 
         public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
         {
-            ConnectionState connectionState = connectionStates[connectionId];
-            if(connectionState != null)
+			ConnectionState connectionState = null;
+
+			if(connectionStates.TryGetValue(connectionId, out connectionState))
             {
                 connectionState.ConnectionInterruptProcessingComplete = true;
 
@@ -719,8 +741,9 @@ namespace Apache.NMS.ActiveMQ.State
 
         public void TransportInterrupted(ConnectionId id)
         {
-            ConnectionState connection = connectionStates[id];
-            if(connection != null)
+			ConnectionState connection = null;
+
+			if(connectionStates.TryGetValue(id, out connection))
             {
                 connection.ConnectionInterruptProcessingComplete = false;
             }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs?rev=1137081&r1=1137080&r2=1137081&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/SynchronizedObjects.cs Fri Jun 17 23:43:33 2011
@@ -177,6 +177,14 @@ namespace Apache.NMS.ActiveMQ.State
 			}
 		}
 
+		public bool TryGetValue(TKey key, out TValue val) // true when key is present; val receives the stored value
+		{
+			lock(((ICollection) _dictionary).SyncRoot) // hold the dictionary's SyncRoot for the duration of the lookup
+			{
+				return _dictionary.TryGetValue(key, out val);
+			}
+		}
+
 		public AtomicCollection<TKey> Keys
 		{
 			get

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs?rev=1137081&r1=1137080&r2=1137081&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/MutexTransport.cs Fri Jun 17 23:43:33 2011
@@ -32,6 +32,7 @@ namespace Apache.NMS.ActiveMQ.Transport
 			if(timeout > 0)
 			{
 				DateTime timeoutTime = DateTime.Now + TimeSpan.FromMilliseconds(timeout);
+				int waitCount = 1;
 
 				while(true)
 				{
@@ -45,7 +46,9 @@ namespace Apache.NMS.ActiveMQ.Transport
 						throw new IOException(string.Format("Oneway timed out after {0} milliseconds.", timeout));
 					}
 
-					Thread.Sleep(10);
+					// Back off from being overly aggressive.  Having too many threads
+					// aggressively trying to get the lock pegs the CPU.
+					Thread.Sleep(3 * (waitCount++));
 				}
 			}
 			else