You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/19 22:45:51 UTC
svn commit: r965623 -
/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
Author: tabish
Date: Mon Jul 19 20:45:51 2010
New Revision: 965623
URL: http://svn.apache.org/viewvc?rev=965623&view=rev
Log:
Fix the createSession method to properly get the parameters form the Uri
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=965623&r1=965622&r2=965623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Mon Jul 19 20:45:51 2010
@@ -17,6 +17,7 @@
using System;
using System.Collections;
+using System.Collections.Specialized;
using System.Threading;
using Apache.NMS.Stomp.Commands;
using Apache.NMS.Stomp.Transport;
@@ -26,589 +27,589 @@ using Apache.NMS.Util;
namespace Apache.NMS.Stomp
{
- /// <summary>
- /// Represents a connection with a message broker
- /// </summary>
- public class Connection : IConnection
- {
- private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
-
- private readonly Uri brokerUri;
- private ITransport transport;
- private readonly ConnectionInfo info;
-
- private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
- private bool asyncSend = false;
- private bool alwaysSyncSend = false;
- private bool copyMessageOnSend = true;
- private bool sendAcksAsync = false;
- private bool dispatchAsync = true;
- private string transformation = null;
- private IRedeliveryPolicy redeliveryPolicy;
- private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
-
- private bool userSpecifiedClientID;
- private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
- private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
- private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
- private readonly object myLock = new object();
- private bool connected = false;
- private bool closed = false;
- private bool closing = false;
- private int sessionCounter = 0;
- private int temporaryDestinationCounter = 0;
- private int localTransactionCounter;
- private readonly Atomic<bool> started = new Atomic<bool>(false);
- private ConnectionMetaData metaData = null;
- private bool disposed = false;
- private IdGenerator clientIdGenerator;
+ /// <summary>
+ /// Represents a connection with a message broker
+ /// </summary>
+ public class Connection : IConnection
+ {
+ private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
+ private readonly Uri brokerUri;
+ private ITransport transport;
+ private readonly ConnectionInfo info;
+
+ private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+ private bool asyncSend = false;
+ private bool alwaysSyncSend = false;
+ private bool copyMessageOnSend = true;
+ private bool sendAcksAsync = false;
+ private bool dispatchAsync = true;
+ private string transformation = null;
+ private IRedeliveryPolicy redeliveryPolicy;
+ private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+
+ private bool userSpecifiedClientID;
+ private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+ private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+ private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
+ private readonly object myLock = new object();
+ private bool connected = false;
+ private bool closed = false;
+ private bool closing = false;
+ private int sessionCounter = 0;
+ private int temporaryDestinationCounter = 0;
+ private int localTransactionCounter;
+ private readonly Atomic<bool> started = new Atomic<bool>(false);
+ private ConnectionMetaData metaData = null;
+ private bool disposed = false;
+ private IdGenerator clientIdGenerator;
private CountDownLatch transportInterruptionProcessingComplete;
- public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
- {
- this.brokerUri = connectionUri;
- this.clientIdGenerator = clientIdGenerator;
-
- this.transport = transport;
- this.transport.Command = new CommandHandler(OnCommand);
- this.transport.Exception = new ExceptionHandler(OnException);
- this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
- this.transport.Resumed = new ResumedHandler(OnTransportResumed);
-
- ConnectionId id = new ConnectionId();
- id.Value = CONNECTION_ID_GENERATOR.GenerateId();
-
- this.info = new ConnectionInfo();
- this.info.ConnectionId = id;
- }
-
- ~Connection()
- {
- Dispose(false);
- }
-
- /// <summary>
- /// A delegate that can receive transport level exceptions.
- /// </summary>
- public event ExceptionListener ExceptionListener;
-
- /// <summary>
- /// An asynchronous listener that is notified when a Fault tolerant connection
- /// has been interrupted.
- /// </summary>
- public event ConnectionInterruptedListener ConnectionInterruptedListener;
-
- /// <summary>
- /// An asynchronous listener that is notified when a Fault tolerant connection
- /// has been resumed.
- /// </summary>
- public event ConnectionResumedListener ConnectionResumedListener;
-
- #region Properties
-
- public String UserName
- {
- get { return this.info.UserName; }
- set { this.info.UserName = value; }
- }
-
- public String Password
- {
- get { return this.info.Password; }
- set { this.info.Password = value; }
- }
-
- /// <summary>
- /// This property indicates whether or not async send is enabled.
- /// </summary>
- public bool AsyncSend
- {
- get { return asyncSend; }
- set { asyncSend = value; }
- }
-
- /// <summary>
- /// This property sets the acknowledgment mode for the connection.
- /// The URI parameter connection.ackmode can be set to a string value
- /// that maps to the enumeration value.
- /// </summary>
- public string AckMode
- {
- set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
- }
-
- /// <summary>
- /// This property forces all messages that are sent to be sent synchronously overriding
- /// any usage of the AsyncSend flag. This can reduce performance in some cases since the
- /// only messages we normally send synchronously are Persistent messages not sent in a
- /// transaction. This options guarantees that no send will return until the broker has
- /// acknowledge receipt of the message
- /// </summary>
- public bool AlwaysSyncSend
- {
- get { return alwaysSyncSend; }
- set { alwaysSyncSend = value; }
- }
-
- /// <summary>
- /// This property indicates whether Message's should be copied before being sent via
- /// one of the Connection's send methods. Copying the Message object allows the user
- /// to resuse the Object over for another send. If the message isn't copied performance
- /// can improve but the user must not reuse the Object as it may not have been sent
- /// before they reset its payload.
- /// </summary>
- public bool CopyMessageOnSend
- {
- get { return copyMessageOnSend; }
- set { copyMessageOnSend = value; }
- }
-
- /// <summary>
- /// This property indicates whether or not async sends are used for
- /// message acknowledgement messages. Sending Acks async can improve
- /// performance but may decrease reliability.
- /// </summary>
- public bool SendAcksAsync
- {
- get { return sendAcksAsync; }
- set { sendAcksAsync = value; }
- }
-
- /// <summary>
- /// synchronously or asynchronously by the broker. Set to false for a slow
- /// consumer and true for a fast consumer.
- /// </summary>
- public bool DispatchAsync
- {
- get { return this.dispatchAsync; }
- set { this.dispatchAsync = value; }
- }
-
- /// <summary>
- /// Sets the default Transformation attribute applied to Consumers. If a consumer
- /// is to receive Map messages from the Broker then the user should set the "jms-map-xml"
- /// transformation on the consumer so that all MapMessages are sent as XML.
- /// </summary>
- public string Transformation
- {
- get { return this.transformation; }
- set { this.transformation = value; }
- }
-
- public IConnectionMetaData MetaData
- {
- get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
- }
-
- public Uri BrokerUri
- {
- get { return brokerUri; }
- }
-
- public ITransport ITransport
- {
- get { return transport; }
- set { this.transport = value; }
- }
-
- public TimeSpan RequestTimeout
- {
- get { return this.requestTimeout; }
- set { this.requestTimeout = value; }
- }
-
- public AcknowledgementMode AcknowledgementMode
- {
- get { return acknowledgementMode; }
- set { this.acknowledgementMode = value; }
- }
-
- public string ClientId
- {
- get { return info.ClientId; }
- set
- {
- if(this.connected)
- {
- throw new NMSException("You cannot change the ClientId once the Connection is connected");
- }
-
- this.info.ClientId = value;
- this.userSpecifiedClientID = true;
- CheckConnected();
- }
- }
-
- /// <summary>
- /// The Default Client Id used if the ClientId property is not set explicity.
- /// </summary>
- public string DefaultClientId
- {
- set
- {
- this.info.ClientId = value;
- this.userSpecifiedClientID = true;
- }
- }
-
- public ConnectionId ConnectionId
- {
- get { return info.ConnectionId; }
- }
-
- /// <summary>
- /// Get/or set the redelivery policy for this connection.
- /// </summary>
- public IRedeliveryPolicy RedeliveryPolicy
- {
- get { return this.redeliveryPolicy; }
- set { this.redeliveryPolicy = value; }
- }
-
- public PrefetchPolicy PrefetchPolicy
- {
- get { return this.prefetchPolicy; }
- set { this.prefetchPolicy = value; }
- }
-
- #endregion
-
- /// <summary>
- /// Starts asynchronous message delivery of incoming messages for this connection.
- /// Synchronous delivery is unaffected.
- /// </summary>
- public void Start()
- {
- CheckConnected();
- if(started.CompareAndSet(false, true))
- {
- lock(sessions.SyncRoot)
- {
- foreach(Session session in sessions)
- {
- session.Start();
- }
- }
- }
- }
-
- /// <summary>
- /// This property determines if the asynchronous message delivery of incoming
- /// messages has been started for this connection.
- /// </summary>
- public bool IsStarted
- {
- get { return started.Value; }
- }
-
- /// <summary>
- /// Temporarily stop asynchronous delivery of inbound messages for this connection.
- /// The sending of outbound messages is unaffected.
- /// </summary>
- public void Stop()
- {
- CheckConnected();
- if(started.CompareAndSet(true, false))
- {
- lock(sessions.SyncRoot)
- {
- foreach(Session session in sessions)
- {
- session.Stop();
- }
- }
- }
- }
-
- /// <summary>
- /// Creates a new session to work on this connection
- /// </summary>
- public ISession CreateSession()
- {
- return CreateSession(acknowledgementMode);
- }
-
- /// <summary>
- /// Creates a new session to work on this connection
- /// </summary>
- public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
- {
- SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
- Session session = new Session(this, info, sessionAcknowledgementMode, this.dispatchAsync);
-
- // Set properties on session using parameters prefixed with "session."
- URISupport.CompositeData c = URISupport.ParseComposite(this.brokerUri);
- URISupport.SetProperties(session, c.Parameters, "session.");
-
- if(IsStarted)
- {
- session.Start();
- }
-
- sessions.Add(session);
- return session;
- }
-
- internal void RemoveSession(Session session)
- {
- if(!this.closing)
- {
- sessions.Remove(session);
- }
- }
-
- internal void addDispatcher(ConsumerId id, IDispatcher dispatcher)
- {
- this.dispatchers.Add(id, dispatcher);
- }
-
- internal void removeDispatcher(ConsumerId id)
- {
- this.dispatchers.Remove(id);
- }
-
- public void Close()
- {
- lock(myLock)
- {
- if(this.closed)
- {
- return;
- }
-
- try
- {
- Tracer.Info("Closing Connection.");
- this.closing = true;
- lock(sessions.SyncRoot)
- {
- foreach(Session session in sessions)
- {
- session.DoClose();
- }
- }
- sessions.Clear();
-
- if(connected)
- {
- ShutdownInfo shutdowninfo = new ShutdownInfo();
- transport.Oneway(shutdowninfo);
- }
-
- Tracer.Info("Disposing of the Transport.");
- transport.Dispose();
- }
- catch(Exception ex)
- {
- Tracer.ErrorFormat("Error during connection close: {0}", ex);
- }
- finally
- {
- this.transport = null;
- this.closed = true;
- this.connected = false;
- this.closing = false;
- }
- }
- }
-
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- protected void Dispose(bool disposing)
- {
- if(disposed)
- {
- return;
- }
-
- if(disposing)
- {
- // Dispose managed code here.
- }
-
- try
- {
- // For now we do not distinguish between Dispose() and Close().
- // In theory Dispose should possibly be lighter-weight and perform a (faster)
- // disorderly close.
- Close();
- }
- catch
- {
- // Ignore network errors.
- }
-
- disposed = true;
- }
-
- // Implementation methods
-
- /// <summary>
- /// Performs a synchronous request-response with the broker
- /// </summary>
- ///
-
- public Response SyncRequest(Command command)
- {
- try
- {
- return SyncRequest(command, this.RequestTimeout);
- }
- catch(Exception ex)
- {
- throw NMSExceptionSupport.Create(ex);
- }
- }
-
- public Response SyncRequest(Command command, TimeSpan requestTimeout)
- {
- CheckConnected();
-
- try
- {
- Response response = transport.Request(command, requestTimeout);
- if(response is ExceptionResponse)
- {
- ExceptionResponse exceptionResponse = (ExceptionResponse) response;
- BrokerError brokerError = exceptionResponse.Exception;
- throw new BrokerException(brokerError);
- }
- return response;
- }
- catch(Exception ex)
- {
- throw NMSExceptionSupport.Create(ex);
- }
- }
-
- public void Oneway(Command command)
- {
- CheckConnected();
-
- try
- {
- transport.Oneway(command);
- }
- catch(Exception ex)
- {
- throw NMSExceptionSupport.Create(ex);
- }
- }
-
- protected void CheckConnected()
- {
- if(closed)
- {
- throw new ConnectionClosedException();
- }
-
- if(!connected)
- {
- if(!this.userSpecifiedClientID)
- {
- this.info.ClientId = this.clientIdGenerator.GenerateId();
- }
-
- connected = true;
- // now lets send the connection and see if we get an ack/nak
- if(null == SyncRequest(info))
- {
- closed = true;
- connected = false;
- throw new ConnectionClosedException();
- }
- }
- }
-
- /// <summary>
- /// Handle incoming commands
- /// </summary>
- /// <param name="commandTransport">An ITransport</param>
- /// <param name="command">A Command</param>
- protected void OnCommand(ITransport commandTransport, Command command)
- {
- if(command is MessageDispatch)
- {
+ public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
+ {
+ this.brokerUri = connectionUri;
+ this.clientIdGenerator = clientIdGenerator;
+
+ this.transport = transport;
+ this.transport.Command = new CommandHandler(OnCommand);
+ this.transport.Exception = new ExceptionHandler(OnException);
+ this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
+ this.transport.Resumed = new ResumedHandler(OnTransportResumed);
+
+ ConnectionId id = new ConnectionId();
+ id.Value = CONNECTION_ID_GENERATOR.GenerateId();
+
+ this.info = new ConnectionInfo();
+ this.info.ConnectionId = id;
+ }
+
+ ~Connection()
+ {
+ Dispose(false);
+ }
+
+ /// <summary>
+ /// A delegate that can receive transport level exceptions.
+ /// </summary>
+ public event ExceptionListener ExceptionListener;
+
+ /// <summary>
+ /// An asynchronous listener that is notified when a Fault tolerant connection
+ /// has been interrupted.
+ /// </summary>
+ public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+ /// <summary>
+ /// An asynchronous listener that is notified when a Fault tolerant connection
+ /// has been resumed.
+ /// </summary>
+ public event ConnectionResumedListener ConnectionResumedListener;
+
+ #region Properties
+
+ public String UserName
+ {
+ get { return this.info.UserName; }
+ set { this.info.UserName = value; }
+ }
+
+ public String Password
+ {
+ get { return this.info.Password; }
+ set { this.info.Password = value; }
+ }
+
+ /// <summary>
+ /// This property indicates whether or not async send is enabled.
+ /// </summary>
+ public bool AsyncSend
+ {
+ get { return asyncSend; }
+ set { asyncSend = value; }
+ }
+
+ /// <summary>
+ /// This property sets the acknowledgment mode for the connection.
+ /// The URI parameter connection.ackmode can be set to a string value
+ /// that maps to the enumeration value.
+ /// </summary>
+ public string AckMode
+ {
+ set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+ }
+
+ /// <summary>
+ /// This property forces all messages that are sent to be sent synchronously overriding
+ /// any usage of the AsyncSend flag. This can reduce performance in some cases since the
+ /// only messages we normally send synchronously are Persistent messages not sent in a
+ /// transaction. This options guarantees that no send will return until the broker has
+ /// acknowledge receipt of the message
+ /// </summary>
+ public bool AlwaysSyncSend
+ {
+ get { return alwaysSyncSend; }
+ set { alwaysSyncSend = value; }
+ }
+
+ /// <summary>
+ /// This property indicates whether Message's should be copied before being sent via
+ /// one of the Connection's send methods. Copying the Message object allows the user
+ /// to resuse the Object over for another send. If the message isn't copied performance
+ /// can improve but the user must not reuse the Object as it may not have been sent
+ /// before they reset its payload.
+ /// </summary>
+ public bool CopyMessageOnSend
+ {
+ get { return copyMessageOnSend; }
+ set { copyMessageOnSend = value; }
+ }
+
+ /// <summary>
+ /// This property indicates whether or not async sends are used for
+ /// message acknowledgement messages. Sending Acks async can improve
+ /// performance but may decrease reliability.
+ /// </summary>
+ public bool SendAcksAsync
+ {
+ get { return sendAcksAsync; }
+ set { sendAcksAsync = value; }
+ }
+
+ /// <summary>
+ /// synchronously or asynchronously by the broker. Set to false for a slow
+ /// consumer and true for a fast consumer.
+ /// </summary>
+ public bool DispatchAsync
+ {
+ get { return this.dispatchAsync; }
+ set { this.dispatchAsync = value; }
+ }
+
+ /// <summary>
+ /// Sets the default Transformation attribute applied to Consumers. If a consumer
+ /// is to receive Map messages from the Broker then the user should set the "jms-map-xml"
+ /// transformation on the consumer so that all MapMessages are sent as XML.
+ /// </summary>
+ public string Transformation
+ {
+ get { return this.transformation; }
+ set { this.transformation = value; }
+ }
+
+ public IConnectionMetaData MetaData
+ {
+ get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+ }
+
+ public Uri BrokerUri
+ {
+ get { return brokerUri; }
+ }
+
+ public ITransport ITransport
+ {
+ get { return transport; }
+ set { this.transport = value; }
+ }
+
+ public TimeSpan RequestTimeout
+ {
+ get { return this.requestTimeout; }
+ set { this.requestTimeout = value; }
+ }
+
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { return acknowledgementMode; }
+ set { this.acknowledgementMode = value; }
+ }
+
+ public string ClientId
+ {
+ get { return info.ClientId; }
+ set
+ {
+ if(this.connected)
+ {
+ throw new NMSException("You cannot change the ClientId once the Connection is connected");
+ }
+
+ this.info.ClientId = value;
+ this.userSpecifiedClientID = true;
+ CheckConnected();
+ }
+ }
+
+ /// <summary>
+ /// The Default Client Id used if the ClientId property is not set explicity.
+ /// </summary>
+ public string DefaultClientId
+ {
+ set
+ {
+ this.info.ClientId = value;
+ this.userSpecifiedClientID = true;
+ }
+ }
+
+ public ConnectionId ConnectionId
+ {
+ get { return info.ConnectionId; }
+ }
+
+ /// <summary>
+ /// Get/or set the redelivery policy for this connection.
+ /// </summary>
+ public IRedeliveryPolicy RedeliveryPolicy
+ {
+ get { return this.redeliveryPolicy; }
+ set { this.redeliveryPolicy = value; }
+ }
+
+ public PrefetchPolicy PrefetchPolicy
+ {
+ get { return this.prefetchPolicy; }
+ set { this.prefetchPolicy = value; }
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Starts asynchronous message delivery of incoming messages for this connection.
+ /// Synchronous delivery is unaffected.
+ /// </summary>
+ public void Start()
+ {
+ CheckConnected();
+ if(started.CompareAndSet(false, true))
+ {
+ lock(sessions.SyncRoot)
+ {
+ foreach(Session session in sessions)
+ {
+ session.Start();
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// This property determines if the asynchronous message delivery of incoming
+ /// messages has been started for this connection.
+ /// </summary>
+ public bool IsStarted
+ {
+ get { return started.Value; }
+ }
+
+ /// <summary>
+ /// Temporarily stop asynchronous delivery of inbound messages for this connection.
+ /// The sending of outbound messages is unaffected.
+ /// </summary>
+ public void Stop()
+ {
+ CheckConnected();
+ if(started.CompareAndSet(true, false))
+ {
+ lock(sessions.SyncRoot)
+ {
+ foreach(Session session in sessions)
+ {
+ session.Stop();
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Creates a new session to work on this connection
+ /// </summary>
+ public ISession CreateSession()
+ {
+ return CreateSession(acknowledgementMode);
+ }
+
+ /// <summary>
+ /// Creates a new session to work on this connection
+ /// </summary>
+ public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
+ {
+ SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
+ Session session = new Session(this, info, sessionAcknowledgementMode, this.dispatchAsync);
+
+ // Set properties on session using parameters prefixed with "session."
+ StringDictionary options = URISupport.ParseQuery(this.brokerUri.Query);
+ URISupport.SetProperties(session, options, "session.");
+
+ if(IsStarted)
+ {
+ session.Start();
+ }
+
+ sessions.Add(session);
+ return session;
+ }
+
+ internal void RemoveSession(Session session)
+ {
+ if(!this.closing)
+ {
+ sessions.Remove(session);
+ }
+ }
+
+ internal void addDispatcher(ConsumerId id, IDispatcher dispatcher)
+ {
+ this.dispatchers.Add(id, dispatcher);
+ }
+
+ internal void removeDispatcher(ConsumerId id)
+ {
+ this.dispatchers.Remove(id);
+ }
+
+ public void Close()
+ {
+ lock(myLock)
+ {
+ if(this.closed)
+ {
+ return;
+ }
+
+ try
+ {
+ Tracer.Info("Closing Connection.");
+ this.closing = true;
+ lock(sessions.SyncRoot)
+ {
+ foreach(Session session in sessions)
+ {
+ session.DoClose();
+ }
+ }
+ sessions.Clear();
+
+ if(connected)
+ {
+ ShutdownInfo shutdowninfo = new ShutdownInfo();
+ transport.Oneway(shutdowninfo);
+ }
+
+ Tracer.Info("Disposing of the Transport.");
+ transport.Dispose();
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Error during connection close: {0}", ex);
+ }
+ finally
+ {
+ this.transport = null;
+ this.closed = true;
+ this.connected = false;
+ this.closing = false;
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected void Dispose(bool disposing)
+ {
+ if(disposed)
+ {
+ return;
+ }
+
+ if(disposing)
+ {
+ // Dispose managed code here.
+ }
+
+ try
+ {
+ // For now we do not distinguish between Dispose() and Close().
+ // In theory Dispose should possibly be lighter-weight and perform a (faster)
+ // disorderly close.
+ Close();
+ }
+ catch
+ {
+ // Ignore network errors.
+ }
+
+ disposed = true;
+ }
+
+ // Implementation methods
+
+ /// <summary>
+ /// Performs a synchronous request-response with the broker
+ /// </summary>
+ ///
+
+ public Response SyncRequest(Command command)
+ {
+ try
+ {
+ return SyncRequest(command, this.RequestTimeout);
+ }
+ catch(Exception ex)
+ {
+ throw NMSExceptionSupport.Create(ex);
+ }
+ }
+
+ public Response SyncRequest(Command command, TimeSpan requestTimeout)
+ {
+ CheckConnected();
+
+ try
+ {
+ Response response = transport.Request(command, requestTimeout);
+ if(response is ExceptionResponse)
+ {
+ ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+ BrokerError brokerError = exceptionResponse.Exception;
+ throw new BrokerException(brokerError);
+ }
+ return response;
+ }
+ catch(Exception ex)
+ {
+ throw NMSExceptionSupport.Create(ex);
+ }
+ }
+
+ public void Oneway(Command command)
+ {
+ CheckConnected();
+
+ try
+ {
+ transport.Oneway(command);
+ }
+ catch(Exception ex)
+ {
+ throw NMSExceptionSupport.Create(ex);
+ }
+ }
+
+ protected void CheckConnected()
+ {
+ if(closed)
+ {
+ throw new ConnectionClosedException();
+ }
+
+ if(!connected)
+ {
+ if(!this.userSpecifiedClientID)
+ {
+ this.info.ClientId = this.clientIdGenerator.GenerateId();
+ }
+
+ connected = true;
+ // now lets send the connection and see if we get an ack/nak
+ if(null == SyncRequest(info))
+ {
+ closed = true;
+ connected = false;
+ throw new ConnectionClosedException();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Handle incoming commands
+ /// </summary>
+ /// <param name="commandTransport">An ITransport</param>
+ /// <param name="command">A Command</param>
+ protected void OnCommand(ITransport commandTransport, Command command)
+ {
+ if(command is MessageDispatch)
+ {
// We wait if the Connection is still processing interruption
// code to reset the MessageConsumers.
WaitForTransportInterruptionProcessingToComplete();
- DispatchMessage((MessageDispatch) command);
- }
- else if(command is ConnectionError)
- {
- if(!closing && !closed)
- {
- ConnectionError connectionError = (ConnectionError) command;
- BrokerError brokerError = connectionError.Exception;
- string message = "Broker connection error.";
- string cause = "";
-
- if(null != brokerError)
- {
- message = brokerError.Message;
- if(null != brokerError.Cause)
- {
- cause = brokerError.Cause.Message;
- }
- }
-
- OnException(commandTransport, new NMSConnectionException(message, cause));
- }
- }
- else
- {
- Tracer.Error("Unknown command: " + command);
- }
- }
-
- protected void DispatchMessage(MessageDispatch dispatch)
- {
- lock(dispatchers.SyncRoot)
- {
- if(dispatchers.Contains(dispatch.ConsumerId))
- {
- IDispatcher dispatcher = (IDispatcher) dispatchers[dispatch.ConsumerId];
-
- // Can be null when a consumer has sent a MessagePull and there was
- // no available message at the broker to dispatch.
- if(dispatch.Message != null)
- {
- dispatch.Message.ReadOnlyBody = true;
- dispatch.Message.ReadOnlyProperties = true;
- dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
- }
-
- dispatcher.Dispatch(dispatch);
-
- return;
- }
- }
-
- Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
- }
-
- protected void OnException(ITransport sender, Exception exception)
- {
- if(ExceptionListener != null && !this.closing)
- {
- try
- {
- ExceptionListener(exception);
- }
- catch
- {
- sender.Dispose();
- }
- }
- }
-
- protected void OnTransportInterrupted(ITransport sender)
- {
- Tracer.Debug("Transport has been Interrupted.");
+ DispatchMessage((MessageDispatch) command);
+ }
+ else if(command is ConnectionError)
+ {
+ if(!closing && !closed)
+ {
+ ConnectionError connectionError = (ConnectionError) command;
+ BrokerError brokerError = connectionError.Exception;
+ string message = "Broker connection error.";
+ string cause = "";
+
+ if(null != brokerError)
+ {
+ message = brokerError.Message;
+ if(null != brokerError.Cause)
+ {
+ cause = brokerError.Cause.Message;
+ }
+ }
+
+ OnException(commandTransport, new NMSConnectionException(message, cause));
+ }
+ }
+ else
+ {
+ Tracer.Error("Unknown command: " + command);
+ }
+ }
+
+ protected void DispatchMessage(MessageDispatch dispatch)
+ {
+ lock(dispatchers.SyncRoot)
+ {
+ if(dispatchers.Contains(dispatch.ConsumerId))
+ {
+ IDispatcher dispatcher = (IDispatcher) dispatchers[dispatch.ConsumerId];
+
+ // Can be null when a consumer has sent a MessagePull and there was
+ // no available message at the broker to dispatch.
+ if(dispatch.Message != null)
+ {
+ dispatch.Message.ReadOnlyBody = true;
+ dispatch.Message.ReadOnlyProperties = true;
+ dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
+ }
+
+ dispatcher.Dispatch(dispatch);
+
+ return;
+ }
+ }
+
+ Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
+ }
+
+ protected void OnException(ITransport sender, Exception exception)
+ {
+ if(ExceptionListener != null && !this.closing)
+ {
+ try
+ {
+ ExceptionListener(exception);
+ }
+ catch
+ {
+ sender.Dispose();
+ }
+ }
+ }
+
+ protected void OnTransportInterrupted(ITransport sender)
+ {
+ Tracer.Debug("Transport has been Interrupted.");
this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count);
if(Tracer.IsDebugEnabled)
@@ -616,82 +617,82 @@ namespace Apache.NMS.Stomp
Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
}
- foreach(Session session in this.sessions)
- {
- session.ClearMessagesInProgress();
- }
-
- if(this.ConnectionInterruptedListener != null && !this.closing)
- {
- try
- {
- this.ConnectionInterruptedListener();
- }
- catch
- {
- }
- }
- }
-
- protected void OnTransportResumed(ITransport sender)
- {
- Tracer.Debug("Transport has resumed normal operation.");
-
- if(this.ConnectionResumedListener != null && !this.closing)
- {
- try
- {
- this.ConnectionResumedListener();
- }
- catch
- {
- }
- }
- }
-
- internal void OnSessionException(Session sender, Exception exception)
- {
- if(ExceptionListener != null)
- {
- try
- {
- ExceptionListener(exception);
- }
- catch
- {
- sender.Close();
- }
- }
- }
-
- /// <summary>
- /// Creates a new temporary destination name
- /// </summary>
- public String CreateTemporaryDestinationName()
- {
- return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
- }
-
- /// <summary>
- /// Creates a new local transaction ID
- /// </summary>
- public TransactionId CreateLocalTransactionId()
- {
- TransactionId id = new TransactionId();
- id.ConnectionId = ConnectionId;
- id.Value = Interlocked.Increment(ref localTransactionCounter);
- return id;
- }
-
- protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
- {
- SessionInfo answer = new SessionInfo();
- SessionId sessionId = new SessionId();
- sessionId.ConnectionId = info.ConnectionId.Value;
- sessionId.Value = Interlocked.Increment(ref sessionCounter);
- answer.SessionId = sessionId;
- return answer;
- }
+ foreach(Session session in this.sessions)
+ {
+ session.ClearMessagesInProgress();
+ }
+
+ if(this.ConnectionInterruptedListener != null && !this.closing)
+ {
+ try
+ {
+ this.ConnectionInterruptedListener();
+ }
+ catch
+ {
+ }
+ }
+ }
+
+ protected void OnTransportResumed(ITransport sender)
+ {
+ Tracer.Debug("Transport has resumed normal operation.");
+
+ if(this.ConnectionResumedListener != null && !this.closing)
+ {
+ try
+ {
+ this.ConnectionResumedListener();
+ }
+ catch
+ {
+ }
+ }
+ }
+
+ internal void OnSessionException(Session sender, Exception exception)
+ {
+ if(ExceptionListener != null)
+ {
+ try
+ {
+ ExceptionListener(exception);
+ }
+ catch
+ {
+ sender.Close();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Creates a new temporary destination name
+ /// </summary>
+ public String CreateTemporaryDestinationName()
+ {
+ return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
+ }
+
+ /// <summary>
+ /// Creates a new local transaction ID
+ /// </summary>
+ public TransactionId CreateLocalTransactionId()
+ {
+ TransactionId id = new TransactionId();
+ id.ConnectionId = ConnectionId;
+ id.Value = Interlocked.Increment(ref localTransactionCounter);
+ return id;
+ }
+
+ protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
+ {
+ SessionInfo answer = new SessionInfo();
+ SessionId sessionId = new SessionId();
+ sessionId.ConnectionId = info.ConnectionId.Value;
+ sessionId.Value = Interlocked.Increment(ref sessionCounter);
+ answer.SessionId = sessionId;
+ return answer;
+ }
private void WaitForTransportInterruptionProcessingToComplete()
{
@@ -715,5 +716,5 @@ namespace Apache.NMS.Stomp
cdl.countDown();
}
}
- }
+ }
}