You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/10/20 15:11:52 UTC

svn commit: r827427 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs

Author: tabish
Date: Tue Oct 20 13:11:51 2009
New Revision: 827427

URL: http://svn.apache.org/viewvc?rev=827427&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-197

Always respond to the KeepAliveInfo command, whether the broker wants it or not, so that we don't drop the connection on 5.3 brokers.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=827427&r1=827426&r2=827427&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Tue Oct 20 13:11:51 2009
@@ -24,406 +24,406 @@
 
 namespace Apache.NMS.ActiveMQ
 {
-	/// <summary>
-	/// Represents a connection with a message broker
-	/// </summary>
-	public class Connection : IConnection
-	{
-		private readonly Uri brokerUri;
-		private ITransport transport;
-		private readonly ConnectionInfo info;
-		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
-		private TimeSpan requestTimeout;
-		private BrokerInfo brokerInfo; // from broker
-		private WireFormatInfo brokerWireFormatInfo; // from broker
-		private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
-		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
-		private readonly object myLock = new object();
-		private bool asyncSend = false;
-		private bool alwaysSyncSend = false;
-		private bool asyncClose = true;
-		private bool copyMessageOnSend = true;
-		private int producerWindowSize = 0;
-		private bool connected = false;
-		private bool closed = false;
-		private bool closing = false;
-		private int sessionCounter = 0;
-		private int temporaryDestinationCounter = 0;
-		private int localTransactionCounter;
-		private readonly Atomic<bool> started = new Atomic<bool>(false);
-		private ConnectionMetaData metaData = null;
-		private bool disposed = false;
-
-		public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
-		{
-			this.brokerUri = connectionUri;
-			this.info = info;
-			this.requestTimeout = transport.RequestTimeout;
-			this.transport = transport;
-			this.transport.Command = new CommandHandler(OnCommand);
-			this.transport.Exception = new ExceptionHandler(OnException);
-			this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
-			this.transport.Resumed = new ResumedHandler(OnTransportResumed);
-		}
-
-		~Connection()
-		{
-			Dispose(false);
-		}
-
-		/// <summary>
-		/// A delegate that can receive transport level exceptions.
-		/// </summary>
-		public event ExceptionListener ExceptionListener;
-
-		/// <summary>
-		/// An asynchronous listener that is notified when a Fault tolerant connection
-		/// has been interrupted.
-		/// </summary>
-		public event ConnectionInterruptedListener ConnectionInterruptedListener;
-
-		/// <summary>
-		/// An asynchronous listener that is notified when a Fault tolerant connection
-		/// has been resumed.
-		/// </summary>
-		public event ConnectionResumedListener ConnectionResumedListener;
-
-		#region Properties
-
-		/// <summary>
-		/// This property indicates what version of the Protocol we are using to
-		/// communicate with the Broker, if not set we return the lowest version
-		/// number to indicate we support only the basic command set.
-		/// </summary>
-		public int ProtocolVersion
-		{
-			get
-			{
-				if(brokerWireFormatInfo != null)
-				{
-					return brokerWireFormatInfo.Version;
-				}
-
-				return 1;
-			}
-		}
-
-		/// <summary>
-		/// This property indicates whether or not async send is enabled.
-		/// </summary>
-		public bool AsyncSend
-		{
-			get { return asyncSend; }
-			set { asyncSend = value; }
-		}
-
-		/// <summary>
-		/// This property indicates whether or not async close is enabled.
-		/// When the connection is closed, it will either send a synchronous
-		/// DisposeOf command to the broker and wait for confirmation (if true),
-		/// or it will send the DisposeOf command asynchronously.
-		/// </summary>
-		public bool AsyncClose
-		{
-			get { return asyncClose; }
-			set { asyncClose = value; }
-		}
-
-		/// <summary>
-		/// This property sets the acknowledgment mode for the connection.
-		/// The URI parameter connection.ackmode can be set to a string value
-		/// that maps to the enumeration value.
-		/// </summary>
-		public string AckMode
-		{
-			set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
-		}
-
-		/// <summary>
-		/// This property is the maximum number of bytes in memory that a producer will transmit
-		/// to a broker before waiting for acknowledgement messages from the broker that it has
-		/// accepted the previously sent messages. In other words, this how you configure the
-		/// producer flow control window that is used for async sends where the client is responsible
-		/// for managing memory usage. The default value of 0 means no flow control at the client
-		/// </summary>
-		public int ProducerWindowSize
-		{
-			get { return producerWindowSize; }
-			set { producerWindowSize = value; }
-		}
-
-		/// <summary>
-		/// This property forces all messages that are sent to be sent synchronously overriding
-		/// any usage of the AsyncSend flag. This can reduce performance in some cases since the
-		/// only messages we normally send synchronously are Persistent messages not sent in a
-		/// transaction. This options guarantees that no send will return until the broker has
-		/// acknowledge receipt of the message
-		/// </summary>
-		public bool AlwaysSyncSend
-		{
-			get { return alwaysSyncSend; }
-			set { alwaysSyncSend = value; }
-		}
-
-		/// <summary>
-		/// This property indicates whether Message's should be copied before being sent via
-		/// one of the Connection's send methods.  Copying the Message object allows the user
-		/// to resuse the Object over for another send.  If the message isn't copied performance
-		/// can improve but the user must not reuse the Object as it may not have been sent
-		/// before they reset its payload.
-		/// </summary>
-		public bool CopyMessageOnSend
-		{
-			get { return copyMessageOnSend; }
-			set { copyMessageOnSend = value; }
-		}
-
-		public IConnectionMetaData MetaData
-		{
-			get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
-		}
-
-		#endregion
-
-		/// <summary>
-		/// Starts asynchronous message delivery of incoming messages for this connection.
-		/// Synchronous delivery is unaffected.
-		/// </summary>
-		public void Start()
-		{
-			CheckConnected();
-			if(started.CompareAndSet(false, true))
-			{
-				lock(sessions.SyncRoot)
-				{
-					foreach(Session session in sessions)
-					{
-						session.StartAsyncDelivery();
-					}
-				}
-			}
-		}
-
-		/// <summary>
-		/// This property determines if the asynchronous message delivery of incoming
-		/// messages has been started for this connection.
-		/// </summary>
-		public bool IsStarted
-		{
-			get { return started.Value; }
-		}
-
-		/// <summary>
-		/// Temporarily stop asynchronous delivery of inbound messages for this connection.
-		/// The sending of outbound messages is unaffected.
-		/// </summary>
-		public void Stop()
-		{
-			CheckConnected();
-			if(started.CompareAndSet(true, false))
-			{
-				lock(sessions.SyncRoot)
-				{
-					foreach(Session session in sessions)
-					{
-						session.StopAsyncDelivery();
-					}
-				}
-			}
-		}
-
-		/// <summary>
-		/// Creates a new session to work on this connection
-		/// </summary>
-		public ISession CreateSession()
-		{
-			return CreateSession(acknowledgementMode);
-		}
-
-		/// <summary>
-		/// Creates a new session to work on this connection
-		/// </summary>
-		public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
-		{
-			SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
-			SyncRequest(info, this.RequestTimeout);
-			Session session = new Session(this, info, sessionAcknowledgementMode);
-
-			// Set properties on session using parameters prefixed with "session."
-			URISupport.CompositeData c = URISupport.parseComposite(this.brokerUri);
-			URISupport.SetProperties(session, c.Parameters, "session.");
-
-			if(IsStarted)
-			{
-				session.StartAsyncDelivery();
-			}
-
-			sessions.Add(session);
-			return session;
-		}
-
-		public void RemoveSession(Session session)
-		{
-			DisposeOf(session.SessionId);
-
-			if(!this.closing)
-			{
-				sessions.Remove(session);
-			}
-		}
-
-		public void addProducer( ProducerId id, MessageProducer producer )
-		{
-			this.producers.Add( id, producer );
-		}
-
-		public void removeProducer( ProducerId id )
-		{
-			this.producers.Remove( id );
-		}
-
-		public void Close()
-		{
-			lock(myLock)
-			{
-				if(this.closed)
-				{
-					return;
-				}
-
-				try
-				{
-					Tracer.Info("Closing Connection.");
-					this.closing = true;
-					lock(sessions.SyncRoot)
-					{
-						foreach(Session session in sessions)
-						{
-							session.Close();
-						}
-					}
-					sessions.Clear();
-
-					if(connected)
-					{
-						DisposeOf(ConnectionId);
-						ShutdownInfo shutdowninfo = new ShutdownInfo();
-						shutdowninfo.ResponseRequired = false;
-						transport.Oneway(shutdowninfo);
-					}
+    /// <summary>
+    /// Represents a connection with a message broker
+    /// </summary>
+    public class Connection : IConnection
+    {
+        private readonly Uri brokerUri;
+        private ITransport transport;
+        private readonly ConnectionInfo info;
+        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+        private TimeSpan requestTimeout;
+        private BrokerInfo brokerInfo; // from broker
+        private WireFormatInfo brokerWireFormatInfo; // from broker
+        private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+        private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+        private readonly object myLock = new object();
+        private bool asyncSend = false;
+        private bool alwaysSyncSend = false;
+        private bool asyncClose = true;
+        private bool copyMessageOnSend = true;
+        private int producerWindowSize = 0;
+        private bool connected = false;
+        private bool closed = false;
+        private bool closing = false;
+        private int sessionCounter = 0;
+        private int temporaryDestinationCounter = 0;
+        private int localTransactionCounter;
+        private readonly Atomic<bool> started = new Atomic<bool>(false);
+        private ConnectionMetaData metaData = null;
+        private bool disposed = false;
+
+        public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
+        {
+            this.brokerUri = connectionUri;
+            this.info = info;
+            this.requestTimeout = transport.RequestTimeout;
+            this.transport = transport;
+            this.transport.Command = new CommandHandler(OnCommand);
+            this.transport.Exception = new ExceptionHandler(OnException);
+            this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
+            this.transport.Resumed = new ResumedHandler(OnTransportResumed);
+        }
+
+        ~Connection()
+        {
+            Dispose(false);
+        }
+
+        /// <summary>
+        /// A delegate that can receive transport level exceptions.
+        /// </summary>
+        public event ExceptionListener ExceptionListener;
+
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant connection
+        /// has been interrupted.
+        /// </summary>
+        public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant connection
+        /// has been resumed.
+        /// </summary>
+        public event ConnectionResumedListener ConnectionResumedListener;
+
+        #region Properties
+
+        /// <summary>
+        /// This property indicates what version of the Protocol we are using to
+        /// communicate with the Broker, if not set we return the lowest version
+        /// number to indicate we support only the basic command set.
+        /// </summary>
+        public int ProtocolVersion
+        {
+            get
+            {
+                if(brokerWireFormatInfo != null)
+                {
+                    return brokerWireFormatInfo.Version;
+                }
+
+                return 1;
+            }
+        }
+
+        /// <summary>
+        /// This property indicates whether or not async send is enabled.
+        /// </summary>
+        public bool AsyncSend
+        {
+            get { return asyncSend; }
+            set { asyncSend = value; }
+        }
+
+        /// <summary>
+        /// This property indicates whether or not async close is enabled.
+        /// When the connection is closed, it will either send a synchronous
+        /// DisposeOf command to the broker and wait for confirmation (if true),
+        /// or it will send the DisposeOf command asynchronously.
+        /// </summary>
+        public bool AsyncClose
+        {
+            get { return asyncClose; }
+            set { asyncClose = value; }
+        }
+
+        /// <summary>
+        /// This property sets the acknowledgment mode for the connection.
+        /// The URI parameter connection.ackmode can be set to a string value
+        /// that maps to the enumeration value.
+        /// </summary>
+        public string AckMode
+        {
+            set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+        }
+
+        /// <summary>
+        /// This property is the maximum number of bytes in memory that a producer will transmit
+        /// to a broker before waiting for acknowledgement messages from the broker that it has
+        /// accepted the previously sent messages. In other words, this how you configure the
+        /// producer flow control window that is used for async sends where the client is responsible
+        /// for managing memory usage. The default value of 0 means no flow control at the client
+        /// </summary>
+        public int ProducerWindowSize
+        {
+            get { return producerWindowSize; }
+            set { producerWindowSize = value; }
+        }
+
+        /// <summary>
+        /// This property forces all messages that are sent to be sent synchronously overriding
+        /// any usage of the AsyncSend flag. This can reduce performance in some cases since the
+        /// only messages we normally send synchronously are Persistent messages not sent in a
+        /// transaction. This options guarantees that no send will return until the broker has
+        /// acknowledge receipt of the message
+        /// </summary>
+        public bool AlwaysSyncSend
+        {
+            get { return alwaysSyncSend; }
+            set { alwaysSyncSend = value; }
+        }
+
+        /// <summary>
+        /// This property indicates whether Message's should be copied before being sent via
+        /// one of the Connection's send methods.  Copying the Message object allows the user
+        /// to resuse the Object over for another send.  If the message isn't copied performance
+        /// can improve but the user must not reuse the Object as it may not have been sent
+        /// before they reset its payload.
+        /// </summary>
+        public bool CopyMessageOnSend
+        {
+            get { return copyMessageOnSend; }
+            set { copyMessageOnSend = value; }
+        }
+
+        public IConnectionMetaData MetaData
+        {
+            get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Starts asynchronous message delivery of incoming messages for this connection.
+        /// Synchronous delivery is unaffected.
+        /// </summary>
+        public void Start()
+        {
+            CheckConnected();
+            if(started.CompareAndSet(false, true))
+            {
+                lock(sessions.SyncRoot)
+                {
+                    foreach(Session session in sessions)
+                    {
+                        session.StartAsyncDelivery();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// This property determines if the asynchronous message delivery of incoming
+        /// messages has been started for this connection.
+        /// </summary>
+        public bool IsStarted
+        {
+            get { return started.Value; }
+        }
+
+        /// <summary>
+        /// Temporarily stop asynchronous delivery of inbound messages for this connection.
+        /// The sending of outbound messages is unaffected.
+        /// </summary>
+        public void Stop()
+        {
+            CheckConnected();
+            if(started.CompareAndSet(true, false))
+            {
+                lock(sessions.SyncRoot)
+                {
+                    foreach(Session session in sessions)
+                    {
+                        session.StopAsyncDelivery();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Creates a new session to work on this connection
+        /// </summary>
+        public ISession CreateSession()
+        {
+            return CreateSession(acknowledgementMode);
+        }
+
+        /// <summary>
+        /// Creates a new session to work on this connection
+        /// </summary>
+        public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
+        {
+            SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
+            SyncRequest(info, this.RequestTimeout);
+            Session session = new Session(this, info, sessionAcknowledgementMode);
+
+            // Set properties on session using parameters prefixed with "session."
+            URISupport.CompositeData c = URISupport.parseComposite(this.brokerUri);
+            URISupport.SetProperties(session, c.Parameters, "session.");
+
+            if(IsStarted)
+            {
+                session.StartAsyncDelivery();
+            }
+
+            sessions.Add(session);
+            return session;
+        }
+
+        public void RemoveSession(Session session)
+        {
+            DisposeOf(session.SessionId);
+
+            if(!this.closing)
+            {
+                sessions.Remove(session);
+            }
+        }
+
+        public void addProducer( ProducerId id, MessageProducer producer )
+        {
+            this.producers.Add( id, producer );
+        }
+
+        public void removeProducer( ProducerId id )
+        {
+            this.producers.Remove( id );
+        }
+
+        public void Close()
+        {
+            lock(myLock)
+            {
+                if(this.closed)
+                {
+                    return;
+                }
+
+                try
+                {
+                    Tracer.Info("Closing Connection.");
+                    this.closing = true;
+                    lock(sessions.SyncRoot)
+                    {
+                        foreach(Session session in sessions)
+                        {
+                            session.Close();
+                        }
+                    }
+                    sessions.Clear();
+
+                    if(connected)
+                    {
+                        DisposeOf(ConnectionId);
+                        ShutdownInfo shutdowninfo = new ShutdownInfo();
+                        shutdowninfo.ResponseRequired = false;
+                        transport.Oneway(shutdowninfo);
+                    }
 
                     Tracer.Info("Disposing of the Transport.");
-					transport.Dispose();
-				}
-				catch(Exception ex)
-				{
-					Tracer.ErrorFormat("Error during connection close: {0}", ex);
-				}
-				finally
-				{
-					this.transport = null;
-					this.closed = true;
-					this.connected = false;
-					this.closing = false;
-				}
-			}
-		}
-
-		public void Dispose()
-		{
-			Dispose(true);
-			GC.SuppressFinalize(this);
-		}
-
-		protected void Dispose(bool disposing)
-		{
-			if(disposed)
-			{
-				return;
-			}
-
-			if(disposing)
-			{
-				// Dispose managed code here.
-			}
-
-			try
-			{
-				// For now we do not distinguish between Dispose() and Close().
-				// In theory Dispose should possibly be lighter-weight and perform a (faster)
-				// disorderly close.
-				Close();
-			}
-			catch
-			{
-				// Ignore network errors.
-			}
-
-			disposed = true;
-		}
-
-		// Properties
-
-		public Uri BrokerUri
-		{
-			get { return brokerUri; }
-		}
-
-		public ITransport ITransport
-		{
-			get { return transport; }
-			set { this.transport = value; }
-		}
-
-		public TimeSpan RequestTimeout
-		{
-			get { return this.requestTimeout; }
-			set { this.requestTimeout = value; }
-		}
-
-		public AcknowledgementMode AcknowledgementMode
-		{
-			get { return acknowledgementMode; }
-			set { this.acknowledgementMode = value; }
-		}
-
-		public string ClientId
-		{
-			get { return info.ClientId; }
-			set
-			{
-				if(connected)
-				{
-					throw new NMSException("You cannot change the ClientId once the Connection is connected");
-				}
-				info.ClientId = value;
-			}
-		}
-
-		public ConnectionId ConnectionId
-		{
-			get { return info.ConnectionId; }
-		}
-
-		public BrokerInfo BrokerInfo
-		{
-			get { return brokerInfo; }
-		}
-
-		public WireFormatInfo BrokerWireFormat
-		{
-			get { return brokerWireFormatInfo; }
-		}
-
-		// Implementation methods
-
-		/// <summary>
-		/// Performs a synchronous request-response with the broker
-		/// </summary>
-		///
+                    transport.Dispose();
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during connection close: {0}", ex);
+                }
+                finally
+                {
+                    this.transport = null;
+                    this.closed = true;
+                    this.connected = false;
+                    this.closing = false;
+                }
+            }
+        }
 
-		public Response SyncRequest(Command command)
-		{
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected void Dispose(bool disposing)
+        {
+            if(disposed)
+            {
+                return;
+            }
+
+            if(disposing)
+            {
+                // Dispose managed code here.
+            }
+
+            try
+            {
+                // For now we do not distinguish between Dispose() and Close().
+                // In theory Dispose should possibly be lighter-weight and perform a (faster)
+                // disorderly close.
+                Close();
+            }
+            catch
+            {
+                // Ignore network errors.
+            }
+
+            disposed = true;
+        }
+
+        // Properties
+
+        public Uri BrokerUri
+        {
+            get { return brokerUri; }
+        }
+
+        public ITransport ITransport
+        {
+            get { return transport; }
+            set { this.transport = value; }
+        }
+
+        public TimeSpan RequestTimeout
+        {
+            get { return this.requestTimeout; }
+            set { this.requestTimeout = value; }
+        }
+
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return acknowledgementMode; }
+            set { this.acknowledgementMode = value; }
+        }
+
+        public string ClientId
+        {
+            get { return info.ClientId; }
+            set
+            {
+                if(connected)
+                {
+                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
+                }
+                info.ClientId = value;
+            }
+        }
+
+        public ConnectionId ConnectionId
+        {
+            get { return info.ConnectionId; }
+        }
+
+        public BrokerInfo BrokerInfo
+        {
+            get { return brokerInfo; }
+        }
+
+        public WireFormatInfo BrokerWireFormat
+        {
+            get { return brokerWireFormatInfo; }
+        }
+
+        // Implementation methods
+
+        /// <summary>
+        /// Performs a synchronous request-response with the broker
+        /// </summary>
+        ///
+
+        public Response SyncRequest(Command command)
+        {
             try
             {
                 return SyncRequest(command, this.RequestTimeout);
@@ -432,32 +432,32 @@
             {
                 throw NMSExceptionSupport.Create(ex);
             }
-		}
+        }
 
-		public Response SyncRequest(Command command, TimeSpan requestTimeout)
-		{
-			CheckConnected();
+        public Response SyncRequest(Command command, TimeSpan requestTimeout)
+        {
+            CheckConnected();
 
             try
             {
-    			Response response = transport.Request(command, requestTimeout);
-    			if(response is ExceptionResponse)
-    			{
-    				ExceptionResponse exceptionResponse = (ExceptionResponse) response;
-    				BrokerError brokerError = exceptionResponse.Exception;
-    				throw new BrokerException(brokerError);
-    			}
-    			return response;
+                Response response = transport.Request(command, requestTimeout);
+                if(response is ExceptionResponse)
+                {
+                    ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+                    BrokerError brokerError = exceptionResponse.Exception;
+                    throw new BrokerException(brokerError);
+                }
+                return response;
             }
             catch(Exception ex)
             {
                 throw NMSExceptionSupport.Create(ex);
             }
-		}
+        }
 
-		public void Oneway(Command command)
-		{
-			CheckConnected();
+        public void Oneway(Command command)
+        {
+            CheckConnected();
 
             try
             {
@@ -465,219 +465,217 @@
             }
             catch(Exception ex)
             {
-                throw NMSExceptionSupport.Create(ex);                
+                throw NMSExceptionSupport.Create(ex);
+            }
+        }
+
+        public void DisposeOf(DataStructure objectId)
+        {
+            try
+            {
+                RemoveInfo command = new RemoveInfo();
+                command.ObjectId = objectId;
+                if(asyncClose)
+                {
+                    Tracer.Info("Asynchronously disposing of Connection.");
+                    if(connected)
+                    {
+                        command.ResponseRequired = false;
+                        transport.Oneway(command);
+                    }
+                }
+                else
+                {
+                    // Ensure that the object is disposed to avoid potential race-conditions
+                    // of trying to re-create the same object in the broker faster than
+                    // the broker can dispose of the object.  Allow up to 5 seconds to process.
+                    Tracer.Info("Synchronously disposing of Connection.");
+                    SyncRequest(command, TimeSpan.FromSeconds(5));
+                }
             }
-		}
+            catch // (BrokerException)
+            {
+                // Ignore exceptions while shutting down.
+            }
+        }
 
-		public void DisposeOf(DataStructure objectId)
-		{
-			try
-			{
-				RemoveInfo command = new RemoveInfo();
-				command.ObjectId = objectId;
-				if(asyncClose)
-				{
-					Tracer.Info("Asynchronously disposing of Connection.");
-					if(connected)
-					{
-						command.ResponseRequired = false;
-						transport.Oneway(command);
-					}
-				}
-				else
-				{
-					// Ensure that the object is disposed to avoid potential race-conditions
-					// of trying to re-create the same object in the broker faster than
-					// the broker can dispose of the object.  Allow up to 5 seconds to process.
-					Tracer.Info("Synchronously disposing of Connection.");
-					SyncRequest(command, TimeSpan.FromSeconds(5));
-				}
-			}
-			catch // (BrokerException)
-			{
-				// Ignore exceptions while shutting down.
-			}
-		}
-
-		/// <summary>
-		/// Creates a new temporary destination name
-		/// </summary>
-		public String CreateTemporaryDestinationName()
-		{
-			return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
-		}
-
-		/// <summary>
-		/// Creates a new local transaction ID
-		/// </summary>
-		public LocalTransactionId CreateLocalTransactionId()
-		{
-			LocalTransactionId id = new LocalTransactionId();
-			id.ConnectionId = ConnectionId;
-			id.Value = Interlocked.Increment(ref localTransactionCounter);
-			return id;
-		}
-
-		protected void CheckConnected()
-		{
-			if(closed)
-			{
-				throw new ConnectionClosedException();
-			}
-
-			if(!connected)
-			{
-				connected = true;
-				// now lets send the connection and see if we get an ack/nak
-				if(null == SyncRequest(info))
-				{
-					closed = true;
-					connected = false;
-					throw new ConnectionClosedException();
-				}
-			}
-		}
-
-		/// <summary>
-		/// Handle incoming commands
-		/// </summary>
-		/// <param name="commandTransport">An ITransport</param>
-		/// <param name="command">A  Command</param>
-		protected void OnCommand(ITransport commandTransport, Command command)
-		{
-			if(command is MessageDispatch)
-			{
-				DispatchMessage((MessageDispatch) command);
-			}
-			else if(command is KeepAliveInfo)
-			{
-				OnKeepAliveCommand(commandTransport, (KeepAliveInfo) command);
-			}
-			else if(command is WireFormatInfo)
-			{
-				this.brokerWireFormatInfo = (WireFormatInfo) command;
-			}
-			else if(command is BrokerInfo)
-			{
-				this.brokerInfo = (BrokerInfo) command;
-			}
-			else if(command is ShutdownInfo)
-			{
-				if(!closing && !closed)
-				{
-					OnException(commandTransport, new NMSException("Broker closed this connection."));
-				}
-			}
-			else if(command is ProducerAck)
-			{
-				ProducerAck ack = (ProducerAck) command;
-				if(ack != null && ack.ProducerId != null) {
-					MessageProducer producer = (MessageProducer) producers[ack.ProducerId];
-					if( producer != null ) {
-						producer.OnProducerAck(ack);
-					}
-				}
-			}
-			else if(command is ConnectionError)
-			{
-				if(!closing && !closed)
-				{
-					ConnectionError connectionError = (ConnectionError) command;
-					BrokerError brokerError = connectionError.Exception;
-					string message = "Broker connection error.";
-					string cause = "";
-
-					if(null != brokerError)
-					{
-						message = brokerError.Message;
-						if(null != brokerError.Cause)
-						{
-							cause = brokerError.Cause.Message;
-						}
-					}
-
-					OnException(commandTransport, new NMSConnectionException(message, cause));
-				}
-			}
-			else
-			{
-				Tracer.Error("Unknown command: " + command);
-			}
-		}
-
-		protected void DispatchMessage(MessageDispatch dispatch)
-		{
-			bool dispatched = false;
-
-			// Override the Message's Destination with the one from the Dispatch since in the
-			// case of a virtual Topic the correct destination ack is the one from the Dispatch.
-			// This is a bit of a hack since we should really be sending the entire dispatch to
-			// the Consumer.
-			dispatch.Message.Destination = dispatch.Destination;
-			dispatch.Message.ReadOnlyBody = true;
-			dispatch.Message.ReadOnlyProperties = true;
-			dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
-
-			lock(sessions.SyncRoot)
-			{
-				foreach(Session session in sessions)
-				{
-					if(session.DispatchMessage(dispatch.ConsumerId, dispatch.Message))
-					{
-						dispatched = true;
-						break;
-					}
-				}
-			}
-
-			if(!dispatched)
-			{
-				Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
-			}
-		}
-
-		protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info)
-		{
-			Tracer.Info("Keep alive message received.");
-			if(info.ResponseRequired)
-			{
-				try
-				{
-					if(connected)
-					{
-                        Tracer.Info("Returning KeepAliveInfo Response.");
-						info.ResponseRequired = false;
-						transport.Oneway(info);
-					}
-				}
-				catch(Exception ex)
-				{
-					if(!closing && !closed)
-					{
-						OnException(commandTransport, ex);
-					}
-				}
-			}
-		}
-
-		protected void OnException(ITransport sender, Exception exception)
-		{
-			if(ExceptionListener != null && !this.closing)
-			{
-				try
-				{
-					ExceptionListener(exception);
-				}
-				catch
-				{
-					sender.Dispose();
-				}
-			}
-		}
-
-		protected void OnTransportInterrupted(ITransport sender)
-		{
-			Tracer.Debug("Transport has been Interrupted.");
+        /// <summary>
+        /// Builds the name for a new temporary destination by joining this connection's
+        /// id value and the next value of the temporary destination counter with a ':'.
+        /// </summary>
+        /// <returns>A string of the form "connectionIdValue:counter".</returns>
+        public String CreateTemporaryDestinationName()
+        {
+            // The counter is bumped atomically, so each call sees its own counter value.
+            return String.Concat(info.ConnectionId.Value, ":", Interlocked.Increment(ref temporaryDestinationCounter));
+        }
+
+        /// <summary>
+        /// Creates the id for a new local transaction on this connection.
+        /// </summary>
+        /// <returns>
+        /// A new LocalTransactionId whose ConnectionId is this connection's ConnectionId and
+        /// whose Value is the atomically incremented local transaction counter.
+        /// </returns>
+        public LocalTransactionId CreateLocalTransactionId()
+        {
+            LocalTransactionId transactionId = new LocalTransactionId();
+
+            transactionId.ConnectionId = ConnectionId;
+            transactionId.Value = Interlocked.Increment(ref localTransactionCounter);
+
+            return transactionId;
+        }
+
+        /// <summary>
+        /// Verifies that the connection is usable and, on first use, registers it with the
+        /// broker by sending the ConnectionInfo as a synchronous request.
+        /// </summary>
+        /// <remarks>
+        /// If the handshake request throws, the connected flag is cleared again before the
+        /// exception is rethrown, so that a later call repeats the handshake instead of
+        /// treating the connection as already registered.
+        /// </remarks>
+        /// <exception cref="ConnectionClosedException">
+        /// The connection is already closed, or the handshake produced no response (in which
+        /// case the connection is also marked closed).
+        /// </exception>
+        protected void CheckConnected()
+        {
+            if(closed)
+            {
+                throw new ConnectionClosedException();
+            }
+
+            if(connected)
+            {
+                return;
+            }
+
+            // The flag has to be raised before the request is sent: SyncRequest() calls
+            // CheckConnected() itself and would otherwise recurse back into this handshake.
+            connected = true;
+
+            Response response;
+            try
+            {
+                // now let's send the connection info and see if we get an ack/nak
+                response = SyncRequest(info);
+            }
+            catch
+            {
+                // The handshake did not complete; do not leave the connection flagged as connected.
+                connected = false;
+                throw;
+            }
+
+            if(null == response)
+            {
+                closed = true;
+                connected = false;
+                throw new ConnectionClosedException();
+            }
+        }
+
+        /// <summary>
+        /// Routes a command received from the transport to the matching handler.
+        /// </summary>
+        /// <remarks>
+        /// The command is tested against the known command types in a fixed order:
+        /// MessageDispatch, KeepAliveInfo, WireFormatInfo, BrokerInfo, ShutdownInfo,
+        /// ProducerAck and ConnectionError. Anything else is reported through Tracer.Error.
+        /// ShutdownInfo and ConnectionError are ignored while the connection is closing or closed.
+        /// </remarks>
+        /// <param name="commandTransport">Transport passed on to OnKeepAliveCommand and OnException.</param>
+        /// <param name="command">The incoming command.</param>
+        protected void OnCommand(ITransport commandTransport, Command command)
+        {
+            MessageDispatch dispatch = command as MessageDispatch;
+            if(dispatch != null)
+            {
+                DispatchMessage(dispatch);
+                return;
+            }
+
+            KeepAliveInfo keepAlive = command as KeepAliveInfo;
+            if(keepAlive != null)
+            {
+                OnKeepAliveCommand(commandTransport, keepAlive);
+                return;
+            }
+
+            WireFormatInfo wireFormat = command as WireFormatInfo;
+            if(wireFormat != null)
+            {
+                this.brokerWireFormatInfo = wireFormat;
+                return;
+            }
+
+            BrokerInfo broker = command as BrokerInfo;
+            if(broker != null)
+            {
+                this.brokerInfo = broker;
+                return;
+            }
+
+            if(command is ShutdownInfo)
+            {
+                if(!(closing || closed))
+                {
+                    OnException(commandTransport, new NMSException("Broker closed this connection."));
+                }
+                return;
+            }
+
+            ProducerAck ack = command as ProducerAck;
+            if(ack != null)
+            {
+                if(ack.ProducerId != null)
+                {
+                    // Only forward the ack when the producer is still present in the producers table.
+                    MessageProducer producer = (MessageProducer) producers[ack.ProducerId];
+                    if(producer != null)
+                    {
+                        producer.OnProducerAck(ack);
+                    }
+                }
+                return;
+            }
+
+            ConnectionError connectionError = command as ConnectionError;
+            if(connectionError != null)
+            {
+                if(closing || closed)
+                {
+                    return;
+                }
+
+                // Fall back to a generic message and an empty cause when the broker sent no details.
+                BrokerError brokerError = connectionError.Exception;
+                string message = (brokerError != null) ? brokerError.Message : "Broker connection error.";
+                string cause = (brokerError != null && brokerError.Cause != null) ? brokerError.Cause.Message : "";
+
+                OnException(commandTransport, new NMSConnectionException(message, cause));
+                return;
+            }
+
+            Tracer.Error("Unknown command: " + command);
+        }
+
+        /// <summary>
+        /// Offers a dispatched message to each session until one of them accepts it.
+        /// </summary>
+        /// <remarks>
+        /// The message is first stamped with the dispatch's destination and redelivery counter
+        /// and marked read-only (body and properties). The sessions are then tried in order,
+        /// under the sessions.SyncRoot lock. If no session accepts the message an error is traced.
+        /// </remarks>
+        /// <param name="dispatch">The dispatch command carrying the message and its consumer id.</param>
+        protected void DispatchMessage(MessageDispatch dispatch)
+        {
+            // The destination carried by the dispatch replaces the one in the message: for a
+            // virtual Topic the dispatch destination is the correct one to ack against.
+            // This is a workaround; ideally the whole dispatch would be handed to the consumer.
+            dispatch.Message.Destination = dispatch.Destination;
+            dispatch.Message.ReadOnlyBody = true;
+            dispatch.Message.ReadOnlyProperties = true;
+            dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
+
+            lock(sessions.SyncRoot)
+            {
+                foreach(Session candidate in sessions)
+                {
+                    if(candidate.DispatchMessage(dispatch.ConsumerId, dispatch.Message))
+                    {
+                        // Accepted: nothing left to do (returning also releases the lock).
+                        return;
+                    }
+                }
+            }
+
+            Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
+        }
+
+        /// <summary>
+        /// Answers a KeepAliveInfo from the broker by sending it straight back.
+        /// </summary>
+        /// <remarks>
+        /// The reply is sent whenever the connection is connected, whatever the incoming
+        /// ResponseRequired flag says; the flag is cleared on the command before it is returned
+        /// as a one-way send on this connection's transport. A failure while replying is passed
+        /// to OnException unless the connection is closing or closed.
+        /// </remarks>
+        /// <param name="commandTransport">Transport reported to OnException if the reply fails.</param>
+        /// <param name="info">The keep-alive command received (this parameter hides the info field).</param>
+        protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info)
+        {
+            Tracer.Info("Keep alive message received.");
+
+            try
+            {
+                if(!connected)
+                {
+                    return;
+                }
+
+                Tracer.Info("Returning KeepAliveInfo Response.");
+                info.ResponseRequired = false;
+                transport.Oneway(info);
+            }
+            catch(Exception ex)
+            {
+                if(closing || closed)
+                {
+                    return;
+                }
+
+                OnException(commandTransport, ex);
+            }
+        }
+
+        /// <summary>
+        /// Reports an exception to the registered ExceptionListener, if any.
+        /// </summary>
+        /// <remarks>
+        /// Nothing happens when no listener is registered or the connection is closing.
+        /// If the listener itself throws, that exception is discarded and the sender is disposed.
+        /// </remarks>
+        /// <param name="sender">Transport that is disposed when the listener throws.</param>
+        /// <param name="exception">The exception handed to the listener.</param>
+        protected void OnException(ITransport sender, Exception exception)
+        {
+            if(ExceptionListener == null || this.closing)
+            {
+                return;
+            }
+
+            try
+            {
+                ExceptionListener(exception);
+            }
+            catch
+            {
+                sender.Dispose();
+            }
+        }
+
+        protected void OnTransportInterrupted(ITransport sender)
+        {
+            Tracer.Debug("Transport has been Interrupted.");
 
             if(this.ConnectionInterruptedListener != null && !this.closing )
             {
@@ -687,13 +685,13 @@
                 }
                 catch
                 {
-                }                    
+                }
             }
-		}
+        }
 
-		protected void OnTransportResumed(ITransport sender)
-		{
-			Tracer.Debug("Transport has resumed normal operation.");
+        protected void OnTransportResumed(ITransport sender)
+        {
+            Tracer.Debug("Transport has resumed normal operation.");
 
             if(this.ConnectionResumedListener != null && !this.closing )
             {
@@ -703,33 +701,33 @@
                 }
                 catch
                 {
-                }                    
-            }            
-		}
-
-		internal void OnSessionException(Session sender, Exception exception)
-		{
-			if(ExceptionListener != null)
-			{
-				try
-				{
-					ExceptionListener(exception);
-				}
-				catch
-				{
-					sender.Close();
-				}
-			}
-		}
-
-		protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
-		{
-			SessionInfo answer = new SessionInfo();
-			SessionId sessionId = new SessionId();
-			sessionId.ConnectionId = info.ConnectionId.Value;
-			sessionId.Value = Interlocked.Increment(ref sessionCounter);
-			answer.SessionId = sessionId;
-			return answer;
-		}
-	}
+                }
+            }
+        }
+
+        /// <summary>
+        /// Reports an exception raised by a session to the registered ExceptionListener, if any.
+        /// </summary>
+        /// <remarks>
+        /// If the listener itself throws, that exception is discarded and the session is closed.
+        /// </remarks>
+        /// <param name="sender">Session that is closed when the listener throws.</param>
+        /// <param name="exception">The exception handed to the listener.</param>
+        internal void OnSessionException(Session sender, Exception exception)
+        {
+            if(ExceptionListener == null)
+            {
+                return;
+            }
+
+            try
+            {
+                ExceptionListener(exception);
+            }
+            catch
+            {
+                sender.Close();
+            }
+        }
+
+        /// <summary>
+        /// Builds the SessionInfo for a new session on this connection.
+        /// </summary>
+        /// <param name="sessionAcknowledgementMode">Not read by this method.</param>
+        /// <returns>
+        /// A SessionInfo whose SessionId combines this connection's id value with the
+        /// atomically incremented session counter.
+        /// </returns>
+        protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
+        {
+            SessionInfo sessionInfo = new SessionInfo();
+            SessionId id = new SessionId();
+
+            id.ConnectionId = info.ConnectionId.Value;
+            id.Value = Interlocked.Increment(ref sessionCounter);
+            sessionInfo.SessionId = id;
+
+            return sessionInfo;
+        }
+    }
 }