You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/05/19 21:24:04 UTC

svn commit: r657927 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs

Author: jgomes
Date: Mon May 19 12:24:04 2008
New Revision: 657927

URL: http://svn.apache.org/viewvc?rev=657927&view=rev
Log:
Handle exceptions thrown by ExceptionListener when shutting down the connection.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=657927&r1=657926&r2=657927&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Mon May 19 12:24:04 2008
@@ -23,31 +23,31 @@
 
 namespace Apache.NMS.ActiveMQ
 {
-    /// <summary>
-    /// Represents a connection with a message broker
-    /// </summary>
-    public class Connection : IConnection
-    {
-    	private readonly Uri brokerUri;
+	/// <summary>
+	/// Represents a connection with a message broker
+	/// </summary>
+	public class Connection : IConnection
+	{
+		private readonly Uri brokerUri;
 		private ITransport transport;
 		private readonly ConnectionInfo info;
-        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
-        private BrokerInfo brokerInfo; // from broker
-        private WireFormatInfo brokerWireFormatInfo; // from broker
-        private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
-        private bool asyncSend = false;
-        private bool connected = false;
-        private bool closed = false;
-        private long sessionCounter = 0;
-        private long temporaryDestinationCounter = 0;
-        private long localTransactionCounter;
-        private bool closing = false;
-        private readonly AtomicBoolean started = new AtomicBoolean(true);
-    	private bool disposed = false;
-        
-        public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
-        {
-        	this.brokerUri = connectionUri;
+		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+		private BrokerInfo brokerInfo; // from broker
+		private WireFormatInfo brokerWireFormatInfo; // from broker
+		private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+		private bool asyncSend = false;
+		private bool connected = false;
+		private bool closed = false;
+		private bool closing = false;
+		private long sessionCounter = 0;
+		private long temporaryDestinationCounter = 0;
+		private long localTransactionCounter;
+		private readonly AtomicBoolean started = new AtomicBoolean(true);
+		private bool disposed = false;
+		
+		public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
+		{
+			this.brokerUri = connectionUri;
 			this.info = info;
 			this.transport = transport;
 			this.transport.Command = OnCommand;
@@ -55,12 +55,12 @@
 			this.transport.Start();
 		}
 
-        ~Connection()
-        {
-        	Dispose(false);
-        }
+		~Connection()
+		{
+			Dispose(false);
+		}
 
-        public event ExceptionListener ExceptionListener;
+		public event ExceptionListener ExceptionListener;
 
 
 		public bool IsStarted
@@ -85,9 +85,9 @@
 		/// that maps to the enumeration value.
 		/// </summary>
 		public string AckMode
-    	{
+		{
 			set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
-    	}
+		}
 
 		#endregion
 
@@ -98,7 +98,7 @@
 		public void Start()
 		{
 			CheckConnected();
-			if (started.CompareAndSet(false, true))
+			if(started.CompareAndSet(false, true))
 			{
 				foreach(Session session in sessions)
 				{
@@ -114,7 +114,7 @@
 		public void Stop()
 		{
 			CheckConnected();
-			if (started.CompareAndSet(true, false))
+			if(started.CompareAndSet(true, false))
 			{
 				foreach(Session session in sessions)
 				{
@@ -122,20 +122,20 @@
 				}
 			}
 		}
-        
-        /// <summary>
-        /// Creates a new session to work on this connection
-        /// </summary>
-        public ISession CreateSession()
-        {
-            return CreateSession(acknowledgementMode);
-        }
-        
-        /// <summary>
-        /// Creates a new session to work on this connection
-        /// </summary>
-        public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
-        {
+		
+		/// <summary>
+		/// Creates a new session to work on this connection
+		/// </summary>
+		public ISession CreateSession()
+		{
+			return CreateSession(acknowledgementMode);
+		}
+		
+		/// <summary>
+		/// Creates a new session to work on this connection
+		/// </summary>
+		public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
+		{
 			SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
 			SyncRequest(info);
 			Session session = new Session(this, info, sessionAcknowledgementMode);
@@ -145,8 +145,8 @@
 			URISupport.SetProperties(session, map, "session.");
 
 			sessions.Add(session);
-            return session;
-        }
+			return session;
+		}
 
 		public void RemoveSession(Session session)
 		{
@@ -160,28 +160,34 @@
 
 		public void Close()
 		{
-			if(!closed)
+			lock(this)
 			{
-				closing = true;
-				foreach(Session session in sessions)
+				if(closed)
 				{
-					session.Close();
+					return;
 				}
-				sessions.Clear();
 
 				try
 				{
+					closing = true;
+					foreach(Session session in sessions)
+					{
+						session.Close();
+					}
+					sessions.Clear();
+
 					DisposeOf(ConnectionId);
 					transport.Oneway(new ShutdownInfo());
+					transport.Dispose();
 				}
 				catch(Exception ex)
 				{
 					Tracer.ErrorFormat("Error during connection close: {0}", ex);
 				}
 
-				transport.Dispose();
 				transport = null;
 				closed = true;
+				closing = false;
 			}
 		}
 
@@ -218,7 +224,7 @@
 			disposed = true;
 		}
 		
-        // Properties
+		// Properties
 
 		public Uri BrokerUri
 		{
@@ -226,10 +232,10 @@
 		}
 		
 		public ITransport ITransport
-        {
-            get { return transport; }
-            set { this.transport = value; }
-        }
+		{
+			get { return transport; }
+			set { this.transport = value; }
+		}
 
 		public AcknowledgementMode AcknowledgementMode
 		{
@@ -238,50 +244,50 @@
 		}
 		
 		public string ClientId
-        {
-            get { return info.ClientId; }
-            set
-			{
-                if (connected)
-                {
-                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
-                }
-                info.ClientId = value;
-            }
-        }
-        
-        public ConnectionId ConnectionId
-        {
-            get { return info.ConnectionId; }
-        }
-        
-        public BrokerInfo BrokerInfo
-        {
-            get { return brokerInfo; }
-        }
-        
-        public WireFormatInfo BrokerWireFormat
-        {
-            get { return brokerWireFormatInfo; }
-        }
-        
-        // Implementation methods
+		{
+			get { return info.ClientId; }
+			set
+			{
+				if(connected)
+				{
+					throw new NMSException("You cannot change the ClientId once the Connection is connected");
+				}
+				info.ClientId = value;
+			}
+		}
+		
+		public ConnectionId ConnectionId
+		{
+			get { return info.ConnectionId; }
+		}
+		
+		public BrokerInfo BrokerInfo
+		{
+			get { return brokerInfo; }
+		}
+		
+		public WireFormatInfo BrokerWireFormat
+		{
+			get { return brokerWireFormatInfo; }
+		}
+		
+		// Implementation methods
 
 		/// <summary>
-        /// Performs a synchronous request-response with the broker
-        /// </summary>
-        public Response SyncRequest(Command command)
-		{
-            CheckConnected();
-            Response response = transport.Request(command);
-            if (response is ExceptionResponse)
-            {
-                ExceptionResponse exceptionResponse = (ExceptionResponse) response;
-                BrokerError brokerError = exceptionResponse.Exception;
-                throw new BrokerException(brokerError);
-            }
-            return response;
-        }
+		/// Performs a synchronous request-response with the broker
+		/// </summary>
+		public Response SyncRequest(Command command)
+		{
+			CheckConnected();
+			Response response = transport.Request(command);
+			if(response is ExceptionResponse)
+			{
+				ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+				BrokerError brokerError = exceptionResponse.Exception;
+				throw new BrokerException(brokerError);
+			}
+			return response;
+		}
 
 		public void OneWay(Command command)
 		{
@@ -299,80 +305,83 @@
 			SyncRequest(command);
 		}
 
-        /// <summary>
-        /// Creates a new temporary destination name
-        /// </summary>
-        public String CreateTemporaryDestinationName()
-        {
-            lock (this)
-            {
-                return info.ConnectionId.Value + ":" + (++temporaryDestinationCounter);
-            }
-        }
-        
-        /// <summary>
-        /// Creates a new local transaction ID
-        /// </summary>
-        public LocalTransactionId CreateLocalTransactionId()
-        {
-            LocalTransactionId id= new LocalTransactionId();
-            id.ConnectionId = ConnectionId;
-            lock (this)
-            {
-                id.Value = (++localTransactionCounter);
-            }
-            return id;
-        }
-        
-        protected void CheckConnected()
-        {
-            if (closed)
-            {
-                throw new ConnectionClosedException();
-            }
-            if (!connected)
-            {
-                connected = true;
-                // now lets send the connection and see if we get an ack/nak
-                if(null == SyncRequest(info))
-                {
-                	throw new ConnectionClosedException();
-                }
+		/// <summary>
+		/// Creates a new temporary destination name
+		/// </summary>
+		public String CreateTemporaryDestinationName()
+		{
+			lock(this)
+			{
+				return info.ConnectionId.Value + ":" + (++temporaryDestinationCounter);
 			}
-        }
-        
+		}
+		
 		/// <summary>
-        /// Handle incoming commands
-        /// </summary>
+		/// Creates a new local transaction ID
+		/// </summary>
+		public LocalTransactionId CreateLocalTransactionId()
+		{
+			LocalTransactionId id= new LocalTransactionId();
+			id.ConnectionId = ConnectionId;
+			lock(this)
+			{
+				id.Value = (++localTransactionCounter);
+			}
+			return id;
+		}
+		
+		protected void CheckConnected()
+		{
+			if(closed)
+			{
+				throw new ConnectionClosedException();
+			}
+
+			if(!connected)
+			{
+				connected = true;
+				// now lets send the connection and see if we get an ack/nak
+				if(null == SyncRequest(info))
+				{
+					closed = true;
+					connected = false;
+					throw new ConnectionClosedException();
+				}
+			}
+		}
+		
+		/// <summary>
+		/// Handle incoming commands
+		/// </summary>
 		/// <param name="commandTransport">An ITransport</param>
-        /// <param name="command">A  Command</param>
-        protected void OnCommand(ITransport commandTransport, Command command)
-        {
-            if(command is MessageDispatch)
-            {
+		/// <param name="command">A  Command</param>
+		protected void OnCommand(ITransport commandTransport, Command command)
+		{
+			if(command is MessageDispatch)
+			{
 				DispatchMessage((MessageDispatch) command);
-            }
-            else if(command is WireFormatInfo)
-            {
-                this.brokerWireFormatInfo = (WireFormatInfo) command;
-            }
-            else if(command is BrokerInfo)
-            {
-                this.brokerInfo = (BrokerInfo) command;
-            }
-            else if(command is ShutdownInfo)
-            {
-                //ShutdownInfo info = (ShutdownInfo)command;
-                if(!closing && !closed)
-                {
+			}
+			else if(command is WireFormatInfo)
+			{
+				this.brokerWireFormatInfo = (WireFormatInfo) command;
+			}
+			else if(command is BrokerInfo)
+			{
+				this.brokerInfo = (BrokerInfo) command;
+			}
+			else if(command is ShutdownInfo)
+			{
+				//ShutdownInfo info = (ShutdownInfo)command;
+				if(!closing && !closed)
+				{
 					OnException(commandTransport, new NMSException("Broker closed this connection."));
-                }
-            }
-            else
-            {
-                Tracer.Error("Unknown command: " + command);
-            }
-        }
+				}
+			}
+			else
+			{
+				Tracer.Error("Unknown command: " + command);
+			}
+		}
 
 		protected void DispatchMessage(MessageDispatch dispatch)
 		{
@@ -393,36 +402,50 @@
 			}
 		}
 
-    	protected void OnException(ITransport sender, Exception exception)
-        {
-            Tracer.ErrorFormat("Transport Exception: {0}", exception.ToString());
-            if(ExceptionListener != null)
-            {
-            	ExceptionListener(exception);
-            }
-        }
+		protected void OnException(ITransport sender, Exception exception)
+		{
+			Tracer.ErrorFormat("Transport Exception: {0}", exception.ToString());
+			if(ExceptionListener != null)
+			{
+				try
+				{
+					ExceptionListener(exception);
+				}
+				catch
+				{
+					sender.Dispose();
+				}
+			}
+		}
 
 		internal void OnSessionException(Session sender, Exception exception)
 		{
 			Tracer.ErrorFormat("Session Exception: {0}", exception.ToString());
 			if(ExceptionListener != null)
 			{
-				ExceptionListener(exception);
+				try
+				{
+					ExceptionListener(exception);
+				}
+				catch
+				{
+					sender.Close();
+				}
+			}
+		}
+		
+		protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
+		{
+			SessionInfo answer = new SessionInfo();
+			SessionId sessionId = new SessionId();
+			sessionId.ConnectionId = info.ConnectionId.Value;
+			lock(this)
+			{
+				sessionId.Value = ++sessionCounter;
 			}
+			answer.SessionId = sessionId;
+			return answer;
 		}
-        
-        protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
-        {
-            SessionInfo answer = new SessionInfo();
-            SessionId sessionId = new SessionId();
-            sessionId.ConnectionId = info.ConnectionId.Value;
-            lock (this)
-            {
-                sessionId.Value = ++sessionCounter;
-            }
-            answer.SessionId = sessionId;
-            return answer;
-        }
-        
-    }
+		
+	}
 }