You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/19 22:45:51 UTC

svn commit: r965623 - /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs

Author: tabish
Date: Mon Jul 19 20:45:51 2010
New Revision: 965623

URL: http://svn.apache.org/viewvc?rev=965623&view=rev
Log:
Fix the createSession method to properly get the parameters form the Uri

Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=965623&r1=965622&r2=965623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Mon Jul 19 20:45:51 2010
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections;
+using System.Collections.Specialized;
 using System.Threading;
 using Apache.NMS.Stomp.Commands;
 using Apache.NMS.Stomp.Transport;
@@ -26,589 +27,589 @@ using Apache.NMS.Util;
 
 namespace Apache.NMS.Stomp
 {
-	/// <summary>
-	/// Represents a connection with a message broker
-	/// </summary>
-	public class Connection : IConnection
-	{
-		private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
-
-		private readonly Uri brokerUri;
-		private ITransport transport;
-		private readonly ConnectionInfo info;
-
-		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
-		private bool asyncSend = false;
-		private bool alwaysSyncSend = false;
-		private bool copyMessageOnSend = true;
-		private bool sendAcksAsync = false;
-		private bool dispatchAsync = true;
-		private string transformation = null;
-		private IRedeliveryPolicy redeliveryPolicy;
-		private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
-
-		private bool userSpecifiedClientID;
-		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
-		private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
-		private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
-		private readonly object myLock = new object();
-		private bool connected = false;
-		private bool closed = false;
-		private bool closing = false;
-		private int sessionCounter = 0;
-		private int temporaryDestinationCounter = 0;
-		private int localTransactionCounter;
-		private readonly Atomic<bool> started = new Atomic<bool>(false);
-		private ConnectionMetaData metaData = null;
-		private bool disposed = false;
-		private IdGenerator clientIdGenerator;
+    /// <summary>
+    /// Represents a connection with a message broker
+    /// </summary>
+    public class Connection : IConnection
+    {
+        private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
+        private readonly Uri brokerUri;
+        private ITransport transport;
+        private readonly ConnectionInfo info;
+
+        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+        private bool asyncSend = false;
+        private bool alwaysSyncSend = false;
+        private bool copyMessageOnSend = true;
+        private bool sendAcksAsync = false;
+        private bool dispatchAsync = true;
+        private string transformation = null;
+        private IRedeliveryPolicy redeliveryPolicy;
+        private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+
+        private bool userSpecifiedClientID;
+        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+        private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+        private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
+        private readonly object myLock = new object();
+        private bool connected = false;
+        private bool closed = false;
+        private bool closing = false;
+        private int sessionCounter = 0;
+        private int temporaryDestinationCounter = 0;
+        private int localTransactionCounter;
+        private readonly Atomic<bool> started = new Atomic<bool>(false);
+        private ConnectionMetaData metaData = null;
+        private bool disposed = false;
+        private IdGenerator clientIdGenerator;
         private CountDownLatch transportInterruptionProcessingComplete;
 
-		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
-		{
-			this.brokerUri = connectionUri;
-			this.clientIdGenerator = clientIdGenerator;
-
-			this.transport = transport;
-			this.transport.Command = new CommandHandler(OnCommand);
-			this.transport.Exception = new ExceptionHandler(OnException);
-			this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
-			this.transport.Resumed = new ResumedHandler(OnTransportResumed);
-
-			ConnectionId id = new ConnectionId();
-			id.Value = CONNECTION_ID_GENERATOR.GenerateId();
-
-			this.info = new ConnectionInfo();
-			this.info.ConnectionId = id;
-		}
-
-		~Connection()
-		{
-			Dispose(false);
-		}
-
-		/// <summary>
-		/// A delegate that can receive transport level exceptions.
-		/// </summary>
-		public event ExceptionListener ExceptionListener;
-
-		/// <summary>
-		/// An asynchronous listener that is notified when a Fault tolerant connection
-		/// has been interrupted.
-		/// </summary>
-		public event ConnectionInterruptedListener ConnectionInterruptedListener;
-
-		/// <summary>
-		/// An asynchronous listener that is notified when a Fault tolerant connection
-		/// has been resumed.
-		/// </summary>
-		public event ConnectionResumedListener ConnectionResumedListener;
-
-		#region Properties
-
-		public String UserName
-		{
-			get { return this.info.UserName; }
-			set { this.info.UserName = value; }
-		}
-
-		public String Password
-		{
-			get { return this.info.Password; }
-			set { this.info.Password = value; }
-		}
-
-		/// <summary>
-		/// This property indicates whether or not async send is enabled.
-		/// </summary>
-		public bool AsyncSend
-		{
-			get { return asyncSend; }
-			set { asyncSend = value; }
-		}
-
-		/// <summary>
-		/// This property sets the acknowledgment mode for the connection.
-		/// The URI parameter connection.ackmode can be set to a string value
-		/// that maps to the enumeration value.
-		/// </summary>
-		public string AckMode
-		{
-			set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
-		}
-
-		/// <summary>
-		/// This property forces all messages that are sent to be sent synchronously overriding
-		/// any usage of the AsyncSend flag. This can reduce performance in some cases since the
-		/// only messages we normally send synchronously are Persistent messages not sent in a
-		/// transaction. This options guarantees that no send will return until the broker has
-		/// acknowledge receipt of the message
-		/// </summary>
-		public bool AlwaysSyncSend
-		{
-			get { return alwaysSyncSend; }
-			set { alwaysSyncSend = value; }
-		}
-
-		/// <summary>
-		/// This property indicates whether Message's should be copied before being sent via
-		/// one of the Connection's send methods.  Copying the Message object allows the user
-		/// to resuse the Object over for another send.  If the message isn't copied performance
-		/// can improve but the user must not reuse the Object as it may not have been sent
-		/// before they reset its payload.
-		/// </summary>
-		public bool CopyMessageOnSend
-		{
-			get { return copyMessageOnSend; }
-			set { copyMessageOnSend = value; }
-		}
-
-		/// <summary>
-		/// This property indicates whether or not async sends are used for
-		/// message acknowledgement messages.  Sending Acks async can improve
-		/// performance but may decrease reliability.
-		/// </summary>
-		public bool SendAcksAsync
-		{
-			get { return sendAcksAsync; }
-			set { sendAcksAsync = value; }
-		}
-
-		/// <summary>
-		/// synchronously or asynchronously by the broker.  Set to false for a slow
-		/// consumer and true for a fast consumer.
-		/// </summary>
-		public bool DispatchAsync
-		{
-			get { return this.dispatchAsync; }
-			set { this.dispatchAsync = value; }
-		}
-
-		/// <summary>
-		/// Sets the default Transformation attribute applied to Consumers.  If a consumer
-		/// is to receive Map messages from the Broker then the user should set the "jms-map-xml"
-		/// transformation on the consumer so that all MapMessages are sent as XML.
-		/// </summary>
-		public string Transformation
-		{
-			get { return this.transformation; }
-			set { this.transformation = value; }
-		}
-
-		public IConnectionMetaData MetaData
-		{
-			get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
-		}
-
-		public Uri BrokerUri
-		{
-			get { return brokerUri; }
-		}
-
-		public ITransport ITransport
-		{
-			get { return transport; }
-			set { this.transport = value; }
-		}
-
-		public TimeSpan RequestTimeout
-		{
-			get { return this.requestTimeout; }
-			set { this.requestTimeout = value; }
-		}
-
-		public AcknowledgementMode AcknowledgementMode
-		{
-			get { return acknowledgementMode; }
-			set { this.acknowledgementMode = value; }
-		}
-
-		public string ClientId
-		{
-			get { return info.ClientId; }
-			set
-			{
-				if(this.connected)
-				{
-					throw new NMSException("You cannot change the ClientId once the Connection is connected");
-				}
-
-				this.info.ClientId = value;
-				this.userSpecifiedClientID = true;
-				CheckConnected();
-			}
-		}
-
-		/// <summary>
-		/// The Default Client Id used if the ClientId property is not set explicity.
-		/// </summary>
-		public string DefaultClientId
-		{
-			set
-			{
-				this.info.ClientId = value;
-				this.userSpecifiedClientID = true;
-			}
-		}
-
-		public ConnectionId ConnectionId
-		{
-			get { return info.ConnectionId; }
-		}
-
-		/// <summary>
-		/// Get/or set the redelivery policy for this connection.
-		/// </summary>
-		public IRedeliveryPolicy RedeliveryPolicy
-		{
-			get { return this.redeliveryPolicy; }
-			set { this.redeliveryPolicy = value; }
-		}
-
-		public PrefetchPolicy PrefetchPolicy
-		{
-			get { return this.prefetchPolicy; }
-			set { this.prefetchPolicy = value; }
-		}
-
-		#endregion
-
-		/// <summary>
-		/// Starts asynchronous message delivery of incoming messages for this connection.
-		/// Synchronous delivery is unaffected.
-		/// </summary>
-		public void Start()
-		{
-			CheckConnected();
-			if(started.CompareAndSet(false, true))
-			{
-				lock(sessions.SyncRoot)
-				{
-					foreach(Session session in sessions)
-					{
-						session.Start();
-					}
-				}
-			}
-		}
-
-		/// <summary>
-		/// This property determines if the asynchronous message delivery of incoming
-		/// messages has been started for this connection.
-		/// </summary>
-		public bool IsStarted
-		{
-			get { return started.Value; }
-		}
-
-		/// <summary>
-		/// Temporarily stop asynchronous delivery of inbound messages for this connection.
-		/// The sending of outbound messages is unaffected.
-		/// </summary>
-		public void Stop()
-		{
-			CheckConnected();
-			if(started.CompareAndSet(true, false))
-			{
-				lock(sessions.SyncRoot)
-				{
-					foreach(Session session in sessions)
-					{
-						session.Stop();
-					}
-				}
-			}
-		}
-
-		/// <summary>
-		/// Creates a new session to work on this connection
-		/// </summary>
-		public ISession CreateSession()
-		{
-			return CreateSession(acknowledgementMode);
-		}
-
-		/// <summary>
-		/// Creates a new session to work on this connection
-		/// </summary>
-		public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
-		{
-			SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
-			Session session = new Session(this, info, sessionAcknowledgementMode, this.dispatchAsync);
-
-			// Set properties on session using parameters prefixed with "session."
-			URISupport.CompositeData c = URISupport.ParseComposite(this.brokerUri);
-			URISupport.SetProperties(session, c.Parameters, "session.");
-
-			if(IsStarted)
-			{
-				session.Start();
-			}
-
-			sessions.Add(session);
-			return session;
-		}
-
-		internal void RemoveSession(Session session)
-		{
-			if(!this.closing)
-			{
-				sessions.Remove(session);
-			}
-		}
-
-		internal void addDispatcher(ConsumerId id, IDispatcher dispatcher)
-		{
-			this.dispatchers.Add(id, dispatcher);
-		}
-
-		internal void removeDispatcher(ConsumerId id)
-		{
-			this.dispatchers.Remove(id);
-		}
-
-		public void Close()
-		{
-			lock(myLock)
-			{
-				if(this.closed)
-				{
-					return;
-				}
-
-				try
-				{
-					Tracer.Info("Closing Connection.");
-					this.closing = true;
-					lock(sessions.SyncRoot)
-					{
-						foreach(Session session in sessions)
-						{
-							session.DoClose();
-						}
-					}
-					sessions.Clear();
-
-					if(connected)
-					{
-						ShutdownInfo shutdowninfo = new ShutdownInfo();
-						transport.Oneway(shutdowninfo);
-					}
-
-					Tracer.Info("Disposing of the Transport.");
-					transport.Dispose();
-				}
-				catch(Exception ex)
-				{
-					Tracer.ErrorFormat("Error during connection close: {0}", ex);
-				}
-				finally
-				{
-					this.transport = null;
-					this.closed = true;
-					this.connected = false;
-					this.closing = false;
-				}
-			}
-		}
-
-		public void Dispose()
-		{
-			Dispose(true);
-			GC.SuppressFinalize(this);
-		}
-
-		protected void Dispose(bool disposing)
-		{
-			if(disposed)
-			{
-				return;
-			}
-
-			if(disposing)
-			{
-				// Dispose managed code here.
-			}
-
-			try
-			{
-				// For now we do not distinguish between Dispose() and Close().
-				// In theory Dispose should possibly be lighter-weight and perform a (faster)
-				// disorderly close.
-				Close();
-			}
-			catch
-			{
-				// Ignore network errors.
-			}
-
-			disposed = true;
-		}
-
-		// Implementation methods
-
-		/// <summary>
-		/// Performs a synchronous request-response with the broker
-		/// </summary>
-		///
-
-		public Response SyncRequest(Command command)
-		{
-			try
-			{
-				return SyncRequest(command, this.RequestTimeout);
-			}
-			catch(Exception ex)
-			{
-				throw NMSExceptionSupport.Create(ex);
-			}
-		}
-
-		public Response SyncRequest(Command command, TimeSpan requestTimeout)
-		{
-			CheckConnected();
-
-			try
-			{
-				Response response = transport.Request(command, requestTimeout);
-				if(response is ExceptionResponse)
-				{
-					ExceptionResponse exceptionResponse = (ExceptionResponse) response;
-					BrokerError brokerError = exceptionResponse.Exception;
-					throw new BrokerException(brokerError);
-				}
-				return response;
-			}
-			catch(Exception ex)
-			{
-				throw NMSExceptionSupport.Create(ex);
-			}
-		}
-
-		public void Oneway(Command command)
-		{
-			CheckConnected();
-
-			try
-			{
-				transport.Oneway(command);
-			}
-			catch(Exception ex)
-			{
-				throw NMSExceptionSupport.Create(ex);
-			}
-		}
-
-		protected void CheckConnected()
-		{
-			if(closed)
-			{
-				throw new ConnectionClosedException();
-			}
-
-			if(!connected)
-			{
-				if(!this.userSpecifiedClientID)
-				{
-					this.info.ClientId = this.clientIdGenerator.GenerateId();
-				}
-
-				connected = true;
-				// now lets send the connection and see if we get an ack/nak
-				if(null == SyncRequest(info))
-				{
-					closed = true;
-					connected = false;
-					throw new ConnectionClosedException();
-				}
-			}
-		}
-
-		/// <summary>
-		/// Handle incoming commands
-		/// </summary>
-		/// <param name="commandTransport">An ITransport</param>
-		/// <param name="command">A  Command</param>
-		protected void OnCommand(ITransport commandTransport, Command command)
-		{
-			if(command is MessageDispatch)
-			{
+        public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
+        {
+            this.brokerUri = connectionUri;
+            this.clientIdGenerator = clientIdGenerator;
+
+            this.transport = transport;
+            this.transport.Command = new CommandHandler(OnCommand);
+            this.transport.Exception = new ExceptionHandler(OnException);
+            this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
+            this.transport.Resumed = new ResumedHandler(OnTransportResumed);
+
+            ConnectionId id = new ConnectionId();
+            id.Value = CONNECTION_ID_GENERATOR.GenerateId();
+
+            this.info = new ConnectionInfo();
+            this.info.ConnectionId = id;
+        }
+
+        ~Connection()
+        {
+            Dispose(false);
+        }
+
+        /// <summary>
+        /// A delegate that can receive transport level exceptions.
+        /// </summary>
+        public event ExceptionListener ExceptionListener;
+
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant connection
+        /// has been interrupted.
+        /// </summary>
+        public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant connection
+        /// has been resumed.
+        /// </summary>
+        public event ConnectionResumedListener ConnectionResumedListener;
+
+        #region Properties
+
+        public String UserName
+        {
+            get { return this.info.UserName; }
+            set { this.info.UserName = value; }
+        }
+
+        public String Password
+        {
+            get { return this.info.Password; }
+            set { this.info.Password = value; }
+        }
+
+        /// <summary>
+        /// This property indicates whether or not async send is enabled.
+        /// </summary>
+        public bool AsyncSend
+        {
+            get { return asyncSend; }
+            set { asyncSend = value; }
+        }
+
+        /// <summary>
+        /// This property sets the acknowledgment mode for the connection.
+        /// The URI parameter connection.ackmode can be set to a string value
+        /// that maps to the enumeration value.
+        /// </summary>
+        public string AckMode
+        {
+            set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+        }
+
+        /// <summary>
+        /// This property forces all messages that are sent to be sent synchronously overriding
+        /// any usage of the AsyncSend flag. This can reduce performance in some cases since the
+        /// only messages we normally send synchronously are Persistent messages not sent in a
+        /// transaction. This options guarantees that no send will return until the broker has
+        /// acknowledge receipt of the message
+        /// </summary>
+        public bool AlwaysSyncSend
+        {
+            get { return alwaysSyncSend; }
+            set { alwaysSyncSend = value; }
+        }
+
+        /// <summary>
+        /// This property indicates whether Message's should be copied before being sent via
+        /// one of the Connection's send methods.  Copying the Message object allows the user
+        /// to resuse the Object over for another send.  If the message isn't copied performance
+        /// can improve but the user must not reuse the Object as it may not have been sent
+        /// before they reset its payload.
+        /// </summary>
+        public bool CopyMessageOnSend
+        {
+            get { return copyMessageOnSend; }
+            set { copyMessageOnSend = value; }
+        }
+
+        /// <summary>
+        /// This property indicates whether or not async sends are used for
+        /// message acknowledgement messages.  Sending Acks async can improve
+        /// performance but may decrease reliability.
+        /// </summary>
+        public bool SendAcksAsync
+        {
+            get { return sendAcksAsync; }
+            set { sendAcksAsync = value; }
+        }
+
+        /// <summary>
+        /// synchronously or asynchronously by the broker.  Set to false for a slow
+        /// consumer and true for a fast consumer.
+        /// </summary>
+        public bool DispatchAsync
+        {
+            get { return this.dispatchAsync; }
+            set { this.dispatchAsync = value; }
+        }
+
+        /// <summary>
+        /// Sets the default Transformation attribute applied to Consumers.  If a consumer
+        /// is to receive Map messages from the Broker then the user should set the "jms-map-xml"
+        /// transformation on the consumer so that all MapMessages are sent as XML.
+        /// </summary>
+        public string Transformation
+        {
+            get { return this.transformation; }
+            set { this.transformation = value; }
+        }
+
+        public IConnectionMetaData MetaData
+        {
+            get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+        }
+
+        public Uri BrokerUri
+        {
+            get { return brokerUri; }
+        }
+
+        public ITransport ITransport
+        {
+            get { return transport; }
+            set { this.transport = value; }
+        }
+
+        public TimeSpan RequestTimeout
+        {
+            get { return this.requestTimeout; }
+            set { this.requestTimeout = value; }
+        }
+
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return acknowledgementMode; }
+            set { this.acknowledgementMode = value; }
+        }
+
+        public string ClientId
+        {
+            get { return info.ClientId; }
+            set
+            {
+                if(this.connected)
+                {
+                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
+                }
+
+                this.info.ClientId = value;
+                this.userSpecifiedClientID = true;
+                CheckConnected();
+            }
+        }
+
+        /// <summary>
+        /// The Default Client Id used if the ClientId property is not set explicity.
+        /// </summary>
+        public string DefaultClientId
+        {
+            set
+            {
+                this.info.ClientId = value;
+                this.userSpecifiedClientID = true;
+            }
+        }
+
+        public ConnectionId ConnectionId
+        {
+            get { return info.ConnectionId; }
+        }
+
+        /// <summary>
+        /// Get/or set the redelivery policy for this connection.
+        /// </summary>
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set { this.redeliveryPolicy = value; }
+        }
+
+        public PrefetchPolicy PrefetchPolicy
+        {
+            get { return this.prefetchPolicy; }
+            set { this.prefetchPolicy = value; }
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Starts asynchronous message delivery of incoming messages for this connection.
+        /// Synchronous delivery is unaffected.
+        /// </summary>
+        public void Start()
+        {
+            CheckConnected();
+            if(started.CompareAndSet(false, true))
+            {
+                lock(sessions.SyncRoot)
+                {
+                    foreach(Session session in sessions)
+                    {
+                        session.Start();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// This property determines if the asynchronous message delivery of incoming
+        /// messages has been started for this connection.
+        /// </summary>
+        public bool IsStarted
+        {
+            get { return started.Value; }
+        }
+
+        /// <summary>
+        /// Temporarily stop asynchronous delivery of inbound messages for this connection.
+        /// The sending of outbound messages is unaffected.
+        /// </summary>
+        public void Stop()
+        {
+            CheckConnected();
+            if(started.CompareAndSet(true, false))
+            {
+                lock(sessions.SyncRoot)
+                {
+                    foreach(Session session in sessions)
+                    {
+                        session.Stop();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Creates a new session to work on this connection
+        /// </summary>
+        public ISession CreateSession()
+        {
+            return CreateSession(acknowledgementMode);
+        }
+
+        /// <summary>
+        /// Creates a new session to work on this connection
+        /// </summary>
+        public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
+        {
+            SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
+            Session session = new Session(this, info, sessionAcknowledgementMode, this.dispatchAsync);
+
+            // Set properties on session using parameters prefixed with "session."
+            StringDictionary options = URISupport.ParseQuery(this.brokerUri.Query);
+            URISupport.SetProperties(session, options, "session.");
+
+            if(IsStarted)
+            {
+                session.Start();
+            }
+
+            sessions.Add(session);
+            return session;
+        }
+
+        internal void RemoveSession(Session session)
+        {
+            if(!this.closing)
+            {
+                sessions.Remove(session);
+            }
+        }
+
+        internal void addDispatcher(ConsumerId id, IDispatcher dispatcher)
+        {
+            this.dispatchers.Add(id, dispatcher);
+        }
+
+        internal void removeDispatcher(ConsumerId id)
+        {
+            this.dispatchers.Remove(id);
+        }
+
+        public void Close()
+        {
+            lock(myLock)
+            {
+                if(this.closed)
+                {
+                    return;
+                }
+
+                try
+                {
+                    Tracer.Info("Closing Connection.");
+                    this.closing = true;
+                    lock(sessions.SyncRoot)
+                    {
+                        foreach(Session session in sessions)
+                        {
+                            session.DoClose();
+                        }
+                    }
+                    sessions.Clear();
+
+                    if(connected)
+                    {
+                        ShutdownInfo shutdowninfo = new ShutdownInfo();
+                        transport.Oneway(shutdowninfo);
+                    }
+
+                    Tracer.Info("Disposing of the Transport.");
+                    transport.Dispose();
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during connection close: {0}", ex);
+                }
+                finally
+                {
+                    this.transport = null;
+                    this.closed = true;
+                    this.connected = false;
+                    this.closing = false;
+                }
+            }
+        }
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected void Dispose(bool disposing)
+        {
+            if(disposed)
+            {
+                return;
+            }
+
+            if(disposing)
+            {
+                // Dispose managed code here.
+            }
+
+            try
+            {
+                // For now we do not distinguish between Dispose() and Close().
+                // In theory Dispose should possibly be lighter-weight and perform a (faster)
+                // disorderly close.
+                Close();
+            }
+            catch
+            {
+                // Ignore network errors.
+            }
+
+            disposed = true;
+        }
+
+        // Implementation methods
+
+        /// <summary>
+        /// Performs a synchronous request-response with the broker
+        /// </summary>
+        ///
+
+        public Response SyncRequest(Command command)
+        {
+            try
+            {
+                return SyncRequest(command, this.RequestTimeout);
+            }
+            catch(Exception ex)
+            {
+                throw NMSExceptionSupport.Create(ex);
+            }
+        }
+
+        public Response SyncRequest(Command command, TimeSpan requestTimeout)
+        {
+            CheckConnected();
+
+            try
+            {
+                Response response = transport.Request(command, requestTimeout);
+                if(response is ExceptionResponse)
+                {
+                    ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+                    BrokerError brokerError = exceptionResponse.Exception;
+                    throw new BrokerException(brokerError);
+                }
+                return response;
+            }
+            catch(Exception ex)
+            {
+                throw NMSExceptionSupport.Create(ex);
+            }
+        }
+
+        public void Oneway(Command command)
+        {
+            CheckConnected();
+
+            try
+            {
+                transport.Oneway(command);
+            }
+            catch(Exception ex)
+            {
+                throw NMSExceptionSupport.Create(ex);
+            }
+        }
+
+        protected void CheckConnected()
+        {
+            if(closed)
+            {
+                throw new ConnectionClosedException();
+            }
+
+            if(!connected)
+            {
+                if(!this.userSpecifiedClientID)
+                {
+                    this.info.ClientId = this.clientIdGenerator.GenerateId();
+                }
+
+                connected = true;
+                // now lets send the connection and see if we get an ack/nak
+                if(null == SyncRequest(info))
+                {
+                    closed = true;
+                    connected = false;
+                    throw new ConnectionClosedException();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Handle incoming commands
+        /// </summary>
+        /// <param name="commandTransport">An ITransport</param>
+        /// <param name="command">A  Command</param>
+        protected void OnCommand(ITransport commandTransport, Command command)
+        {
+            if(command is MessageDispatch)
+            {
                 // We wait if the Connection is still processing interruption
                 // code to reset the MessageConsumers.
                 WaitForTransportInterruptionProcessingToComplete();
-				DispatchMessage((MessageDispatch) command);
-			}
-			else if(command is ConnectionError)
-			{
-				if(!closing && !closed)
-				{
-					ConnectionError connectionError = (ConnectionError) command;
-					BrokerError brokerError = connectionError.Exception;
-					string message = "Broker connection error.";
-					string cause = "";
-
-					if(null != brokerError)
-					{
-						message = brokerError.Message;
-						if(null != brokerError.Cause)
-						{
-							cause = brokerError.Cause.Message;
-						}
-					}
-
-					OnException(commandTransport, new NMSConnectionException(message, cause));
-				}
-			}
-			else
-			{
-				Tracer.Error("Unknown command: " + command);
-			}
-		}
-
-		protected void DispatchMessage(MessageDispatch dispatch)
-		{
-			lock(dispatchers.SyncRoot)
-			{
-				if(dispatchers.Contains(dispatch.ConsumerId))
-				{
-					IDispatcher dispatcher = (IDispatcher) dispatchers[dispatch.ConsumerId];
-
-					// Can be null when a consumer has sent a MessagePull and there was
-					// no available message at the broker to dispatch.
-					if(dispatch.Message != null)
-					{
-						dispatch.Message.ReadOnlyBody = true;
-						dispatch.Message.ReadOnlyProperties = true;
-						dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
-					}
-
-					dispatcher.Dispatch(dispatch);
-
-					return;
-				}
-			}
-
-			Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
-		}
-
-		protected void OnException(ITransport sender, Exception exception)
-		{
-			if(ExceptionListener != null && !this.closing)
-			{
-				try
-				{
-					ExceptionListener(exception);
-				}
-				catch
-				{
-					sender.Dispose();
-				}
-			}
-		}
-
-		protected void OnTransportInterrupted(ITransport sender)
-		{
-			Tracer.Debug("Transport has been Interrupted.");
+                DispatchMessage((MessageDispatch) command);
+            }
+            else if(command is ConnectionError)
+            {
+                if(!closing && !closed)
+                {
+                    ConnectionError connectionError = (ConnectionError) command;
+                    BrokerError brokerError = connectionError.Exception;
+                    string message = "Broker connection error.";
+                    string cause = "";
+
+                    if(null != brokerError)
+                    {
+                        message = brokerError.Message;
+                        if(null != brokerError.Cause)
+                        {
+                            cause = brokerError.Cause.Message;
+                        }
+                    }
+
+                    OnException(commandTransport, new NMSConnectionException(message, cause));
+                }
+            }
+            else
+            {
+                Tracer.Error("Unknown command: " + command);
+            }
+        }
+
+        protected void DispatchMessage(MessageDispatch dispatch)
+        {
+            lock(dispatchers.SyncRoot)
+            {
+                if(dispatchers.Contains(dispatch.ConsumerId))
+                {
+                    IDispatcher dispatcher = (IDispatcher) dispatchers[dispatch.ConsumerId];
+
+                    // Can be null when a consumer has sent a MessagePull and there was
+                    // no available message at the broker to dispatch.
+                    if(dispatch.Message != null)
+                    {
+                        dispatch.Message.ReadOnlyBody = true;
+                        dispatch.Message.ReadOnlyProperties = true;
+                        dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
+                    }
+
+                    dispatcher.Dispatch(dispatch);
+
+                    return;
+                }
+            }
+
+            Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
+        }
+
+        protected void OnException(ITransport sender, Exception exception)
+        {
+            if(ExceptionListener != null && !this.closing)
+            {
+                try
+                {
+                    ExceptionListener(exception);
+                }
+                catch
+                {
+                    sender.Dispose();
+                }
+            }
+        }
+
+        protected void OnTransportInterrupted(ITransport sender)
+        {
+            Tracer.Debug("Transport has been Interrupted.");
 
             this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count);
             if(Tracer.IsDebugEnabled)
@@ -616,82 +617,82 @@ namespace Apache.NMS.Stomp
                 Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
             }
 
-			foreach(Session session in this.sessions)
-			{
-				session.ClearMessagesInProgress();
-			}
-
-			if(this.ConnectionInterruptedListener != null && !this.closing)
-			{
-				try
-				{
-					this.ConnectionInterruptedListener();
-				}
-				catch
-				{
-				}
-			}
-		}
-
-		protected void OnTransportResumed(ITransport sender)
-		{
-			Tracer.Debug("Transport has resumed normal operation.");
-
-			if(this.ConnectionResumedListener != null && !this.closing)
-			{
-				try
-				{
-					this.ConnectionResumedListener();
-				}
-				catch
-				{
-				}
-			}
-		}
-
-		internal void OnSessionException(Session sender, Exception exception)
-		{
-			if(ExceptionListener != null)
-			{
-				try
-				{
-					ExceptionListener(exception);
-				}
-				catch
-				{
-					sender.Close();
-				}
-			}
-		}
-
-		/// <summary>
-		/// Creates a new temporary destination name
-		/// </summary>
-		public String CreateTemporaryDestinationName()
-		{
-			return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
-		}
-
-		/// <summary>
-		/// Creates a new local transaction ID
-		/// </summary>
-		public TransactionId CreateLocalTransactionId()
-		{
-			TransactionId id = new TransactionId();
-			id.ConnectionId = ConnectionId;
-			id.Value = Interlocked.Increment(ref localTransactionCounter);
-			return id;
-		}
-
-		protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
-		{
-			SessionInfo answer = new SessionInfo();
-			SessionId sessionId = new SessionId();
-			sessionId.ConnectionId = info.ConnectionId.Value;
-			sessionId.Value = Interlocked.Increment(ref sessionCounter);
-			answer.SessionId = sessionId;
-			return answer;
-		}
+            foreach(Session session in this.sessions)
+            {
+                session.ClearMessagesInProgress();
+            }
+
+            if(this.ConnectionInterruptedListener != null && !this.closing)
+            {
+                try
+                {
+                    this.ConnectionInterruptedListener();
+                }
+                catch
+                {
+                }
+            }
+        }
+
+        protected void OnTransportResumed(ITransport sender)
+        {
+            Tracer.Debug("Transport has resumed normal operation.");
+
+            if(this.ConnectionResumedListener != null && !this.closing)
+            {
+                try
+                {
+                    this.ConnectionResumedListener();
+                }
+                catch
+                {
+                }
+            }
+        }
+
+        internal void OnSessionException(Session sender, Exception exception)
+        {
+            if(ExceptionListener != null)
+            {
+                try
+                {
+                    ExceptionListener(exception);
+                }
+                catch
+                {
+                    sender.Close();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Creates a new temporary destination name
+        /// </summary>
+        public String CreateTemporaryDestinationName()
+        {
+            return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
+        }
+
+        /// <summary>
+        /// Creates a new local transaction ID
+        /// </summary>
+        public TransactionId CreateLocalTransactionId()
+        {
+            TransactionId id = new TransactionId();
+            id.ConnectionId = ConnectionId;
+            id.Value = Interlocked.Increment(ref localTransactionCounter);
+            return id;
+        }
+
+        protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
+        {
+            SessionInfo answer = new SessionInfo();
+            SessionId sessionId = new SessionId();
+            sessionId.ConnectionId = info.ConnectionId.Value;
+            sessionId.Value = Interlocked.Increment(ref sessionCounter);
+            answer.SessionId = sessionId;
+            return answer;
+        }
 
         private void WaitForTransportInterruptionProcessingToComplete()
         {
@@ -715,5 +716,5 @@ namespace Apache.NMS.Stomp
                 cdl.countDown();
             }
         }
-	}
+    }
 }