You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/04/19 00:32:36 UTC
svn commit: r1327710 -
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs
Author: tabish
Date: Wed Apr 18 22:32:36 2012
New Revision: 1327710
URL: http://svn.apache.org/viewvc?rev=1327710&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-377
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs?rev=1327710&r1=1327709&r2=1327710&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs Wed Apr 18 22:32:36 2012
@@ -29,1270 +29,1270 @@ using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ
{
- /// <summary>
- /// Represents a connection with a message broker
- /// </summary>
- public class Connection : IConnection
- {
- private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
-
- // Uri configurable options.
- private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
- private bool asyncSend = false;
- private bool alwaysSyncSend = false;
- private bool asyncClose = true;
- private bool useCompression = false;
- private bool copyMessageOnSend = true;
- private bool sendAcksAsync = false;
- private bool dispatchAsync = true;
- private int producerWindowSize = 0;
- private bool messagePrioritySupported = true;
- private bool watchTopicAdviosires = true;
-
- private bool userSpecifiedClientID;
- private readonly Uri brokerUri;
- private ITransport transport;
- private readonly ConnectionInfo info;
- private TimeSpan requestTimeout; // from connection factory
- private BrokerInfo brokerInfo; // from broker
- private readonly CountDownLatch brokerInfoReceived = new CountDownLatch(1);
- private WireFormatInfo brokerWireFormatInfo; // from broker
- private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
- private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
- private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
- private readonly IDictionary tempDests = Hashtable.Synchronized(new Hashtable());
- private readonly object myLock = new object();
- private readonly Atomic<bool> connected = new Atomic<bool>(false);
- private readonly Atomic<bool> closed = new Atomic<bool>(false);
- private readonly Atomic<bool> closing = new Atomic<bool>(false);
- private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
- private Exception firstFailureError = null;
- private int sessionCounter = 0;
- private int temporaryDestinationCounter = 0;
- private int localTransactionCounter;
- private readonly Atomic<bool> started = new Atomic<bool>(false);
- private ConnectionMetaData metaData = null;
- private bool disposed = false;
- private IRedeliveryPolicy redeliveryPolicy;
- private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
- private ICompressionPolicy compressionPolicy = new CompressionPolicy();
- private readonly IdGenerator clientIdGenerator;
- private int consumerIdCounter = 0;
- private volatile CountDownLatch transportInterruptionProcessingComplete;
- private readonly MessageTransformation messageTransformation;
- private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
- private AdvisoryConsumer advisoryConsumer = null;
-
- public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
- {
- this.brokerUri = connectionUri;
- this.clientIdGenerator = clientIdGenerator;
-
- this.transport = transport;
- this.transport.Command = new CommandHandler(OnCommand);
- this.transport.Exception = new ExceptionHandler(OnTransportException);
- this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
- this.transport.Resumed = new ResumedHandler(OnTransportResumed);
-
- ConnectionId id = new ConnectionId();
- id.Value = CONNECTION_ID_GENERATOR.GenerateId();
-
- this.info = new ConnectionInfo();
- this.info.ConnectionId = id;
- this.info.FaultTolerant = transport.IsFaultTolerant;
-
- this.messageTransformation = new ActiveMQMessageTransformation(this);
- }
-
- ~Connection()
- {
- Dispose(false);
- }
-
- /// <summary>
- /// A delegate that can receive transport level exceptions.
- /// </summary>
- public event ExceptionListener ExceptionListener;
-
- /// <summary>
- /// An asynchronous listener that is notified when a Fault tolerant connection
- /// has been interrupted.
- /// </summary>
- public event ConnectionInterruptedListener ConnectionInterruptedListener;
-
- /// <summary>
- /// An asynchronous listener that is notified when a Fault tolerant connection
- /// has been resumed.
- /// </summary>
- public event ConnectionResumedListener ConnectionResumedListener;
-
- private ConsumerTransformerDelegate consumerTransformer;
- public ConsumerTransformerDelegate ConsumerTransformer
- {
- get { return this.consumerTransformer; }
- set { this.consumerTransformer = value; }
- }
-
- private ProducerTransformerDelegate producerTransformer;
- public ProducerTransformerDelegate ProducerTransformer
- {
- get { return this.producerTransformer; }
- set { this.producerTransformer = value; }
- }
-
- #region Properties
-
- public String UserName
- {
- get { return this.info.UserName; }
- set { this.info.UserName = value; }
- }
-
- public String Password
- {
- get { return this.info.Password; }
- set { this.info.Password = value; }
- }
-
- /// <summary>
- /// This property indicates what version of the Protocol we are using to
- /// communicate with the Broker, if not set we return the lowest version
- /// number to indicate we support only the basic command set.
- /// </summary>
- public int ProtocolVersion
- {
- get
- {
- if(brokerWireFormatInfo != null)
- {
- return brokerWireFormatInfo.Version;
- }
-
- return 1;
- }
- }
-
- /// <summary>
- /// This property indicates whether or not async send is enabled.
- /// </summary>
- public bool AsyncSend
- {
- get { return asyncSend; }
- set { asyncSend = value; }
- }
-
- /// <summary>
- /// This property indicates whether or not async close is enabled.
- /// When the connection is closed, it will either send a synchronous
- /// DisposeOf command to the broker and wait for confirmation (if true),
- /// or it will send the DisposeOf command asynchronously.
- /// </summary>
- public bool AsyncClose
- {
- get { return asyncClose; }
- set { asyncClose = value; }
- }
-
- /// <summary>
- /// This property indicates whether or not async sends are used for
- /// message acknowledgement messages. Sending Acks async can improve
- /// performance but may decrease reliability.
- /// </summary>
- public bool SendAcksAsync
- {
- get { return sendAcksAsync; }
- set { sendAcksAsync = value; }
- }
-
- /// <summary>
- /// This property sets the acknowledgment mode for the connection.
- /// The URI parameter connection.ackmode can be set to a string value
- /// that maps to the enumeration value.
- /// </summary>
- public string AckMode
- {
- set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
- }
-
- /// <summary>
- /// This property is the maximum number of bytes in memory that a producer will transmit
- /// to a broker before waiting for acknowledgement messages from the broker that it has
- /// accepted the previously sent messages. In other words, this how you configure the
- /// producer flow control window that is used for async sends where the client is responsible
- /// for managing memory usage. The default value of 0 means no flow control at the client
- /// </summary>
- public int ProducerWindowSize
- {
- get { return producerWindowSize; }
- set { producerWindowSize = value; }
- }
-
- /// <summary>
- /// This property forces all messages that are sent to be sent synchronously overriding
- /// any usage of the AsyncSend flag. This can reduce performance in some cases since the
- /// only messages we normally send synchronously are Persistent messages not sent in a
- /// transaction. This options guarantees that no send will return until the broker has
- /// acknowledge receipt of the message
- /// </summary>
- public bool AlwaysSyncSend
- {
- get { return alwaysSyncSend; }
- set { alwaysSyncSend = value; }
- }
-
- /// <summary>
- /// This property indicates whether Message's should be copied before being sent via
- /// one of the Connection's send methods. Copying the Message object allows the user
- /// to resuse the Object over for another send. If the message isn't copied performance
- /// can improve but the user must not reuse the Object as it may not have been sent
- /// before they reset its payload.
- /// </summary>
- public bool CopyMessageOnSend
- {
- get { return copyMessageOnSend; }
- set { copyMessageOnSend = value; }
- }
-
- /// <summary>
- /// Enable or Disable the use of Compression on Message bodies. When enabled all
- /// messages have their body compressed using the Deflate compression algorithm.
- /// The recipient of the message must support the use of message compression as well
- /// otherwise the receiving client will receive a message whose body appears in the
- /// compressed form.
- /// </summary>
- public bool UseCompression
- {
- get { return this.useCompression; }
- set { this.useCompression = value; }
- }
-
- /// <summary>
- /// Indicate whether or not the resources of this Connection should support the
- /// Message Priority value of incoming messages and dispatch them accordingly.
- /// When disabled Message are always dispatched to Consumers in FIFO order.
- /// </summary>
- public bool MessagePrioritySupported
- {
- get { return this.messagePrioritySupported; }
- set { this.messagePrioritySupported = value; }
- }
-
- public IConnectionMetaData MetaData
- {
- get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
- }
-
- public Uri BrokerUri
- {
- get { return brokerUri; }
- }
-
- public ITransport ITransport
- {
- get { return transport; }
- set { this.transport = value; }
- }
-
- public bool TransportFailed
- {
- get { return this.transportFailed.Value; }
- }
-
- public Exception FirstFailureError
- {
- get { return this.firstFailureError; }
- }
-
- public TimeSpan RequestTimeout
- {
- get { return this.requestTimeout; }
- set { this.requestTimeout = value; }
- }
-
- public AcknowledgementMode AcknowledgementMode
- {
- get { return acknowledgementMode; }
- set { this.acknowledgementMode = value; }
- }
-
- /// <summary>
- /// synchronously or asynchronously by the broker.
- /// </summary>
- public bool DispatchAsync
- {
- get { return this.dispatchAsync; }
- set { this.dispatchAsync = value; }
- }
-
- public bool WatchTopicAdvisories
- {
- get { return this.watchTopicAdviosires; }
- set { this.watchTopicAdviosires = value; }
- }
-
- public string ClientId
- {
- get { return info.ClientId; }
- set
- {
- if(this.connected.Value)
- {
- throw new NMSException("You cannot change the ClientId once the Connection is connected");
- }
-
- this.info.ClientId = value;
- this.userSpecifiedClientID = true;
- CheckConnected();
- }
- }
-
- /// <summary>
- /// The Default Client Id used if the ClientId property is not set explicity.
- /// </summary>
- public string DefaultClientId
- {
- set
- {
- this.info.ClientId = value;
- this.userSpecifiedClientID = true;
- }
- }
-
- public ConnectionId ConnectionId
- {
- get { return info.ConnectionId; }
- }
-
- public BrokerInfo BrokerInfo
- {
- get { return brokerInfo; }
- }
-
- public WireFormatInfo BrokerWireFormat
- {
- get { return brokerWireFormatInfo; }
- }
-
- public String ResourceManagerId
- {
- get
- {
- this.brokerInfoReceived.await();
- return brokerInfo.BrokerId.Value;
- }
- }
-
- /// <summary>
- /// Get/or set the redelivery policy for this connection.
- /// </summary>
- public IRedeliveryPolicy RedeliveryPolicy
- {
- get { return this.redeliveryPolicy; }
- set { this.redeliveryPolicy = value; }
- }
-
- public PrefetchPolicy PrefetchPolicy
- {
- get { return this.prefetchPolicy; }
- set { this.prefetchPolicy = value; }
- }
-
- public ICompressionPolicy CompressionPolicy
- {
- get { return this.compressionPolicy; }
- set { this.compressionPolicy = value; }
- }
-
- internal MessageTransformation MessageTransformation
- {
- get { return this.messageTransformation; }
- }
-
- #endregion
-
- /// <summary>
- /// Starts asynchronous message delivery of incoming messages for this connection.
- /// Synchronous delivery is unaffected.
- /// </summary>
- public void Start()
- {
- CheckConnected();
- if(started.CompareAndSet(false, true))
- {
- lock(sessions.SyncRoot)
- {
- foreach(Session session in sessions)
- {
- session.Start();
- }
- }
- }
- }
-
- /// <summary>
- /// This property determines if the asynchronous message delivery of incoming
- /// messages has been started for this connection.
- /// </summary>
- public bool IsStarted
- {
- get { return started.Value; }
- }
-
- /// <summary>
- /// Temporarily stop asynchronous delivery of inbound messages for this connection.
- /// The sending of outbound messages is unaffected.
- /// </summary>
- public void Stop()
- {
- if(started.CompareAndSet(true, false))
- {
- lock(sessions.SyncRoot)
- {
- foreach(Session session in sessions)
- {
- session.Stop();
- }
- }
- }
- }
-
- /// <summary>
- /// Creates a new session to work on this connection
- /// </summary>
- public ISession CreateSession()
- {
- return CreateAtiveMQSession(acknowledgementMode);
- }
-
- /// <summary>
- /// Creates a new session to work on this connection
- /// </summary>
- public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
- {
- return CreateAtiveMQSession(sessionAcknowledgementMode);
- }
-
- protected virtual Session CreateAtiveMQSession(AcknowledgementMode ackMode)
- {
- CheckConnected();
- return new Session(this, NextSessionId, ackMode);
- }
-
- internal void AddSession(Session session)
- {
- if(!this.closing.Value)
- {
- sessions.Add(session);
- }
- }
-
- internal void RemoveSession(Session session)
- {
- if(!this.closing.Value)
- {
- sessions.Remove(session);
- }
- }
-
- internal void addDispatcher(ConsumerId id, IDispatcher dispatcher)
- {
- if(!this.closing.Value)
- {
- this.dispatchers.Add(id, dispatcher);
- }
- }
-
- internal void removeDispatcher(ConsumerId id)
- {
- if(!this.closing.Value)
- {
- this.dispatchers.Remove(id);
- }
- }
-
- internal void addProducer(ProducerId id, MessageProducer producer)
- {
- if(!this.closing.Value)
- {
- this.producers.Add(id, producer);
- }
- }
-
- internal void removeProducer(ProducerId id)
- {
- if(!this.closing.Value)
- {
- this.producers.Remove(id);
- }
- }
-
- public void Close()
- {
- if(!this.closed.Value && !transportFailed.Value)
- {
- this.Stop();
- }
-
- lock(myLock)
- {
- if(this.closed.Value)
- {
- return;
- }
-
- try
- {
- Tracer.InfoFormat("Connection[{0}]: Closing Connection Now.", this.ConnectionId);
- this.closing.Value = true;
-
- if(this.advisoryConsumer != null)
- {
- this.advisoryConsumer.Dispose();
- this.advisoryConsumer = null;
- }
-
- lock(sessions.SyncRoot)
- {
- foreach(Session session in sessions)
- {
- session.Shutdown();
- }
- }
- sessions.Clear();
-
- if(this.tempDests.Count > 0)
- {
- // Make a copy of the destinations to delete, because the act of deleting
- // them will modify the collection.
- ActiveMQTempDestination[] tempDestsToDelete = new ActiveMQTempDestination[this.tempDests.Count];
-
- this.tempDests.Values.CopyTo(tempDestsToDelete, 0);
- foreach(ActiveMQTempDestination dest in tempDestsToDelete)
- {
- dest.Delete();
- }
- }
-
- // Connected is true only when we've successfully sent our ConnectionInfo
- // to the broker, so if we haven't announced ourselves there's no need to
- // inform the broker of a remove, and if the transport is failed, why bother.
- if(connected.Value && !transportFailed.Value)
- {
- DisposeOf(ConnectionId);
- ShutdownInfo shutdowninfo = new ShutdownInfo();
- transport.Oneway(shutdowninfo);
- }
-
- executor.Shutdown();
-
- Tracer.DebugFormat("Connection[{0}]: Disposing of the Transport.", this.ConnectionId);
- transport.Dispose();
- }
- catch(Exception ex)
- {
- Tracer.ErrorFormat("Connection[{0}]: Error during connection close: {1}", ConnectionId, ex);
- }
- finally
- {
- if(executor != null)
- {
- executor.Shutdown();
- }
-
- this.transport = null;
- this.closed.Value = true;
- this.connected.Value = false;
- this.closing.Value = false;
- }
- }
- }
-
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- protected void Dispose(bool disposing)
- {
- if(disposed)
- {
- return;
- }
-
- if(disposing)
- {
- // Dispose managed code here.
- }
-
- try
- {
- Close();
- }
- catch
- {
- // Ignore network errors.
- }
-
- disposed = true;
- }
-
- // Implementation methods
-
- /// <summary>
- /// Performs a synchronous request-response with the broker
- /// </summary>
- ///
- public Response SyncRequest(Command command)
- {
- return SyncRequest(command, this.RequestTimeout);
- }
-
- /// <summary>
- /// Performs a synchronous request-response with the broker for requested timeout duration.
- /// </summary>
- /// <param name="command"></param>
- /// <param name="requestTimeout"></param>
- /// <returns></returns>
- public Response SyncRequest(Command command, TimeSpan requestTimeout)
- {
- CheckConnected();
-
- try
- {
- Response response = transport.Request(command, requestTimeout);
- if(response is ExceptionResponse)
- {
- ExceptionResponse exceptionResponse = (ExceptionResponse)response;
- BrokerError brokerError = exceptionResponse.Exception;
- throw new BrokerException(brokerError);
- }
- return response;
- }
- catch(Exception ex)
- {
- throw NMSExceptionSupport.Create(ex);
- }
- }
-
- public void Oneway(Command command)
- {
- CheckConnected();
-
- try
- {
- transport.Oneway(command);
- }
- catch(Exception ex)
- {
- throw NMSExceptionSupport.Create(ex);
- }
- }
-
- private void DisposeOf(DataStructure objectId)
- {
- try
- {
- RemoveInfo command = new RemoveInfo();
- command.ObjectId = objectId;
- if(asyncClose)
- {
- Tracer.DebugFormat("Connection[{0}]: Asynchronously disposing of Connection.", this.ConnectionId);
- if(connected.Value)
- {
- transport.Oneway(command);
- if (Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Connection[{0}]: Oneway command sent to broker: {1}",
- this.ConnectionId, command);
- }
- }
- }
- else
- {
- // Ensure that the object is disposed to avoid potential race-conditions
- // of trying to re-create the same object in the broker faster than
- // the broker can dispose of the object. Allow up to 5 seconds to process.
- Tracer.DebugFormat("Connection[{0}]: Synchronously disposing of Connection.", this.ConnectionId);
- SyncRequest(command, TimeSpan.FromSeconds(5));
- Tracer.DebugFormat("Connection[{0}]: Synchronously closed of Connection.", this.ConnectionId);
- }
- }
- catch // (BrokerException)
- {
- // Ignore exceptions while shutting down.
- }
- }
-
- private object checkConnectedLock = new object();
-
- /// <summary>
- /// Check and ensure that the connection objcet is connected. If it is not
- /// connected or is closed, a ConnectionClosedException is thrown.
- /// </summary>
- internal void CheckConnected()
- {
- if(closed.Value)
- {
- throw new ConnectionClosedException();
- }
-
- if(!connected.Value)
- {
- DateTime timeoutTime = DateTime.Now + this.RequestTimeout;
- int waitCount = 1;
-
- while(true)
- {
- if(Monitor.TryEnter(checkConnectedLock))
- {
- try
- {
- if(!connected.Value)
- {
- if(!this.userSpecifiedClientID)
- {
- this.info.ClientId = this.clientIdGenerator.GenerateId();
- }
-
- try
- {
- if(null != transport)
- {
- // Send the connection and see if an ack/nak is returned.
- Response response = transport.Request(this.info, this.RequestTimeout);
- if(!(response is ExceptionResponse))
- {
- connected.Value = true;
- if(this.watchTopicAdviosires)
- {
- ConsumerId id = new ConsumerId(
- new SessionId(info.ConnectionId, -1),
- Interlocked.Increment(ref this.consumerIdCounter));
- this.advisoryConsumer = new AdvisoryConsumer(this, id);
- }
- }
- }
- }
- catch
- {
- }
- }
- }
- finally
- {
- Monitor.Exit(checkConnectedLock);
- }
- }
-
- if(connected.Value || DateTime.Now > timeoutTime)
- {
- break;
- }
-
- // Back off from being overly aggressive. Having too many threads
- // aggressively trying to connect to a down broker pegs the CPU.
- Thread.Sleep(5 * (waitCount++));
- }
-
- if(!connected.Value)
- {
- throw new ConnectionClosedException();
- }
- }
- }
-
- /// <summary>
- /// Handle incoming commands
- /// </summary>
- /// <param name="commandTransport">An ITransport</param>
- /// <param name="command">A Command</param>
- protected void OnCommand(ITransport commandTransport, Command command)
- {
- if(command.IsMessageDispatch)
- {
- WaitForTransportInterruptionProcessingToComplete();
- DispatchMessage((MessageDispatch)command);
- }
- else if(command.IsKeepAliveInfo)
- {
- OnKeepAliveCommand(commandTransport, (KeepAliveInfo)command);
- }
- else if(command.IsWireFormatInfo)
- {
- this.brokerWireFormatInfo = (WireFormatInfo)command;
- }
- else if(command.IsBrokerInfo)
- {
- this.brokerInfo = (BrokerInfo)command;
- this.brokerInfoReceived.countDown();
- }
- else if(command.IsShutdownInfo)
- {
- // Only terminate the connection if the transport we use is not fault
- // tolerant otherwise we let the transport deal with the broker closing
- // our connection and deal with IOException if it is sent to use.
- if(!closing.Value && !closed.Value && this.transport != null && !this.transport.IsFaultTolerant)
- {
- OnException(new NMSException("Broker closed this connection via Shutdown command."));
- }
- }
- else if(command.IsProducerAck)
- {
- ProducerAck ack = (ProducerAck)command as ProducerAck;
- if(ack.ProducerId != null)
- {
- MessageProducer producer = producers[ack.ProducerId] as MessageProducer;
- if(producer != null)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Connection[{0}]: Received a new ProducerAck -> ",
- this.ConnectionId, ack);
- }
-
- producer.OnProducerAck(ack);
- }
- }
- }
- else if(command.IsConnectionError)
- {
- if(!closing.Value && !closed.Value)
- {
- ConnectionError connectionError = (ConnectionError)command;
- BrokerError brokerError = connectionError.Exception;
- string message = "Broker connection error.";
- string cause = "";
-
- if(null != brokerError)
- {
- message = brokerError.Message;
- if(null != brokerError.Cause)
- {
- cause = brokerError.Cause.Message;
- }
- }
-
- Tracer.ErrorFormat("Connection[{0}]: ConnectionError: " + message + " : " + cause, this.ConnectionId);
- OnException(new NMSConnectionException(message, cause));
- }
- }
- else
- {
- Tracer.ErrorFormat("Connection[{0}]: Unknown command: " + command, this.ConnectionId);
- }
- }
-
- protected void DispatchMessage(MessageDispatch dispatch)
- {
- lock(dispatchers.SyncRoot)
- {
- if(dispatchers.Contains(dispatch.ConsumerId))
- {
- IDispatcher dispatcher = (IDispatcher)dispatchers[dispatch.ConsumerId];
-
- // Can be null when a consumer has sent a MessagePull and there was
- // no available message at the broker to dispatch or when signalled
- // that the end of a Queue browse has been reached.
- if(dispatch.Message != null)
- {
- dispatch.Message.ReadOnlyBody = true;
- dispatch.Message.ReadOnlyProperties = true;
- dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
- }
-
- dispatcher.Dispatch(dispatch);
-
- return;
- }
- }
-
- Tracer.ErrorFormat("Connection[{0}]: No such consumer active: " + dispatch.ConsumerId, this.ConnectionId);
- }
-
- protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info)
- {
- try
- {
- if (connected.Value)
- {
- info.ResponseRequired = false;
- transport.Oneway(info);
- }
- }
- catch(Exception ex)
- {
- if(!closing.Value && !closed.Value)
- {
- OnException(ex);
- }
- }
- }
-
- internal void OnAsyncException(Exception error)
- {
- if(!this.closed.Value && !this.closing.Value)
- {
- if(this.ExceptionListener != null)
- {
- if(!(error is NMSException))
- {
- error = NMSExceptionSupport.Create(error);
- }
- NMSException e = (NMSException)error;
-
- // Called in another thread so that processing can continue
- // here, ensures no lock contention.
- executor.QueueUserWorkItem(AsyncCallExceptionListener, e);
- }
- else
- {
- Tracer.DebugFormat("Connection[{0}]: Async exception with no exception listener: " + error, this.ConnectionId);
- }
- }
- }
-
- private void AsyncCallExceptionListener(object error)
- {
- NMSException exception = error as NMSException;
- this.ExceptionListener(exception);
- }
-
- internal void OnTransportException(ITransport source, Exception cause)
- {
- this.OnException(cause);
- }
-
- internal void OnException(Exception error)
- {
- // Will fire an exception listener callback if there's any set.
- OnAsyncException(error);
-
- if(!this.closing.Value && !this.closed.Value)
- {
- // Perform the actual work in another thread to avoid lock contention
- // and allow the caller to continue on in its error cleanup.
- executor.QueueUserWorkItem(AsyncOnExceptionHandler, error);
- }
- }
-
- private void AsyncOnExceptionHandler(object error)
- {
- Exception cause = error as Exception;
-
- MarkTransportFailed(cause);
-
- try
- {
- this.transport.Dispose();
- }
- catch(Exception ex)
- {
- Tracer.DebugFormat("Connection[{0}]: Caught Exception While disposing of Transport: " + ex, this.ConnectionId);
- }
-
- this.brokerInfoReceived.countDown();
-
- IList sessionsCopy = null;
- lock(this.sessions.SyncRoot)
- {
- sessionsCopy = new ArrayList(this.sessions);
- }
-
- // Use a copy so we don't concurrently modify the Sessions list if the
- // client is closing at the same time.
- foreach(Session session in sessionsCopy)
- {
- try
- {
- session.Shutdown();
- }
- catch(Exception ex)
- {
- Tracer.DebugFormat("Connection[{0}]: Caught Exception While disposing of Sessions: " + ex, this.ConnectionId);
- }
- }
- }
-
- private void MarkTransportFailed(Exception error)
- {
- this.transportFailed.Value = true;
- if(this.firstFailureError == null)
- {
- this.firstFailureError = error;
- }
- }
-
- protected void OnTransportInterrupted(ITransport sender)
- {
- Tracer.Debug("Connection: Transport has been Interrupted.");
-
- // Ensure that if there's an advisory consumer we don't add it to the
- // set of consumers that need interruption processing.
- this.transportInterruptionProcessingComplete =
- new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0));
-
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Connection[{0}]: Transport interrupted, dispatchers: " + dispatchers.Count, this.ConnectionId);
- }
-
- SignalInterruptionProcessingNeeded();
-
- foreach(Session session in this.sessions)
- {
- try
- {
- session.ClearMessagesInProgress();
- }
- catch(Exception ex)
- {
- Tracer.WarnFormat("Connection[{0}]: Exception while clearing messages: " + ex.Message, this.ConnectionId);
- Tracer.Warn(ex.StackTrace);
- }
- }
-
- if(this.ConnectionInterruptedListener != null && !this.closing.Value)
- {
- try
- {
- this.ConnectionInterruptedListener();
- }
- catch
- {
- }
- }
- }
-
- protected void OnTransportResumed(ITransport sender)
- {
- Tracer.Debug("Transport has resumed normal operation.");
-
- if(this.ConnectionResumedListener != null && !this.closing.Value)
- {
- try
- {
- this.ConnectionResumedListener();
- }
- catch
- {
- }
- }
- }
-
- internal void OnSessionException(Session sender, Exception exception)
- {
- if(ExceptionListener != null)
- {
- try
- {
- ExceptionListener(exception);
- }
- catch
- {
- sender.Close();
- }
- }
- }
-
- /// <summary>
- /// Creates a new local transaction ID
- /// </summary>
- public LocalTransactionId CreateLocalTransactionId()
- {
- LocalTransactionId id = new LocalTransactionId();
- id.ConnectionId = ConnectionId;
- id.Value = Interlocked.Increment(ref localTransactionCounter);
- return id;
- }
-
- protected SessionId NextSessionId
- {
- get { return new SessionId(this.info.ConnectionId, Interlocked.Increment(ref this.sessionCounter)); }
- }
-
- public ActiveMQTempDestination CreateTemporaryDestination(bool topic)
- {
- ActiveMQTempDestination destination = null;
-
- if(topic)
- {
- destination = new ActiveMQTempTopic(
- info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter));
- }
- else
- {
- destination = new ActiveMQTempQueue(
- info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter));
- }
-
- DestinationInfo command = new DestinationInfo();
- command.ConnectionId = ConnectionId;
- command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
- command.Destination = destination;
-
- this.SyncRequest(command);
-
- destination = this.AddTempDestination(destination);
- destination.Connection = this;
-
- return destination;
- }
-
- protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
- {
- }
-
- public void DeleteTemporaryDestination(IDestination destination)
- {
- CheckClosedOrFailed();
-
- ActiveMQTempDestination temp = destination as ActiveMQTempDestination;
-
- foreach(Session session in this.sessions)
- {
- if(session.IsInUse(temp))
- {
- throw new NMSException("A consumer is consuming from the temporary destination");
- }
- }
-
- this.tempDests.Remove(destination as ActiveMQTempDestination);
- this.DeleteDestination(destination);
- }
-
- public void DeleteDestination(IDestination destination)
- {
- DestinationInfo command = new DestinationInfo();
- command.ConnectionId = this.ConnectionId;
- command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
- command.Destination = (ActiveMQDestination)destination;
-
- this.Oneway(command);
- }
-
- private void WaitForTransportInterruptionProcessingToComplete()
- {
- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
- if(cdl != null)
- {
- if(!closed.Value && cdl.Remaining > 0)
- {
- Tracer.WarnFormat("Connection[{0}]: Dispatch paused, waiting for outstanding dispatch interruption " +
- "processing (" + cdl.Remaining + ") to complete..", this.ConnectionId);
- cdl.await(TimeSpan.FromSeconds(10));
- }
-
- SignalInterruptionProcessingComplete();
- }
- }
-
- internal void TransportInterruptionProcessingComplete()
- {
- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
- if(cdl != null)
- {
- cdl.countDown();
- try
- {
- SignalInterruptionProcessingComplete();
- }
- catch
- {
- }
- }
- }
-
- private void SignalInterruptionProcessingComplete()
- {
- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
- if(cdl.Remaining == 0)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Connection[{0}]: transportInterruptionProcessingComplete.", this.info.ConnectionId);
- }
-
- this.transportInterruptionProcessingComplete = null;
-
- FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
- if(failoverTransport != null)
- {
- failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Connection[{0}]: notified failover transport (" + failoverTransport +
- ") of interruption completion.", this.ConnectionId);
- }
- }
- }
- }
-
- private void SignalInterruptionProcessingNeeded()
- {
- FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
-
- if(failoverTransport != null)
- {
- failoverTransport.StateTracker.TransportInterrupted(this.info.ConnectionId);
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Connection[{0}]: notified failover transport (" + failoverTransport +
- ") of pending interruption processing.", this.ConnectionId);
- }
- }
- }
-
- internal ActiveMQTempDestination AddTempDestination(ActiveMQTempDestination dest)
- {
- ActiveMQTempDestination addedDest = dest;
-
- // .NET lacks a putIfAbsent operation for Maps.
- lock(tempDests.SyncRoot)
- {
- if(!this.tempDests.Contains(dest))
- {
- this.tempDests.Add(dest, dest);
- }
- else
- {
- addedDest = this.tempDests[dest] as ActiveMQTempDestination;
- }
- }
-
- return addedDest;
- }
-
- internal void RemoveTempDestination(ActiveMQTempDestination dest)
- {
- this.tempDests.Remove(dest);
- }
-
- internal bool IsTempDestinationActive(ActiveMQTempDestination dest)
- {
- if(this.advisoryConsumer == null)
- {
- return true;
- }
-
- return this.tempDests.Contains(dest);
- }
-
- protected void CheckClosedOrFailed()
- {
- CheckClosed();
- if(transportFailed.Value)
- {
- throw new ConnectionFailedException(firstFailureError.Message);
- }
- }
-
- protected void CheckClosed()
- {
- if(closed.Value)
- {
- throw new ConnectionClosedException();
- }
- }
- }
+ /// <summary>
+ /// Represents a connection with a message broker
+ /// </summary>
+ public class Connection : IConnection
+ {
+ private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
+ // Uri configurable options.
+ private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+ private bool asyncSend = false;
+ private bool alwaysSyncSend = false;
+ private bool asyncClose = true;
+ private bool useCompression = false;
+ private bool copyMessageOnSend = true;
+ private bool sendAcksAsync = false;
+ private bool dispatchAsync = true;
+ private int producerWindowSize = 0;
+ private bool messagePrioritySupported = true;
+ private bool watchTopicAdviosires = true;
+
+ private bool userSpecifiedClientID;
+ private readonly Uri brokerUri;
+ private ITransport transport;
+ private readonly ConnectionInfo info;
+ private TimeSpan requestTimeout; // from connection factory
+ private BrokerInfo brokerInfo; // from broker
+ private readonly CountDownLatch brokerInfoReceived = new CountDownLatch(1);
+ private WireFormatInfo brokerWireFormatInfo; // from broker
+ private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+ private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+ private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
+ private readonly IDictionary tempDests = Hashtable.Synchronized(new Hashtable());
+ private readonly object myLock = new object();
+ private readonly Atomic<bool> connected = new Atomic<bool>(false);
+ private readonly Atomic<bool> closed = new Atomic<bool>(false);
+ private readonly Atomic<bool> closing = new Atomic<bool>(false);
+ private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
+ private Exception firstFailureError = null;
+ private int sessionCounter = 0;
+ private int temporaryDestinationCounter = 0;
+ private int localTransactionCounter;
+ private readonly Atomic<bool> started = new Atomic<bool>(false);
+ private ConnectionMetaData metaData = null;
+ private bool disposed = false;
+ private IRedeliveryPolicy redeliveryPolicy;
+ private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+ private ICompressionPolicy compressionPolicy = new CompressionPolicy();
+ private readonly IdGenerator clientIdGenerator;
+ private int consumerIdCounter = 0;
+ private volatile CountDownLatch transportInterruptionProcessingComplete;
+ private readonly MessageTransformation messageTransformation;
+ private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
+ private AdvisoryConsumer advisoryConsumer = null;
+
+        /// <summary>
+        /// Creates a connection on top of the supplied transport and registers this
+        /// instance as the receiver of the transport's command, exception,
+        /// interrupted and resumed callbacks.
+        /// </summary>
+        /// <param name="connectionUri">Broker URI, exposed later through BrokerUri.</param>
+        /// <param name="transport">Transport used for all communication with the broker.</param>
+        /// <param name="clientIdGenerator">Generator used when no client id was supplied by the user.</param>
+        public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
+        {
+            this.brokerUri = connectionUri;
+            this.clientIdGenerator = clientIdGenerator;
+            this.transport = transport;
+
+            // Route every transport callback back into this connection.
+            this.transport.Command = new CommandHandler(OnCommand);
+            this.transport.Exception = new ExceptionHandler(OnTransportException);
+            this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
+            this.transport.Resumed = new ResumedHandler(OnTransportResumed);
+
+            // Every connection gets its own freshly generated identifier.
+            ConnectionId connectionId = new ConnectionId();
+            connectionId.Value = CONNECTION_ID_GENERATOR.GenerateId();
+
+            ConnectionInfo connectionInfo = new ConnectionInfo();
+            connectionInfo.ConnectionId = connectionId;
+            connectionInfo.FaultTolerant = transport.IsFaultTolerant;
+            this.info = connectionInfo;
+
+            this.messageTransformation = new ActiveMQMessageTransformation(this);
+        }
+
+        /// <summary>
+        /// Finalizer: runs the non-disposing cleanup path when Dispose() was never called.
+        /// </summary>
+        ~Connection()
+        {
+            this.Dispose(false);
+        }
+
+ /// <summary>
+ /// A delegate that can receive transport level exceptions.
+ /// </summary>
+ public event ExceptionListener ExceptionListener;
+
+ /// <summary>
+ /// An asynchronous listener that is notified when a Fault tolerant connection
+ /// has been interrupted.
+ /// </summary>
+ public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+ /// <summary>
+ /// An asynchronous listener that is notified when a Fault tolerant connection
+ /// has been resumed.
+ /// </summary>
+ public event ConnectionResumedListener ConnectionResumedListener;
+
+        private ConsumerTransformerDelegate consumerTransformer;
+
+        /// <summary>
+        /// Gets or sets the consumer transformer delegate stored on this connection.
+        /// </summary>
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get
+            {
+                return this.consumerTransformer;
+            }
+            set
+            {
+                this.consumerTransformer = value;
+            }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+
+        /// <summary>
+        /// Gets or sets the producer transformer delegate stored on this connection.
+        /// </summary>
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get
+            {
+                return this.producerTransformer;
+            }
+            set
+            {
+                this.producerTransformer = value;
+            }
+        }
+
+        #region Properties
+
+        /// <summary>
+        /// User name sent to the broker as part of the ConnectionInfo.
+        /// </summary>
+        public String UserName
+        {
+            get { return this.info.UserName; }
+            set { this.info.UserName = value; }
+        }
+
+        /// <summary>
+        /// Password sent to the broker as part of the ConnectionInfo.
+        /// </summary>
+        public String Password
+        {
+            get { return this.info.Password; }
+            set { this.info.Password = value; }
+        }
+
+        /// <summary>
+        /// This property indicates what version of the Protocol we are using to
+        /// communicate with the Broker, if not set we return the lowest version
+        /// number to indicate we support only the basic command set.
+        /// </summary>
+        public int ProtocolVersion
+        {
+            get
+            {
+                if(brokerWireFormatInfo != null)
+                {
+                    return brokerWireFormatInfo.Version;
+                }
+
+                return 1;
+            }
+        }
+
+        /// <summary>
+        /// This property indicates whether or not async send is enabled.
+        /// </summary>
+        public bool AsyncSend
+        {
+            get { return asyncSend; }
+            set { asyncSend = value; }
+        }
+
+        /// <summary>
+        /// This property indicates whether or not async close is enabled.
+        /// When the connection is closed it will either send the remove command
+        /// to the broker asynchronously without waiting for a reply (if true),
+        /// or send it synchronously and wait for the broker's confirmation (if false).
+        /// </summary>
+        public bool AsyncClose
+        {
+            get { return asyncClose; }
+            set { asyncClose = value; }
+        }
+
+        /// <summary>
+        /// This property indicates whether or not async sends are used for
+        /// message acknowledgement messages.  Sending Acks async can improve
+        /// performance but may decrease reliability.
+        /// </summary>
+        public bool SendAcksAsync
+        {
+            get { return sendAcksAsync; }
+            set { sendAcksAsync = value; }
+        }
+
+        /// <summary>
+        /// This property sets the acknowledgment mode for the connection.
+        /// The URI parameter connection.ackmode can be set to a string value
+        /// that maps to the enumeration value.
+        /// </summary>
+        public string AckMode
+        {
+            set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+        }
+
+        /// <summary>
+        /// This property is the maximum number of bytes in memory that a producer will transmit
+        /// to a broker before waiting for acknowledgement messages from the broker that it has
+        /// accepted the previously sent messages. In other words, this is how you configure the
+        /// producer flow control window that is used for async sends where the client is responsible
+        /// for managing memory usage. The default value of 0 means no flow control at the client.
+        /// </summary>
+        public int ProducerWindowSize
+        {
+            get { return producerWindowSize; }
+            set { producerWindowSize = value; }
+        }
+
+        /// <summary>
+        /// This property forces all messages that are sent to be sent synchronously overriding
+        /// any usage of the AsyncSend flag. This can reduce performance in some cases since the
+        /// only messages we normally send synchronously are Persistent messages not sent in a
+        /// transaction. This option guarantees that no send will return until the broker has
+        /// acknowledged receipt of the message.
+        /// </summary>
+        public bool AlwaysSyncSend
+        {
+            get { return alwaysSyncSend; }
+            set { alwaysSyncSend = value; }
+        }
+
+        /// <summary>
+        /// This property indicates whether Messages should be copied before being sent via
+        /// one of the Connection's send methods.  Copying the Message object allows the user
+        /// to reuse the Object for another send.  If the message isn't copied performance
+        /// can improve but the user must not reuse the Object as it may not have been sent
+        /// before they reset its payload.
+        /// </summary>
+        public bool CopyMessageOnSend
+        {
+            get { return copyMessageOnSend; }
+            set { copyMessageOnSend = value; }
+        }
+
+        /// <summary>
+        /// Enable or Disable the use of Compression on Message bodies.  When enabled all
+        /// messages have their body compressed using the Deflate compression algorithm.
+        /// The recipient of the message must support the use of message compression as well
+        /// otherwise the receiving client will receive a message whose body appears in the
+        /// compressed form.
+        /// </summary>
+        public bool UseCompression
+        {
+            get { return this.useCompression; }
+            set { this.useCompression = value; }
+        }
+
+        /// <summary>
+        /// Indicate whether or not the resources of this Connection should support the
+        /// Message Priority value of incoming messages and dispatch them accordingly.
+        /// When disabled Messages are always dispatched to Consumers in FIFO order.
+        /// </summary>
+        public bool MessagePrioritySupported
+        {
+            get { return this.messagePrioritySupported; }
+            set { this.messagePrioritySupported = value; }
+        }
+
+        /// <summary>
+        /// Meta data for this connection; the instance is created on first access.
+        /// </summary>
+        public IConnectionMetaData MetaData
+        {
+            get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+        }
+
+        /// <summary>
+        /// The URI that was passed to the constructor.
+        /// </summary>
+        public Uri BrokerUri
+        {
+            get { return brokerUri; }
+        }
+
+        /// <summary>
+        /// The transport this connection sends its commands through.
+        /// </summary>
+        public ITransport ITransport
+        {
+            get { return transport; }
+            set { this.transport = value; }
+        }
+
+        /// <summary>
+        /// True once a transport failure has been recorded for this connection.
+        /// </summary>
+        public bool TransportFailed
+        {
+            get { return this.transportFailed.Value; }
+        }
+
+        /// <summary>
+        /// The first error recorded when the transport failed, or null if none was recorded.
+        /// </summary>
+        public Exception FirstFailureError
+        {
+            get { return this.firstFailureError; }
+        }
+
+        /// <summary>
+        /// Timeout applied to synchronous requests and to the initial connect attempt.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return this.requestTimeout; }
+            set { this.requestTimeout = value; }
+        }
+
+        /// <summary>
+        /// Acknowledgement mode used by CreateSession() when none is given explicitly.
+        /// </summary>
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return acknowledgementMode; }
+            set { this.acknowledgementMode = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the dispatchAsync option, i.e. whether messages are dispatched
+        /// synchronously or asynchronously by the broker.
+        /// </summary>
+        public bool DispatchAsync
+        {
+            get { return this.dispatchAsync; }
+            set { this.dispatchAsync = value; }
+        }
+
+        /// <summary>
+        /// When true an advisory consumer is created once the connection has been
+        /// established (see CheckConnected).
+        /// </summary>
+        public bool WatchTopicAdvisories
+        {
+            get { return this.watchTopicAdviosires; }
+            set { this.watchTopicAdviosires = value; }
+        }
+
+        /// <summary>
+        /// The client id of this connection. Setting it marks the id as user specified
+        /// and immediately attempts to connect; it cannot be changed once connected.
+        /// </summary>
+        public string ClientId
+        {
+            get { return info.ClientId; }
+            set
+            {
+                if(this.connected.Value)
+                {
+                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
+                }
+
+                this.info.ClientId = value;
+                this.userSpecifiedClientID = true;
+                CheckConnected();
+            }
+        }
+
+        /// <summary>
+        /// The Default Client Id used if the ClientId property is not set explicitly.
+        /// Unlike ClientId, setting this value does not trigger a connect attempt.
+        /// </summary>
+        public string DefaultClientId
+        {
+            set
+            {
+                this.info.ClientId = value;
+                this.userSpecifiedClientID = true;
+            }
+        }
+
+        /// <summary>
+        /// The identifier generated for this connection in the constructor.
+        /// </summary>
+        public ConnectionId ConnectionId
+        {
+            get { return info.ConnectionId; }
+        }
+
+        /// <summary>
+        /// The last BrokerInfo command received from the broker, or null if none arrived yet.
+        /// </summary>
+        public BrokerInfo BrokerInfo
+        {
+            get { return brokerInfo; }
+        }
+
+        /// <summary>
+        /// The last WireFormatInfo command received from the broker, or null if none arrived yet.
+        /// </summary>
+        public WireFormatInfo BrokerWireFormat
+        {
+            get { return brokerWireFormatInfo; }
+        }
+
+        /// <summary>
+        /// The broker id reported in the BrokerInfo command. Blocks until the broker
+        /// info latch is released, which happens when a BrokerInfo command arrives or
+        /// when the transport failure handler runs.
+        /// </summary>
+        /// <exception cref="ConnectionFailedException">
+        /// The latch was released without any broker information being available.
+        /// </exception>
+        public String ResourceManagerId
+        {
+            get
+            {
+                this.brokerInfoReceived.await();
+
+                // The failure handler releases the latch as well, in which case no
+                // BrokerInfo was ever stored; report that instead of dereferencing null.
+                BrokerInfo current = this.brokerInfo;
+                if(current == null || current.BrokerId == null)
+                {
+                    throw new ConnectionFailedException(
+                        "No broker information was received before the connection failed.");
+                }
+
+                return current.BrokerId.Value;
+            }
+        }
+
+        /// <summary>
+        /// Get/or set the redelivery policy for this connection.
+        /// </summary>
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set { this.redeliveryPolicy = value; }
+        }
+
+        /// <summary>
+        /// Get/or set the prefetch policy for this connection.
+        /// </summary>
+        public PrefetchPolicy PrefetchPolicy
+        {
+            get { return this.prefetchPolicy; }
+            set { this.prefetchPolicy = value; }
+        }
+
+        /// <summary>
+        /// Get/or set the compression policy for this connection.
+        /// </summary>
+        public ICompressionPolicy CompressionPolicy
+        {
+            get { return this.compressionPolicy; }
+            set { this.compressionPolicy = value; }
+        }
+
+        /// <summary>
+        /// The message transformation helper created for this connection in the constructor.
+        /// </summary>
+        internal MessageTransformation MessageTransformation
+        {
+            get { return this.messageTransformation; }
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Begins asynchronous delivery of incoming messages on this connection by
+        /// starting every registered session. Connects first if necessary; calling
+        /// it on an already started connection has no further effect.
+        /// </summary>
+        public void Start()
+        {
+            CheckConnected();
+
+            // Only the caller that flips the flag from false to true starts the sessions.
+            if(!started.CompareAndSet(false, true))
+            {
+                return;
+            }
+
+            lock(sessions.SyncRoot)
+            {
+                foreach(Session current in sessions)
+                {
+                    current.Start();
+                }
+            }
+        }
+
+        /// <summary>
+        /// True while asynchronous delivery of incoming messages is started for
+        /// this connection (set by Start, cleared by Stop).
+        /// </summary>
+        public bool IsStarted
+        {
+            get
+            {
+                return this.started.Value;
+            }
+        }
+
+        /// <summary>
+        /// Pauses asynchronous delivery of inbound messages by stopping every
+        /// registered session. Sending of outbound messages is unaffected. Has no
+        /// effect when the connection is not currently started.
+        /// </summary>
+        public void Stop()
+        {
+            // Only the caller that flips the flag from true to false stops the sessions.
+            if(!started.CompareAndSet(true, false))
+            {
+                return;
+            }
+
+            lock(sessions.SyncRoot)
+            {
+                foreach(Session current in sessions)
+                {
+                    current.Stop();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Creates a new session on this connection using the connection's
+        /// configured acknowledgement mode.
+        /// </summary>
+        public ISession CreateSession()
+        {
+            return CreateSession(this.acknowledgementMode);
+        }
+
+        /// <summary>
+        /// Creates a new session on this connection with the requested
+        /// acknowledgement mode.
+        /// </summary>
+        /// <param name="sessionAcknowledgementMode">Acknowledgement mode for the new session.</param>
+        public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
+        {
+            ISession created = CreateAtiveMQSession(sessionAcknowledgementMode);
+            return created;
+        }
+
+        /// <summary>
+        /// Factory method for sessions: ensures the connection is established and
+        /// then builds a Session with the next session id. Virtual so that derived
+        /// connections can supply their own Session type.
+        /// </summary>
+        protected virtual Session CreateAtiveMQSession(AcknowledgementMode ackMode)
+        {
+            CheckConnected();
+
+            SessionId sessionId = NextSessionId;
+            return new Session(this, sessionId, ackMode);
+        }
+
+        /// <summary>
+        /// Registers a session with this connection; ignored while the connection is closing.
+        /// </summary>
+        internal void AddSession(Session session)
+        {
+            if(this.closing.Value)
+            {
+                return;
+            }
+
+            sessions.Add(session);
+        }
+
+        /// <summary>
+        /// Unregisters a session from this connection; ignored while the connection is closing.
+        /// </summary>
+        internal void RemoveSession(Session session)
+        {
+            if(this.closing.Value)
+            {
+                return;
+            }
+
+            sessions.Remove(session);
+        }
+
+        /// <summary>
+        /// Registers the dispatcher that receives messages for the given consumer id;
+        /// ignored while the connection is closing.
+        /// </summary>
+        internal void addDispatcher(ConsumerId id, IDispatcher dispatcher)
+        {
+            if(this.closing.Value)
+            {
+                return;
+            }
+
+            this.dispatchers.Add(id, dispatcher);
+        }
+
+        /// <summary>
+        /// Removes the dispatcher registered for the given consumer id; ignored
+        /// while the connection is closing.
+        /// </summary>
+        internal void removeDispatcher(ConsumerId id)
+        {
+            if(this.closing.Value)
+            {
+                return;
+            }
+
+            this.dispatchers.Remove(id);
+        }
+
+        /// <summary>
+        /// Registers a producer under its id so that producer acks can be routed
+        /// to it; ignored while the connection is closing.
+        /// </summary>
+        internal void addProducer(ProducerId id, MessageProducer producer)
+        {
+            if(this.closing.Value)
+            {
+                return;
+            }
+
+            this.producers.Add(id, producer);
+        }
+
+        /// <summary>
+        /// Removes the producer registered under the given id; ignored while the
+        /// connection is closing.
+        /// </summary>
+        internal void removeProducer(ProducerId id)
+        {
+            if(this.closing.Value)
+            {
+                return;
+            }
+
+            this.producers.Remove(id);
+        }
+
+        /// <summary>
+        /// Closes the connection: stops delivery, shuts down the advisory consumer
+        /// and all sessions, deletes the temporary destinations, tells the broker
+        /// the connection is going away (when it was connected and the transport is
+        /// still healthy) and finally disposes of the transport. Errors raised
+        /// during the close are logged and not propagated. Calling Close on an
+        /// already closed connection does nothing.
+        /// </summary>
+        public void Close()
+        {
+            if(!this.closed.Value && !transportFailed.Value)
+            {
+                this.Stop();
+            }
+
+            lock(myLock)
+            {
+                if(this.closed.Value)
+                {
+                    return;
+                }
+
+                try
+                {
+                    Tracer.InfoFormat("Connection[{0}]: Closing Connection Now.", this.ConnectionId);
+                    this.closing.Value = true;
+
+                    if(this.advisoryConsumer != null)
+                    {
+                        this.advisoryConsumer.Dispose();
+                        this.advisoryConsumer = null;
+                    }
+
+                    lock(sessions.SyncRoot)
+                    {
+                        foreach(Session session in sessions)
+                        {
+                            session.Shutdown();
+                        }
+                    }
+                    sessions.Clear();
+
+                    if(this.tempDests.Count > 0)
+                    {
+                        // Make a copy of the destinations to delete, because the act of deleting
+                        // them will modify the collection.
+                        ActiveMQTempDestination[] tempDestsToDelete = new ActiveMQTempDestination[this.tempDests.Count];
+
+                        this.tempDests.Values.CopyTo(tempDestsToDelete, 0);
+                        foreach(ActiveMQTempDestination dest in tempDestsToDelete)
+                        {
+                            dest.Delete();
+                        }
+                    }
+
+                    // Connected is true only when we've successfully sent our ConnectionInfo
+                    // to the broker, so if we haven't announced ourselves there's no need to
+                    // inform the broker of a remove, and if the transport is failed, why bother.
+                    if(connected.Value && !transportFailed.Value)
+                    {
+                        DisposeOf(ConnectionId);
+                        ShutdownInfo shutdowninfo = new ShutdownInfo();
+                        transport.Oneway(shutdowninfo);
+                    }
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Connection[{0}]: Error during connection close: {1}", ConnectionId, ex);
+                }
+                finally
+                {
+                    executor.Shutdown();
+
+                    // Always release the transport, even when one of the steps above
+                    // failed; otherwise the reference is dropped below without the
+                    // transport ever being disposed.
+                    if(this.transport != null)
+                    {
+                        try
+                        {
+                            Tracer.DebugFormat("Connection[{0}]: Disposing of the Transport.", this.ConnectionId);
+                            this.transport.Dispose();
+                        }
+                        catch(Exception ex)
+                        {
+                            Tracer.ErrorFormat("Connection[{0}]: Error during connection close: {1}", ConnectionId, ex);
+                        }
+                    }
+
+                    this.transport = null;
+                    this.closed.Value = true;
+                    this.connected.Value = false;
+                    this.closing.Value = false;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Closes the connection and tells the garbage collector that the
+        /// finalizer no longer needs to run.
+        /// </summary>
+        public void Dispose()
+        {
+            this.Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Shared cleanup for Dispose() and the finalizer. Closes the connection
+        /// once, ignoring any error raised by the close; later calls do nothing.
+        /// </summary>
+        /// <param name="disposing">
+        /// True when called from Dispose(), false when called from the finalizer.
+        /// There is currently no cleanup that depends on this flag.
+        /// </param>
+        protected void Dispose(bool disposing)
+        {
+            if(this.disposed)
+            {
+                return;
+            }
+
+            try
+            {
+                this.Close();
+            }
+            catch
+            {
+                // Ignore network errors.
+            }
+
+            this.disposed = true;
+        }
+
+ // Implementation methods
+
+        /// <summary>
+        /// Sends a command to the broker and waits for its response, using the
+        /// connection's RequestTimeout.
+        /// </summary>
+        /// <param name="command">The command to send.</param>
+        /// <returns>The broker's response.</returns>
+        public Response SyncRequest(Command command)
+        {
+            TimeSpan timeout = this.RequestTimeout;
+            return SyncRequest(command, timeout);
+        }
+
+        /// <summary>
+        /// Sends a command to the broker and waits up to the given timeout for
+        /// its response. An ExceptionResponse from the broker is turned into a
+        /// BrokerException; every exception leaving this method has been passed
+        /// through NMSExceptionSupport.Create.
+        /// </summary>
+        /// <param name="command">The command to send.</param>
+        /// <param name="requestTimeout">How long to wait for the response.</param>
+        /// <returns>The broker's response.</returns>
+        public Response SyncRequest(Command command, TimeSpan requestTimeout)
+        {
+            CheckConnected();
+
+            try
+            {
+                Response reply = transport.Request(command, requestTimeout);
+
+                ExceptionResponse failure = reply as ExceptionResponse;
+                if(failure != null)
+                {
+                    throw new BrokerException(failure.Exception);
+                }
+
+                return reply;
+            }
+            catch(Exception ex)
+            {
+                throw NMSExceptionSupport.Create(ex);
+            }
+        }
+
+        /// <summary>
+        /// Sends a command to the broker without waiting for a response. Any
+        /// exception raised by the transport is passed through
+        /// NMSExceptionSupport.Create before being rethrown.
+        /// </summary>
+        /// <param name="command">The command to send.</param>
+        public void Oneway(Command command)
+        {
+            CheckConnected();
+
+            try
+            {
+                this.transport.Oneway(command);
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Asks the broker to remove the object with the given id. With AsyncClose
+        /// enabled the RemoveInfo is sent one-way (only while connected); otherwise
+        /// it is sent synchronously with a five second timeout. Failures are
+        /// logged at debug level and never propagated, because this only runs
+        /// while shutting down.
+        /// </summary>
+        /// <param name="objectId">Id of the broker-side object to remove.</param>
+        private void DisposeOf(DataStructure objectId)
+        {
+            try
+            {
+                RemoveInfo command = new RemoveInfo();
+                command.ObjectId = objectId;
+                if(asyncClose)
+                {
+                    Tracer.DebugFormat("Connection[{0}]: Asynchronously disposing of Connection.", this.ConnectionId);
+                    if(connected.Value)
+                    {
+                        transport.Oneway(command);
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.DebugFormat("Connection[{0}]: Oneway command sent to broker: {1}",
+                                               this.ConnectionId, command);
+                        }
+                    }
+                }
+                else
+                {
+                    // Ensure that the object is disposed to avoid potential race-conditions
+                    // of trying to re-create the same object in the broker faster than
+                    // the broker can dispose of the object.  Allow up to 5 seconds to process.
+                    Tracer.DebugFormat("Connection[{0}]: Synchronously disposing of Connection.", this.ConnectionId);
+                    SyncRequest(command, TimeSpan.FromSeconds(5));
+                    Tracer.DebugFormat("Connection[{0}]: Synchronously closed of Connection.", this.ConnectionId);
+                }
+            }
+            catch(Exception ex)
+            {
+                // Exceptions are deliberately not propagated while shutting down,
+                // but leave a trace so a failed remove can still be diagnosed.
+                Tracer.DebugFormat("Connection[{0}]: Ignoring error while disposing: {1}",
+                                   this.ConnectionId, ex.Message);
+            }
+        }
+
+ private object checkConnectedLock = new object();
+
+        /// <summary>
+        /// Ensures that this connection has announced itself to the broker. If it
+        /// has not, the ConnectionInfo is sent (retrying with an increasing back-off
+        /// until RequestTimeout has elapsed); on success the connection is marked
+        /// connected and, when topic advisories are being watched, the advisory
+        /// consumer is created.
+        /// </summary>
+        /// <exception cref="ConnectionClosedException">
+        /// The connection is closed, or it could not be established before the
+        /// request timeout elapsed.
+        /// </exception>
+        internal void CheckConnected()
+        {
+            if(closed.Value)
+            {
+                throw new ConnectionClosedException();
+            }
+
+            if(connected.Value)
+            {
+                return;
+            }
+
+            // Use UTC so the deadline is not shifted by a local time zone or
+            // daylight-saving adjustment while we are waiting.
+            DateTime timeoutTime = DateTime.UtcNow + this.RequestTimeout;
+            int waitCount = 1;
+
+            while(true)
+            {
+                // Only one thread performs the handshake; the others fall through
+                // to the back-off below and re-check the connected flag.
+                if(Monitor.TryEnter(checkConnectedLock))
+                {
+                    try
+                    {
+                        if(!connected.Value)
+                        {
+                            if(!this.userSpecifiedClientID)
+                            {
+                                this.info.ClientId = this.clientIdGenerator.GenerateId();
+                            }
+
+                            try
+                            {
+                                if(null != transport)
+                                {
+                                    // Send the connection and see if an ack/nak is returned.
+                                    Response response = transport.Request(this.info, this.RequestTimeout);
+                                    if(!(response is ExceptionResponse))
+                                    {
+                                        connected.Value = true;
+                                        if(this.watchTopicAdviosires)
+                                        {
+                                            ConsumerId id = new ConsumerId(
+                                                new SessionId(info.ConnectionId, -1),
+                                                Interlocked.Increment(ref this.consumerIdCounter));
+                                            this.advisoryConsumer = new AdvisoryConsumer(this, id);
+                                        }
+                                    }
+                                }
+                            }
+                            catch(Exception ex)
+                            {
+                                // The attempt is retried until the deadline; keep a
+                                // trace of why this one failed.
+                                Tracer.DebugFormat("Connection[{0}]: Connect attempt failed: {1}",
+                                                   this.ConnectionId, ex.Message);
+                            }
+                        }
+                    }
+                    finally
+                    {
+                        Monitor.Exit(checkConnectedLock);
+                    }
+                }
+
+                if(connected.Value || DateTime.UtcNow > timeoutTime)
+                {
+                    break;
+                }
+
+                // Back off from being overly aggressive.  Having too many threads
+                // aggressively trying to connect to a down broker pegs the CPU.
+                Thread.Sleep(5 * (waitCount++));
+            }
+
+            if(!connected.Value)
+            {
+                throw new ConnectionClosedException();
+            }
+        }
+
+        /// <summary>
+        /// Handles a command received from the transport: message dispatches are
+        /// routed to their consumer's dispatcher, keep-alives are answered, wire
+        /// format and broker info are recorded, producer acks are forwarded to
+        /// the matching producer, and shutdown / connection error commands are
+        /// reported through the exception handling path. Anything else is logged
+        /// as an unknown command.
+        /// </summary>
+        /// <param name="commandTransport">An ITransport</param>
+        /// <param name="command">A Command</param>
+        protected void OnCommand(ITransport commandTransport, Command command)
+        {
+            if(command.IsMessageDispatch)
+            {
+                WaitForTransportInterruptionProcessingToComplete();
+                DispatchMessage((MessageDispatch)command);
+            }
+            else if(command.IsKeepAliveInfo)
+            {
+                OnKeepAliveCommand(commandTransport, (KeepAliveInfo)command);
+            }
+            else if(command.IsWireFormatInfo)
+            {
+                this.brokerWireFormatInfo = (WireFormatInfo)command;
+            }
+            else if(command.IsBrokerInfo)
+            {
+                this.brokerInfo = (BrokerInfo)command;
+                this.brokerInfoReceived.countDown();
+            }
+            else if(command.IsShutdownInfo)
+            {
+                // Only terminate the connection if the transport we use is not fault
+                // tolerant otherwise we let the transport deal with the broker closing
+                // our connection and deal with IOException if it is sent to us.
+                if(!closing.Value && !closed.Value && this.transport != null && !this.transport.IsFaultTolerant)
+                {
+                    OnException(new NMSException("Broker closed this connection via Shutdown command."));
+                }
+            }
+            else if(command.IsProducerAck)
+            {
+                ProducerAck ack = (ProducerAck)command;
+                if(ack.ProducerId != null)
+                {
+                    MessageProducer producer = producers[ack.ProducerId] as MessageProducer;
+                    if(producer != null)
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.DebugFormat("Connection[{0}]: Received a new ProducerAck -> {1}",
+                                               this.ConnectionId, ack);
+                        }
+
+                        producer.OnProducerAck(ack);
+                    }
+                }
+            }
+            else if(command.IsConnectionError)
+            {
+                if(!closing.Value && !closed.Value)
+                {
+                    ConnectionError connectionError = (ConnectionError)command;
+                    BrokerError brokerError = connectionError.Exception;
+                    string message = "Broker connection error.";
+                    string cause = "";
+
+                    if(null != brokerError)
+                    {
+                        message = brokerError.Message;
+                        if(null != brokerError.Cause)
+                        {
+                            cause = brokerError.Cause.Message;
+                        }
+                    }
+
+                    // Pass broker supplied text as arguments rather than splicing it
+                    // into the format string, where a '{' or '}' would make the
+                    // formatter throw.
+                    Tracer.ErrorFormat("Connection[{0}]: ConnectionError: {1} : {2}", this.ConnectionId, message, cause);
+                    OnAsyncException(new NMSConnectionException(message, cause));
+                }
+            }
+            else
+            {
+                Tracer.ErrorFormat("Connection[{0}]: Unknown command: {1}", this.ConnectionId, command);
+            }
+        }
+
+        /// <summary>
+        /// Hands a MessageDispatch to the dispatcher registered for its consumer
+        /// id. The carried message, if any, is made read-only and given the
+        /// dispatch's redelivery counter first. When no dispatcher is registered
+        /// for the consumer an error is logged.
+        /// </summary>
+        /// <param name="dispatch">The dispatch command received from the broker.</param>
+        protected void DispatchMessage(MessageDispatch dispatch)
+        {
+            bool delivered = false;
+
+            lock(dispatchers.SyncRoot)
+            {
+                if(dispatchers.Contains(dispatch.ConsumerId))
+                {
+                    IDispatcher target = (IDispatcher)dispatchers[dispatch.ConsumerId];
+
+                    // Can be null when a consumer has sent a MessagePull and there was
+                    // no available message at the broker to dispatch or when signalled
+                    // that the end of a Queue browse has been reached.
+                    if(dispatch.Message != null)
+                    {
+                        dispatch.Message.ReadOnlyBody = true;
+                        dispatch.Message.ReadOnlyProperties = true;
+                        dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
+                    }
+
+                    target.Dispatch(dispatch);
+                    delivered = true;
+                }
+            }
+
+            if(!delivered)
+            {
+                Tracer.ErrorFormat("Connection[{0}]: No such consumer active: " + dispatch.ConsumerId, this.ConnectionId);
+            }
+        }
+
+        /// <summary>
+        /// Answers a keep-alive from the broker by sending the same command back
+        /// one-way, but only while connected. A failure to send is reported
+        /// through OnException unless the connection is closing or closed.
+        /// </summary>
+        protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info)
+        {
+            try
+            {
+                if(!connected.Value)
+                {
+                    return;
+                }
+
+                info.ResponseRequired = false;
+                transport.Oneway(info);
+            }
+            catch(Exception failure)
+            {
+                if(closing.Value || closed.Value)
+                {
+                    return;
+                }
+
+                OnException(failure);
+            }
+        }
+
+        /// <summary>
+        /// Reports an error to the registered ExceptionListener, if any. The
+        /// error is converted to an NMSException when necessary and the listener
+        /// is invoked on the connection's executor. Nothing happens while the
+        /// connection is closing or closed.
+        /// </summary>
+        /// <param name="error">The error to report.</param>
+        internal void OnAsyncException(Exception error)
+        {
+            if(this.closed.Value || this.closing.Value)
+            {
+                return;
+            }
+
+            if(this.ExceptionListener != null)
+            {
+                if(!(error is NMSException))
+                {
+                    error = NMSExceptionSupport.Create(error);
+                }
+                NMSException e = (NMSException)error;
+
+                // Called in another thread so that processing can continue
+                // here, ensures no lock contention.
+                executor.QueueUserWorkItem(AsyncCallExceptionListener, e);
+            }
+            else
+            {
+                // The exception text is passed as an argument: it may contain
+                // braces, which are not valid inside the format string itself.
+                Tracer.DebugFormat("Connection[{0}]: Async exception with no exception listener: {1}",
+                                   this.ConnectionId, error);
+            }
+        }
+
+        /// <summary>
+        /// Executor callback that delivers a queued exception to the
+        /// ExceptionListener subscribers.
+        /// </summary>
+        /// <param name="error">The NMSException queued by OnAsyncException.</param>
+        private void AsyncCallExceptionListener(object error)
+        {
+            NMSException exception = error as NMSException;
+
+            // This runs later on another thread, so the last subscriber may have
+            // been removed since the work item was queued; invoke a local copy.
+            ExceptionListener listener = this.ExceptionListener;
+            if(listener != null)
+            {
+                listener(exception);
+            }
+        }
+
+        /// <summary>
+        /// Transport exception callback; forwards the error to OnException.
+        /// </summary>
+        internal void OnTransportException(ITransport source, Exception cause)
+        {
+            OnException(cause);
+        }
+
+        /// <summary>
+        /// Handles a fatal connection error: notifies the exception listener and,
+        /// unless the connection is closing or closed, queues the failure
+        /// handling (AsyncOnExceptionHandler) on the executor.
+        /// </summary>
+        /// <param name="error">The error that occurred.</param>
+        internal void OnException(Exception error)
+        {
+            // Will fire an exception listener callback if there's any set.
+            OnAsyncException(error);
+
+            bool shuttingDown = this.closing.Value || this.closed.Value;
+            if(shuttingDown)
+            {
+                return;
+            }
+
+            // Perform the actual work in another thread to avoid lock contention
+            // and allow the caller to continue on in its error cleanup.
+            executor.QueueUserWorkItem(AsyncOnExceptionHandler, error);
+        }
+
+        /// <summary>
+        /// Executor callback that performs the failure handling for a fatal
+        /// error: marks the transport as failed, disposes of it, releases any
+        /// thread waiting for the broker info and shuts down all sessions.
+        /// Errors raised along the way are logged at debug level.
+        /// </summary>
+        /// <param name="error">The Exception queued by OnException.</param>
+        private void AsyncOnExceptionHandler(object error)
+        {
+            Exception cause = error as Exception;
+
+            MarkTransportFailed(cause);
+
+            // Close() may run concurrently and clear the field, so work on a
+            // local reference.
+            ITransport failedTransport = this.transport;
+            if(failedTransport != null)
+            {
+                try
+                {
+                    failedTransport.Dispose();
+                }
+                catch(Exception ex)
+                {
+                    Tracer.DebugFormat("Connection[{0}]: Caught Exception While disposing of Transport: {1}",
+                                       this.ConnectionId, ex);
+                }
+            }
+
+            this.brokerInfoReceived.countDown();
+
+            IList sessionsCopy = null;
+            lock(this.sessions.SyncRoot)
+            {
+                sessionsCopy = new ArrayList(this.sessions);
+            }
+
+            // Use a copy so we don't concurrently modify the Sessions list if the
+            // client is closing at the same time.
+            foreach(Session session in sessionsCopy)
+            {
+                try
+                {
+                    session.Shutdown();
+                }
+                catch(Exception ex)
+                {
+                    Tracer.DebugFormat("Connection[{0}]: Caught Exception While disposing of Sessions: {1}",
+                                       this.ConnectionId, ex);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Records that the transport has failed, remembering the first error
+        /// that caused it.
+        /// </summary>
+        /// <param name="error">The error that caused the failure.</param>
+        private void MarkTransportFailed(Exception error)
+        {
+            // Store the error before raising the flag: readers such as
+            // CheckClosedOrFailed look at the error as soon as they see the flag.
+            if(this.firstFailureError == null)
+            {
+                this.firstFailureError = error;
+            }
+
+            this.transportFailed.Value = true;
+        }
+
+        /// <summary>
+        /// Transport callback raised when the connection to the broker has been
+        /// interrupted. Sets up the latch that tracks outstanding interruption
+        /// processing, notifies the failover transport, asks every session to
+        /// clear its in-progress messages and finally raises
+        /// ConnectionInterruptedListener (unless the connection is closing).
+        /// </summary>
+        protected void OnTransportInterrupted(ITransport sender)
+        {
+            Tracer.Debug("Connection: Transport has been Interrupted.");
+
+            // Ensure that if there's an advisory consumer we don't add it to the
+            // set of consumers that need interruption processing.
+            this.transportInterruptionProcessingComplete =
+                new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0));
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.DebugFormat("Connection[{0}]: Transport interrupted, dispatchers: {1}",
+                                   this.ConnectionId, dispatchers.Count);
+            }
+
+            SignalInterruptionProcessingNeeded();
+
+            // Iterate over a snapshot taken under the list's lock so that a
+            // session being added or removed on another thread cannot invalidate
+            // the enumeration.
+            IList sessionsCopy = null;
+            lock(this.sessions.SyncRoot)
+            {
+                sessionsCopy = new ArrayList(this.sessions);
+            }
+
+            foreach(Session session in sessionsCopy)
+            {
+                try
+                {
+                    session.ClearMessagesInProgress();
+                }
+                catch(Exception ex)
+                {
+                    Tracer.WarnFormat("Connection[{0}]: Exception while clearing messages: {1}",
+                                      this.ConnectionId, ex.Message);
+                    Tracer.Warn(ex.StackTrace);
+                }
+            }
+
+            // Invoke a local copy so an unsubscribe on another thread cannot
+            // null the event between the check and the call.
+            ConnectionInterruptedListener listener = this.ConnectionInterruptedListener;
+            if(listener != null && !this.closing.Value)
+            {
+                try
+                {
+                    listener();
+                }
+                catch(Exception ex)
+                {
+                    // A faulty listener must not disturb the transport thread.
+                    Tracer.DebugFormat("Connection[{0}]: ConnectionInterruptedListener threw: {1}",
+                                       this.ConnectionId, ex.Message);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Transport callback raised when communication with the broker has been
+        /// restored. Raises ConnectionResumedListener unless the connection is
+        /// closing; exceptions thrown by the listener are ignored.
+        /// </summary>
+        protected void OnTransportResumed(ITransport sender)
+        {
+            Tracer.Debug("Transport has resumed normal operation.");
+
+            if(this.ConnectionResumedListener == null || this.closing.Value)
+            {
+                return;
+            }
+
+            try
+            {
+                this.ConnectionResumedListener();
+            }
+            catch
+            {
+            }
+        }
+
+        /// <summary>
+        /// Forwards an exception raised by a session to the ExceptionListener
+        /// subscribers. If a subscriber throws, the originating session is closed.
+        /// </summary>
+        /// <param name="sender">The session in which the exception occurred.</param>
+        /// <param name="exception">The exception to report.</param>
+        internal void OnSessionException(Session sender, Exception exception)
+        {
+            // Invoke a local copy: if the event were cleared between the null
+            // check and the call, the resulting NullReferenceException would be
+            // caught below and close the session for no reason.
+            ExceptionListener listener = this.ExceptionListener;
+            if(listener == null)
+            {
+                return;
+            }
+
+            try
+            {
+                listener(exception);
+            }
+            catch
+            {
+                sender.Close();
+            }
+        }
+
+        /// <summary>
+        /// Builds a new local transaction id made of this connection's id and the
+        /// next value of the connection's transaction counter.
+        /// </summary>
+        public LocalTransactionId CreateLocalTransactionId()
+        {
+            LocalTransactionId transactionId = new LocalTransactionId();
+
+            transactionId.ConnectionId = this.ConnectionId;
+            transactionId.Value = Interlocked.Increment(ref this.localTransactionCounter);
+
+            return transactionId;
+        }
+
+        /// <summary>
+        /// A new session id made of this connection's id and the next value of
+        /// the session counter. Every read advances the counter.
+        /// </summary>
+        protected SessionId NextSessionId
+        {
+            get
+            {
+                int sequence = Interlocked.Increment(ref this.sessionCounter);
+                return new SessionId(this.info.ConnectionId, sequence);
+            }
+        }
+
+        /// <summary>
+        /// Creates a temporary topic or queue named after this connection's id
+        /// plus a running counter, registers it with the broker synchronously and
+        /// records it in the connection's table of temporary destinations.
+        /// </summary>
+        /// <param name="topic">True to create a temporary topic, false for a temporary queue.</param>
+        /// <returns>The destination as recorded by this connection.</returns>
+        public ActiveMQTempDestination CreateTemporaryDestination(bool topic)
+        {
+            string physicalName =
+                info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
+
+            ActiveMQTempDestination created;
+            if(topic)
+            {
+                created = new ActiveMQTempTopic(physicalName);
+            }
+            else
+            {
+                created = new ActiveMQTempQueue(physicalName);
+            }
+
+            DestinationInfo addCommand = new DestinationInfo();
+            addCommand.ConnectionId = ConnectionId;
+            addCommand.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
+            addCommand.Destination = created;
+
+            this.SyncRequest(addCommand);
+
+            created = this.AddTempDestination(created);
+            created.Connection = this;
+
+            return created;
+        }
+
+        /// <summary>
+        /// Intentionally empty; this overload currently performs no work.
+        /// </summary>
+        protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
+        {
+            // No operation.
+        }
+
+        /// <summary>
+        /// Deletes a temporary destination: verifies that no session of this
+        /// connection is still using it, removes it from the connection's table
+        /// of temporary destinations and asks the broker to remove it.
+        /// </summary>
+        /// <param name="destination">The temporary destination to delete.</param>
+        /// <exception cref="NMSException">A session is still using the destination.</exception>
+        public void DeleteTemporaryDestination(IDestination destination)
+        {
+            CheckClosedOrFailed();
+
+            ActiveMQTempDestination temp = destination as ActiveMQTempDestination;
+
+            // Check a snapshot taken under the list's lock so that sessions being
+            // added or removed on another thread cannot invalidate the enumeration.
+            IList sessionsCopy = null;
+            lock(this.sessions.SyncRoot)
+            {
+                sessionsCopy = new ArrayList(this.sessions);
+            }
+
+            foreach(Session session in sessionsCopy)
+            {
+                if(session.IsInUse(temp))
+                {
+                    throw new NMSException("A consumer is consuming from the temporary destination");
+                }
+            }
+
+            this.tempDests.Remove(temp);
+            this.DeleteDestination(destination);
+        }
+
+        /// <summary>
+        /// Sends a one-way request asking the broker to remove the given destination.
+        /// </summary>
+        /// <param name="destination">The destination to remove; must be an ActiveMQDestination.</param>
+        public void DeleteDestination(IDestination destination)
+        {
+            DestinationInfo removeCommand = new DestinationInfo();
+
+            removeCommand.ConnectionId = this.ConnectionId;
+            removeCommand.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
+            removeCommand.Destination = (ActiveMQDestination)destination;
+
+            Oneway(removeCommand);
+        }
+
+        /// <summary>
+        /// Called before a message is dispatched. When interruption processing
+        /// is still outstanding, waits up to ten seconds for it to finish and
+        /// then signals completion.
+        /// </summary>
+        private void WaitForTransportInterruptionProcessingToComplete()
+        {
+            CountDownLatch latch = this.transportInterruptionProcessingComplete;
+            if(latch == null)
+            {
+                return;
+            }
+
+            if(!closed.Value && latch.Remaining > 0)
+            {
+                Tracer.WarnFormat("Connection[{0}]: Dispatch paused, waiting for outstanding dispatch interruption " +
+                                  "processing (" + latch.Remaining + ") to complete..", this.ConnectionId);
+                latch.await(TimeSpan.FromSeconds(10));
+            }
+
+            SignalInterruptionProcessingComplete();
+        }
+
+        /// <summary>
+        /// Reports that one participant has finished its interruption
+        /// processing: counts the latch down and then signals completion,
+        /// ignoring any exception raised while signalling.
+        /// </summary>
+        internal void TransportInterruptionProcessingComplete()
+        {
+            CountDownLatch latch = this.transportInterruptionProcessingComplete;
+            if(latch == null)
+            {
+                return;
+            }
+
+            latch.countDown();
+
+            try
+            {
+                SignalInterruptionProcessingComplete();
+            }
+            catch
+            {
+            }
+        }
+
+        /// <summary>
+        /// When the interruption latch has reached zero, clears it and tells the
+        /// failover transport (if the transport is one) that interruption
+        /// processing for this connection is complete. Does nothing when there is
+        /// no latch or it has not reached zero yet.
+        /// </summary>
+        private void SignalInterruptionProcessingComplete()
+        {
+            // The field is re-read here and may already have been cleared by
+            // another thread after the caller performed its own null check.
+            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+            if(cdl == null || cdl.Remaining != 0)
+            {
+                return;
+            }
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.DebugFormat("Connection[{0}]: transportInterruptionProcessingComplete.", this.info.ConnectionId);
+            }
+
+            this.transportInterruptionProcessingComplete = null;
+
+            // Close() clears the transport field; there is nobody left to notify then.
+            ITransport currentTransport = this.transport;
+            if(currentTransport == null)
+            {
+                return;
+            }
+
+            FailoverTransport failoverTransport = currentTransport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+            if(failoverTransport != null)
+            {
+                failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.DebugFormat("Connection[{0}]: notified failover transport ({1}) of interruption completion.",
+                                       this.ConnectionId, failoverTransport);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tells the failover transport's state tracker that this connection has
+        /// been interrupted. Does nothing when the transport is not a failover
+        /// transport.
+        /// </summary>
+        private void SignalInterruptionProcessingNeeded()
+        {
+            FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+            if(failover == null)
+            {
+                return;
+            }
+
+            failover.StateTracker.TransportInterrupted(this.info.ConnectionId);
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.DebugFormat("Connection[{0}]: notified failover transport (" + failover +
+                                   ") of pending interruption processing.", this.ConnectionId);
+            }
+        }
+
+        /// <summary>
+        /// Records a temporary destination unless an equal one is already known.
+        /// </summary>
+        /// <param name="dest">The destination to record.</param>
+        /// <returns>
+        /// The instance held in the table: the previously recorded one when an
+        /// equal key exists, otherwise <paramref name="dest"/> itself.
+        /// </returns>
+        internal ActiveMQTempDestination AddTempDestination(ActiveMQTempDestination dest)
+        {
+            // .NET lacks a putIfAbsent operation for Maps, so check and insert
+            // under the table's lock.
+            lock(tempDests.SyncRoot)
+            {
+                if(this.tempDests.Contains(dest))
+                {
+                    return this.tempDests[dest] as ActiveMQTempDestination;
+                }
+
+                this.tempDests.Add(dest, dest);
+                return dest;
+            }
+        }
+
+        /// <summary>
+        /// Removes a temporary destination from the connection's table.
+        /// </summary>
+        internal void RemoveTempDestination(ActiveMQTempDestination dest)
+        {
+            tempDests.Remove(dest);
+        }
+
+        /// <summary>
+        /// Tells whether a temporary destination is considered active. Without an
+        /// advisory consumer every destination is reported as active; otherwise
+        /// only those present in the connection's table are.
+        /// </summary>
+        internal bool IsTempDestinationActive(ActiveMQTempDestination dest)
+        {
+            return this.advisoryConsumer == null || this.tempDests.Contains(dest);
+        }
+
+        /// <summary>
+        /// Throws when the connection can no longer be used.
+        /// </summary>
+        /// <exception cref="ConnectionClosedException">The connection is closed.</exception>
+        /// <exception cref="ConnectionFailedException">The transport has failed.</exception>
+        protected void CheckClosedOrFailed()
+        {
+            CheckClosed();
+            if(transportFailed.Value)
+            {
+                // The recorded error can be null (nothing was supplied, or it has
+                // not been stored yet); fall back to a generic message then.
+                Exception cause = this.firstFailureError;
+                throw new ConnectionFailedException(
+                    cause != null ? cause.Message : "The connection's transport has failed.");
+            }
+        }
+
+        /// <summary>
+        /// Throws a ConnectionClosedException when the connection has been closed.
+        /// </summary>
+        protected void CheckClosed()
+        {
+            if(!this.closed.Value)
+            {
+                return;
+            }
+
+            throw new ConnectionClosedException();
+        }
+ }
}