You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/05/19 21:24:04 UTC
svn commit: r657927 -
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
Author: jgomes
Date: Mon May 19 12:24:04 2008
New Revision: 657927
URL: http://svn.apache.org/viewvc?rev=657927&view=rev
Log:
Handle exceptions thrown by ExceptionListener when shutting down the connection.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=657927&r1=657926&r2=657927&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Mon May 19 12:24:04 2008
@@ -23,31 +23,31 @@
namespace Apache.NMS.ActiveMQ
{
- /// <summary>
- /// Represents a connection with a message broker
- /// </summary>
- public class Connection : IConnection
- {
- private readonly Uri brokerUri;
+ /// <summary>
+ /// Represents a connection with a message broker
+ /// </summary>
+ public class Connection : IConnection
+ {
+ private readonly Uri brokerUri;
private ITransport transport;
private readonly ConnectionInfo info;
- private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
- private BrokerInfo brokerInfo; // from broker
- private WireFormatInfo brokerWireFormatInfo; // from broker
- private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
- private bool asyncSend = false;
- private bool connected = false;
- private bool closed = false;
- private long sessionCounter = 0;
- private long temporaryDestinationCounter = 0;
- private long localTransactionCounter;
- private bool closing = false;
- private readonly AtomicBoolean started = new AtomicBoolean(true);
- private bool disposed = false;
-
- public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
- {
- this.brokerUri = connectionUri;
+ private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+ private BrokerInfo brokerInfo; // from broker
+ private WireFormatInfo brokerWireFormatInfo; // from broker
+ private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+ private bool asyncSend = false;
+ private bool connected = false;
+ private bool closed = false;
+ private bool closing = false;
+ private long sessionCounter = 0;
+ private long temporaryDestinationCounter = 0;
+ private long localTransactionCounter;
+ private readonly AtomicBoolean started = new AtomicBoolean(true);
+ private bool disposed = false;
+
+ public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
+ {
+ this.brokerUri = connectionUri;
this.info = info;
this.transport = transport;
this.transport.Command = OnCommand;
@@ -55,12 +55,12 @@
this.transport.Start();
}
- ~Connection()
- {
- Dispose(false);
- }
+ ~Connection()
+ {
+ Dispose(false);
+ }
- public event ExceptionListener ExceptionListener;
+ public event ExceptionListener ExceptionListener;
public bool IsStarted
@@ -85,9 +85,9 @@
/// that maps to the enumeration value.
/// </summary>
public string AckMode
- {
+ {
set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
- }
+ }
#endregion
@@ -98,7 +98,7 @@
public void Start()
{
CheckConnected();
- if (started.CompareAndSet(false, true))
+ if(started.CompareAndSet(false, true))
{
foreach(Session session in sessions)
{
@@ -114,7 +114,7 @@
public void Stop()
{
CheckConnected();
- if (started.CompareAndSet(true, false))
+ if(started.CompareAndSet(true, false))
{
foreach(Session session in sessions)
{
@@ -122,20 +122,20 @@
}
}
}
-
- /// <summary>
- /// Creates a new session to work on this connection
- /// </summary>
- public ISession CreateSession()
- {
- return CreateSession(acknowledgementMode);
- }
-
- /// <summary>
- /// Creates a new session to work on this connection
- /// </summary>
- public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
- {
+
+ /// <summary>
+ /// Creates a new session to work on this connection
+ /// </summary>
+ public ISession CreateSession()
+ {
+ return CreateSession(acknowledgementMode);
+ }
+
+ /// <summary>
+ /// Creates a new session to work on this connection
+ /// </summary>
+ public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
+ {
SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
SyncRequest(info);
Session session = new Session(this, info, sessionAcknowledgementMode);
@@ -145,8 +145,8 @@
URISupport.SetProperties(session, map, "session.");
sessions.Add(session);
- return session;
- }
+ return session;
+ }
public void RemoveSession(Session session)
{
@@ -160,28 +160,34 @@
public void Close()
{
- if(!closed)
+ lock(this)
{
- closing = true;
- foreach(Session session in sessions)
+ if(closed)
{
- session.Close();
+ return;
}
- sessions.Clear();
try
{
+ closing = true;
+ foreach(Session session in sessions)
+ {
+ session.Close();
+ }
+ sessions.Clear();
+
DisposeOf(ConnectionId);
transport.Oneway(new ShutdownInfo());
+ transport.Dispose();
}
catch(Exception ex)
{
Tracer.ErrorFormat("Error during connection close: {0}", ex);
}
- transport.Dispose();
transport = null;
closed = true;
+ closing = false;
}
}
@@ -218,7 +224,7 @@
disposed = true;
}
- // Properties
+ // Properties
public Uri BrokerUri
{
@@ -226,10 +232,10 @@
}
public ITransport ITransport
- {
- get { return transport; }
- set { this.transport = value; }
- }
+ {
+ get { return transport; }
+ set { this.transport = value; }
+ }
public AcknowledgementMode AcknowledgementMode
{
@@ -238,50 +244,50 @@
}
public string ClientId
- {
- get { return info.ClientId; }
- set
- {
- if (connected)
- {
- throw new NMSException("You cannot change the ClientId once the Connection is connected");
- }
- info.ClientId = value;
- }
- }
-
- public ConnectionId ConnectionId
- {
- get { return info.ConnectionId; }
- }
-
- public BrokerInfo BrokerInfo
- {
- get { return brokerInfo; }
- }
-
- public WireFormatInfo BrokerWireFormat
- {
- get { return brokerWireFormatInfo; }
- }
-
- // Implementation methods
+ {
+ get { return info.ClientId; }
+ set
+ {
+ if(connected)
+ {
+ throw new NMSException("You cannot change the ClientId once the Connection is connected");
+ }
+ info.ClientId = value;
+ }
+ }
+
+ public ConnectionId ConnectionId
+ {
+ get { return info.ConnectionId; }
+ }
+
+ public BrokerInfo BrokerInfo
+ {
+ get { return brokerInfo; }
+ }
+
+ public WireFormatInfo BrokerWireFormat
+ {
+ get { return brokerWireFormatInfo; }
+ }
+
+ // Implementation methods
/// <summary>
- /// Performs a synchronous request-response with the broker
- /// </summary>
- public Response SyncRequest(Command command)
- {
- CheckConnected();
- Response response = transport.Request(command);
- if (response is ExceptionResponse)
- {
- ExceptionResponse exceptionResponse = (ExceptionResponse) response;
- BrokerError brokerError = exceptionResponse.Exception;
- throw new BrokerException(brokerError);
- }
- return response;
- }
+ /// Performs a synchronous request-response with the broker
+ /// </summary>
+ public Response SyncRequest(Command command)
+ {
+ CheckConnected();
+ Response response = transport.Request(command);
+ if(response is ExceptionResponse)
+ {
+ ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+ BrokerError brokerError = exceptionResponse.Exception;
+ throw new BrokerException(brokerError);
+ }
+ return response;
+ }
public void OneWay(Command command)
{
@@ -299,80 +305,83 @@
SyncRequest(command);
}
- /// <summary>
- /// Creates a new temporary destination name
- /// </summary>
- public String CreateTemporaryDestinationName()
- {
- lock (this)
- {
- return info.ConnectionId.Value + ":" + (++temporaryDestinationCounter);
- }
- }
-
- /// <summary>
- /// Creates a new local transaction ID
- /// </summary>
- public LocalTransactionId CreateLocalTransactionId()
- {
- LocalTransactionId id= new LocalTransactionId();
- id.ConnectionId = ConnectionId;
- lock (this)
- {
- id.Value = (++localTransactionCounter);
- }
- return id;
- }
-
- protected void CheckConnected()
- {
- if (closed)
- {
- throw new ConnectionClosedException();
- }
- if (!connected)
- {
- connected = true;
- // now lets send the connection and see if we get an ack/nak
- if(null == SyncRequest(info))
- {
- throw new ConnectionClosedException();
- }
+ /// <summary>
+ /// Creates a new temporary destination name
+ /// </summary>
+ public String CreateTemporaryDestinationName()
+ {
+ lock(this)
+ {
+ return info.ConnectionId.Value + ":" + (++temporaryDestinationCounter);
}
- }
-
+ }
+
/// <summary>
- /// Handle incoming commands
- /// </summary>
+ /// Creates a new local transaction ID
+ /// </summary>
+ public LocalTransactionId CreateLocalTransactionId()
+ {
+ LocalTransactionId id= new LocalTransactionId();
+ id.ConnectionId = ConnectionId;
+ lock(this)
+ {
+ id.Value = (++localTransactionCounter);
+ }
+ return id;
+ }
+
+ protected void CheckConnected()
+ {
+ if(closed)
+ {
+ throw new ConnectionClosedException();
+ }
+
+ if(!connected)
+ {
+ connected = true;
+ // now lets send the connection and see if we get an ack/nak
+ if(null == SyncRequest(info))
+ {
+ closed = true;
+ connected = false;
+ throw new ConnectionClosedException();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Handle incoming commands
+ /// </summary>
/// <param name="commandTransport">An ITransport</param>
- /// <param name="command">A Command</param>
- protected void OnCommand(ITransport commandTransport, Command command)
- {
- if(command is MessageDispatch)
- {
+ /// <param name="command">A Command</param>
+ protected void OnCommand(ITransport commandTransport, Command command)
+ {
+ if(command is MessageDispatch)
+ {
DispatchMessage((MessageDispatch) command);
- }
- else if(command is WireFormatInfo)
- {
- this.brokerWireFormatInfo = (WireFormatInfo) command;
- }
- else if(command is BrokerInfo)
- {
- this.brokerInfo = (BrokerInfo) command;
- }
- else if(command is ShutdownInfo)
- {
- //ShutdownInfo info = (ShutdownInfo)command;
- if(!closing && !closed)
- {
+ }
+ else if(command is WireFormatInfo)
+ {
+ this.brokerWireFormatInfo = (WireFormatInfo) command;
+ }
+ else if(command is BrokerInfo)
+ {
+ this.brokerInfo = (BrokerInfo) command;
+ }
+ else if(command is ShutdownInfo)
+ {
+ //ShutdownInfo info = (ShutdownInfo)command;
+ if(!closing && !closed)
+ {
OnException(commandTransport, new NMSException("Broker closed this connection."));
- }
- }
- else
- {
- Tracer.Error("Unknown command: " + command);
- }
- }
+ }
+ }
+ else
+ {
+ Tracer.Error("Unknown command: " + command);
+ }
+ }
protected void DispatchMessage(MessageDispatch dispatch)
{
@@ -393,36 +402,50 @@
}
}
- protected void OnException(ITransport sender, Exception exception)
- {
- Tracer.ErrorFormat("Transport Exception: {0}", exception.ToString());
- if(ExceptionListener != null)
- {
- ExceptionListener(exception);
- }
- }
+ protected void OnException(ITransport sender, Exception exception)
+ {
+ Tracer.ErrorFormat("Transport Exception: {0}", exception.ToString());
+ if(ExceptionListener != null)
+ {
+ try
+ {
+ ExceptionListener(exception);
+ }
+ catch
+ {
+ sender.Dispose();
+ }
+ }
+ }
internal void OnSessionException(Session sender, Exception exception)
{
Tracer.ErrorFormat("Session Exception: {0}", exception.ToString());
if(ExceptionListener != null)
{
- ExceptionListener(exception);
+ try
+ {
+ ExceptionListener(exception);
+ }
+ catch
+ {
+ sender.Close();
+ }
+ }
+ }
+
+ protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
+ {
+ SessionInfo answer = new SessionInfo();
+ SessionId sessionId = new SessionId();
+ sessionId.ConnectionId = info.ConnectionId.Value;
+ lock(this)
+ {
+ sessionId.Value = ++sessionCounter;
}
+ answer.SessionId = sessionId;
+ return answer;
}
-
- protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
- {
- SessionInfo answer = new SessionInfo();
- SessionId sessionId = new SessionId();
- sessionId.ConnectionId = info.ConnectionId.Value;
- lock (this)
- {
- sessionId.Value = ++sessionCounter;
- }
- answer.SessionId = sessionId;
- return answer;
- }
-
- }
+
+ }
}