You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/09 23:10:13 UTC

svn commit: r962695 - in /activemq/activemq-dotnet: Apache.NMS.ActiveMQ/trunk/src/main/csharp/ Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/ Apache.NMS.ActiveMQ/trunk/src/main/csharp/...

Author: tabish
Date: Fri Jul  9 21:10:12 2010
New Revision: 962695

URL: http://svn.apache.org/viewvc?rev=962695&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQNET-259
fix for: https://issues.apache.org/activemq/browse/AMQNET-260

Both fixes are applied to both the NMS.ActiveMQ and NMS.Stomp providers.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Fri Jul  9 21:10:12 2010
@@ -73,7 +73,6 @@ namespace Apache.NMS.ActiveMQ
         public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
         {
             this.brokerUri = connectionUri;
-            this.requestTimeout = transport.RequestTimeout;
             this.clientIdGenerator = clientIdGenerator;
 
             this.transport = transport;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Fri Jul  9 21:10:12 2010
@@ -49,6 +49,7 @@ namespace Apache.NMS.ActiveMQ
         private bool alwaysSyncSend;
         private bool sendAcksAsync=true;
         private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
 
         private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
         private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -240,6 +241,12 @@ namespace Apache.NMS.ActiveMQ
             get { return this.dispatchAsync; }
             set { this.dispatchAsync = value; }
         }
+		
+		public int RequestTimeout
+		{
+			get { return (int)this.requestTimeout.TotalMilliseconds; }
+			set { this.requestTimeout = TimeSpan.FromMilliseconds(value); }
+		}
 
         public string AckMode
         {
@@ -330,6 +337,7 @@ namespace Apache.NMS.ActiveMQ
             connection.SendAcksAsync = this.SendAcksAsync;
             connection.AcknowledgementMode = this.acknowledgementMode;
             connection.UseCompression = this.useCompression;
+			connection.RequestTimeout = this.requestTimeout;
             connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
             connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
             connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Fri Jul  9 21:10:12 2010
@@ -73,7 +73,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
         private List<BackupTransport> backups = new List<BackupTransport>();
         private int backupPoolSize = 1;
         private bool trackMessages = false;
-        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
         private int maxCacheSize = 256;
         private volatile Exception failure;
         private readonly object mutex = new object();
@@ -187,12 +186,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
             set { this.timeout = value; }
         }
 
-        public TimeSpan RequestTimeout
-        {
-            get { return requestTimeout; }
-            set { requestTimeout = value; }
-        }
-        
         public int InitialReconnectDelay
         {
             get { return initialReconnectDelay; }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs Fri Jul  9 21:10:12 2010
@@ -27,7 +27,7 @@ namespace Apache.NMS.ActiveMQ.Transport
 	/// </summary>
 	public class FutureResponse
 	{
-		private static TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
+		private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
 		public TimeSpan ResponseTimeout
 		{
 			get { return maxWait; }
@@ -37,39 +37,26 @@ namespace Apache.NMS.ActiveMQ.Transport
 		private readonly CountDownLatch latch = new CountDownLatch(1);
 		private Response response;
 
-		public WaitHandle AsyncWaitHandle
-		{
-			get { return latch.AsyncWaitHandle; }
-		}
-
 		public Response Response
 		{
 			// Blocks the caller until a value has been set
 			get
 			{
-				bool waitForResponse = false;
-
 				lock(latch)
 				{
-					if(null == response)
+					if(null != response)
 					{
-						waitForResponse = true;
+						return response;
 					}
 				}
 
-				if(waitForResponse)
+				try
 				{
-					try
-					{
-						if(!latch.await(maxWait))
-						{
-							// TODO: Throw timeout exception?
-						}
-					}
-					catch (Exception e)
-					{
-						Tracer.Error("Caught while waiting on monitor: " + e);
-					}
+					latch.await(maxWait);
+				}
+				catch (Exception e)
+				{
+					Tracer.Error("Caught while waiting on monitor: " + e);
 				}
 
 				lock(latch)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Fri Jul  9 21:10:12 2010
@@ -97,16 +97,6 @@ namespace Apache.NMS.ActiveMQ.Transport
         /// </returns>
         Object Narrow(Type type);            
         
-        /// <value>
-        /// The time that the Transport waits before considering a request to have
-        /// failed and throwing an exception.
-        /// </value>
-        TimeSpan RequestTimeout
-        {
-            get;
-            set;
-        }
-
 		CommandHandler Command
 		{
 			get;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs Fri Jul  9 21:10:12 2010
@@ -52,7 +52,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
         private ResumedHandler resumedHandler;
         private bool disposed = false;
         private bool started = false;
-        private TimeSpan requestTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
         private TaskRunner asyncResponseTask;
         private Queue<Command> receiveQueue = new Queue<Command>();
         private IResponseBuilder responseBuilder = new OpenWireResponseBuilder();
@@ -263,12 +262,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
         }
         
 		#region Property Accessors
-		
-        public TimeSpan RequestTimeout
-        {
-            get{ return requestTimeout; }
-            set{ this.requestTimeout = value; }
-        }
 
         public CommandHandler Command
         {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs Fri Jul  9 21:10:12 2010
@@ -32,13 +32,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
 		#region Properties
 
-		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
-		public int RequestTimeout
-		{
-			get { return (int) requestTimeout.TotalMilliseconds; }
-			set { requestTimeout = TimeSpan.FromMilliseconds(value); }
-		}
-
 		private bool useLogging = false;
 		public bool UseLogging
 		{
@@ -89,7 +82,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
 			transport = new MutexTransport(transport);
 			transport = new ResponseCorrelator(transport);
-			transport.RequestTimeout = this.requestTimeout;
 
 			return transport;
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Fri Jul  9 21:10:12 2010
@@ -29,6 +29,7 @@ namespace Apache.NMS.ActiveMQ.Transport
     {
         private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
         private int nextCommandId;
+		private Exception error;
 
         public ResponseCorrelator(ITransport next) : base(next)
         {
@@ -36,20 +37,8 @@ namespace Apache.NMS.ActiveMQ.Transport
 
         protected override void OnException(ITransport sender, Exception command)
         {
+			Dispose(command);
             base.OnException(sender, command);
-
-            foreach(DictionaryEntry entry in requestMap)
-            {
-                FutureResponse value = (FutureResponse) entry.Value;
-                ExceptionResponse response = new ExceptionResponse();
-                BrokerError error = new BrokerError();
-
-                error.Message = command.Message;
-                response.Exception = error;
-                value.Response = response;
-            }
-
-            requestMap.Clear();
         }
 
         internal int GetNextCommandId()
@@ -59,11 +48,9 @@ namespace Apache.NMS.ActiveMQ.Transport
 
         public override void Oneway(Command command)
         {
-            if(0 == command.CommandId)
-            {
-                command.CommandId = GetNextCommandId();
-            }
-
+            command.CommandId = GetNextCommandId();
+			command.ResponseRequired = false;
+			
             next.Oneway(command);
         }
 
@@ -74,9 +61,29 @@ namespace Apache.NMS.ActiveMQ.Transport
             command.CommandId = commandId;
             command.ResponseRequired = true;
             FutureResponse future = new FutureResponse();
-            requestMap[commandId] = future;
+	        Exception priorError = null;
+	        lock(requestMap.SyncRoot) 
+			{
+	            priorError = this.error;
+	            if(priorError == null) 
+				{
+		            requestMap[commandId] = future;
+	            }
+	        }
+	
+	        if(priorError != null) 
+			{
+				BrokerError brError = new BrokerError();
+				brError.Message = priorError.Message;
+				ExceptionResponse response = new ExceptionResponse();
+				response.Exception = brError;
+	            future.Response = response;
+	            throw priorError;
+	        }
+			
             next.Oneway(command);
-            return future;
+
+			return future;
         }
 
         public override Response Request(Command command, TimeSpan timeout)
@@ -87,10 +94,10 @@ namespace Apache.NMS.ActiveMQ.Transport
 
             if(response != null && response is ExceptionResponse)
             {
-                ExceptionResponse er = (ExceptionResponse) response;
+                ExceptionResponse er = response as ExceptionResponse;
                 BrokerError brokerError = er.Exception;
 
-                if (brokerError == null)
+                if(brokerError == null)
                 {
                     throw new BrokerException();
                 }
@@ -118,7 +125,7 @@ namespace Apache.NMS.ActiveMQ.Transport
 
                     if(response is ExceptionResponse)
                     {
-                        ExceptionResponse er = (ExceptionResponse) response;
+                        ExceptionResponse er = response as ExceptionResponse;
                         BrokerError brokerError = er.Exception;
                         BrokerException exception = new BrokerException(brokerError);
                         this.exceptionHandler(this, exception);
@@ -132,16 +139,45 @@ namespace Apache.NMS.ActiveMQ.Transport
                     }
                 }
             }
-            else if(command is ShutdownInfo)
-            {
-                // lets shutdown
-                this.commandHandler(sender, command);
-            }
             else
             {
                 this.commandHandler(sender, command);
             }
         }
+		
+		public override void Stop()
+		{
+			this.Dispose(new IOException("Stopped"));
+			base.Stop();
+		}
+		
+		private void Dispose(Exception error)
+		{
+			ArrayList requests = null;
+			
+	        lock(requestMap.SyncRoot) 
+			{
+	            if(this.error == null) 
+				{
+	                this.error = error;
+	                requests = new ArrayList(requestMap.Values);
+	                requestMap.Clear();
+	            }
+	        }
+			
+	        if(requests != null)
+			{
+				foreach(FutureResponse future in requests)
+				{
+					BrokerError brError = new BrokerError();
+					brError.Message = error.Message;
+					ExceptionResponse response = new ExceptionResponse();
+					response.Exception = brError;
+		            future.Response = response;
+				}
+	        }
+		}
+		
     }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Fri Jul  9 21:10:12 2010
@@ -39,7 +39,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
 		private bool disposed = false;
 		private Atomic<bool> closed = new Atomic<bool>(false);
 		private volatile bool seenShutdown;
-		private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
 		private Uri connectedUri;
 
 		private CommandHandler commandHandler;
@@ -139,15 +138,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
 			throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
 		}
 
-		/// <summary>
-		/// Property RequestTimeout
-		/// </summary>
-		public TimeSpan RequestTimeout
-		{
-			get { return this.maxWait; }
-			set { this.maxWait = value; }
-		}
-
 		public bool TcpNoDelayEnabled
 		{
 #if !NETCF
@@ -228,29 +218,14 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
 					if(null != readThread)
 					{
-						if(Thread.CurrentThread != readThread
-#if !NETCF
- && readThread.IsAlive
-#endif
-)
+						if(Thread.CurrentThread != readThread && readThread.IsAlive)
 						{
-							TimeSpan waitTime;
-
-							if(maxWait < MAX_THREAD_WAIT)
-							{
-								waitTime = maxWait;
-							}
-							else
-							{
-								waitTime = MAX_THREAD_WAIT;
-							}
-
-							if(!readThread.Join((int) waitTime.TotalMilliseconds))
+							if(!readThread.Join((int) MAX_THREAD_WAIT.TotalMilliseconds))
 							{
 								readThread.Abort();
 							}
 						}
-
+						
 						readThread = null;
 					}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Fri Jul  9 21:10:12 2010
@@ -92,13 +92,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
             set { sendTimeout = value; }
         }
 
-        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
-        public int RequestTimeout
-        {
-            get { return (int) requestTimeout.TotalMilliseconds; }
-            set { requestTimeout = TimeSpan.FromMilliseconds(value); }
-        }
-
         #endregion
 
         #region ITransportFactory Members
@@ -163,7 +156,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
             }
 
             transport = new WireFormatNegotiator(transport, wireformat);
-            transport.RequestTimeout = this.requestTimeout;
             if(setTransport != null)
             {
                 setTransport(transport, location);
@@ -178,7 +170,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
             transport = new MutexTransport(transport);
             transport = new ResponseCorrelator(transport);
-            transport.RequestTimeout = this.requestTimeout;
 
             return transport;
         }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Fri Jul  9 21:10:12 2010
@@ -92,22 +92,13 @@ namespace Apache.NMS.ActiveMQ.Transport
 		}
 
 		/// <summary>
-		/// Property RequestTimeout
-		/// </summary>
-		public TimeSpan RequestTimeout
-		{
-			get { return this.next.RequestTimeout; }
-			set { this.next.RequestTimeout = value; }
-		}
-
-		/// <summary>
 		/// Method Request
 		/// </summary>
 		/// <returns>A Response</returns>
 		/// <param name="command">A  Command</param>
 		public virtual Response Request(Command command)
-		{
-			return Request(command, RequestTimeout);
+		{			
+			return Request(command, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
 		}
 
 		/// <summary>

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Fri Jul  9 21:10:12 2010
@@ -48,7 +48,7 @@ namespace Apache.NMS.Stomp
 		private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
 
 		private bool userSpecifiedClientID;
-		private TimeSpan requestTimeout;
+		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
 		private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
 		private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
 		private readonly object myLock = new object();
@@ -67,7 +67,6 @@ namespace Apache.NMS.Stomp
 		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
 		{
 			this.brokerUri = connectionUri;
-			this.requestTimeout = transport.RequestTimeout;
 			this.clientIdGenerator = clientIdGenerator;
 
 			this.transport = transport;

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs Fri Jul  9 21:10:12 2010
@@ -46,6 +46,7 @@ namespace Apache.NMS.Stomp
         private bool alwaysSyncSend;
         private bool sendAcksAsync=true;
         private bool dispatchAsync=true;
+        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
         private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
 
         private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
@@ -231,6 +232,12 @@ namespace Apache.NMS.Stomp
             set { this.dispatchAsync = value; }
         }
 
+        public int RequestTimeout
+        {
+            get { return (int) this.requestTimeout.TotalMilliseconds; }
+            set { this.requestTimeout = TimeSpan.FromMilliseconds(value); }
+        }
+
         public AcknowledgementMode AcknowledgementMode
         {
             get { return acknowledgementMode; }
@@ -301,6 +308,7 @@ namespace Apache.NMS.Stomp
             connection.SendAcksAsync = this.SendAcksAsync;
             connection.DispatchAsync = this.DispatchAsync;
             connection.AcknowledgementMode = this.acknowledgementMode;
+            connection.RequestTimeout = this.requestTimeout;
             connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
             connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
         }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Fri Jul  9 21:10:12 2010
@@ -66,7 +66,6 @@ namespace Apache.NMS.Stomp.Transport.Fai
         private int reconnectDelay = 10;
         private Exception connectionFailure;
         private bool firstConnection = true;
-        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
         private volatile Exception failure;
         private readonly object mutex = new object();
 
@@ -156,12 +155,6 @@ namespace Apache.NMS.Stomp.Transport.Fai
             set { this.timeout = value; }
         }
 
-        public TimeSpan RequestTimeout
-        {
-            get { return requestTimeout; }
-            set { requestTimeout = value; }
-        }
-
         public int InitialReconnectDelay
         {
             get { return initialReconnectDelay; }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs Fri Jul  9 21:10:12 2010
@@ -28,7 +28,7 @@ namespace Apache.NMS.Stomp.Transport
 	/// </summary>
 	public class FutureResponse
 	{
-		private static TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
+		private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
 		public TimeSpan ResponseTimeout
 		{
 			get { return maxWait; }
@@ -48,30 +48,25 @@ namespace Apache.NMS.Stomp.Transport
 			// Blocks the caller until a value has been set
 			get
 			{
-				bool waitForResponse = false;
-
 				lock(latch)
 				{
-					if(null == response)
+					if(null != response)
 					{
-						waitForResponse = true;
+						return response;
 					}
 				}
 
-				if(waitForResponse)
+				try
 				{
-					try
-					{
-						if(!latch.await(maxWait))
-						{
-							// TODO: Throw timeout exception?
-						}
-					}
-					catch (Exception e)
+					if(!latch.await(maxWait))
 					{
-						Tracer.Error("Caught while waiting on monitor: " + e);
+						// TODO: Throw timeout exception?
 					}
 				}
+				catch (Exception e)
+				{
+					Tracer.Error("Caught while waiting on monitor: " + e);
+				}
 
 				lock(latch)
 				{

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs Fri Jul  9 21:10:12 2010
@@ -96,16 +96,6 @@ namespace Apache.NMS.Stomp.Transport
         /// A <see cref="System.Object"/>
         /// </returns>
         Object Narrow(Type type);            
-        
-        /// <value>
-        /// The time that the Transport waits before considering a request to have
-        /// failed and throwing an exception.
-        /// </value>
-        TimeSpan RequestTimeout
-        {
-            get;
-            set;
-        }
 
 		CommandHandler Command
 		{

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Fri Jul  9 21:10:12 2010
@@ -28,121 +28,158 @@ namespace Apache.NMS.Stomp.Transport
 	/// <summary>
 	/// A Transport that correlates asynchronous send/receive messages into single request/response.
 	/// </summary>
-	public class ResponseCorrelator : TransportFilter
-	{
-		private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
-		private int nextCommandId;
-
-		public ResponseCorrelator(ITransport next) : base(next)
-		{
-		}
-
-		protected override void OnException(ITransport sender, Exception command)
-		{
-			base.OnException(sender, command);
-
-			foreach(DictionaryEntry entry in requestMap)
-			{
-				FutureResponse value = (FutureResponse) entry.Value;
-				ExceptionResponse response = new ExceptionResponse();
-				BrokerError error = new BrokerError();
-
-				error.Message = command.Message;
-				response.Exception = error;
-				value.Response = response;
-			}
+    public class ResponseCorrelator : TransportFilter
+    {
+        private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+        private int nextCommandId;
+        private Exception error;
+
+        public ResponseCorrelator(ITransport next) : base(next)
+        {
+        }
+
+        protected override void OnException(ITransport sender, Exception command)
+        {
+            Dispose(command);
+            base.OnException(sender, command);
+        }
 
-			requestMap.Clear();
-		}
-
-		internal int GetNextCommandId()
-		{
+        internal int GetNextCommandId()
+        {
             return Interlocked.Increment(ref nextCommandId);
-		}
-
-		public override void Oneway(Command command)
-		{
-			if(0 == command.CommandId)
-			{
-				command.CommandId = GetNextCommandId();
-			}
-
-			next.Oneway(command);
-		}
-
-		public override FutureResponse AsyncRequest(Command command)
-		{
-			int commandId = GetNextCommandId();
-
-			command.CommandId = commandId;
-			command.ResponseRequired = true;
-			FutureResponse future = new FutureResponse();
-			requestMap[commandId] = future;
-			next.Oneway(command);
-			return future;
-		}
-
-		public override Response Request(Command command, TimeSpan timeout)
-		{
-			FutureResponse future = AsyncRequest(command);
-			future.ResponseTimeout = timeout;
-			Response response = future.Response;
-
-			if(response != null && response is ExceptionResponse)
-			{
-				ExceptionResponse er = (ExceptionResponse) response;
-				BrokerError brokerError = er.Exception;
-
-				if (brokerError == null)
-				{
-					throw new BrokerException();
-				}
-				else
-				{
-					throw new BrokerException(brokerError);
-				}
-			}
-
-			return response;
-		}
-
-		protected override void OnCommand(ITransport sender, Command command)
-		{
-			if(command is Response)
-			{
-				Response response = (Response) command;
-				int correlationId = response.CorrelationId;
-				FutureResponse future = (FutureResponse) requestMap[correlationId];
-				
-				if(future != null)
-				{
-					requestMap.Remove(correlationId);
-					future.Response = response;
-
-					if(response is ExceptionResponse)
-					{
-						ExceptionResponse er = (ExceptionResponse) response;
-						BrokerError brokerError = er.Exception;
-						BrokerException exception = new BrokerException(brokerError);
-						this.exceptionHandler(this, exception);
-					}
-				}
-				else
-				{
-					Tracer.Error("Unknown response ID: " + response.CommandId + " for response: " + response);
-				}
-			}
-			else if(command is ShutdownInfo)
-			{
-				// lets shutdown
-				this.commandHandler(sender, command);
-			}
-			else
-			{
-				this.commandHandler(sender, command);
-			}
-		}
-	}
-}
+        }
 
+        public override void Oneway(Command command)
+        {
+            command.CommandId = GetNextCommandId();
+            command.ResponseRequired = false;
+            
+            next.Oneway(command);
+        }
+
+        public override FutureResponse AsyncRequest(Command command)
+        {
+            int commandId = GetNextCommandId();
+
+            command.CommandId = commandId;
+            command.ResponseRequired = true;
+            FutureResponse future = new FutureResponse();
+            Exception priorError = null;
+            lock(requestMap.SyncRoot) 
+            {
+                priorError = this.error;
+                if(priorError == null) 
+                {
+                    requestMap[commandId] = future;
+                }
+            }
+    
+            if(priorError != null) 
+            {
+                BrokerError brError = new BrokerError();
+                brError.Message = priorError.Message;
+                ExceptionResponse response = new ExceptionResponse();
+                response.Exception = brError;
+                future.Response = response;
+                throw priorError;
+            }
+            
+            next.Oneway(command);
+
+            return future;
+        }
+
+        public override Response Request(Command command, TimeSpan timeout)
+        {
+            FutureResponse future = AsyncRequest(command);
+            future.ResponseTimeout = timeout;
+            Response response = future.Response;
+
+            if(response != null && response is ExceptionResponse)
+            {
+                ExceptionResponse er = response as ExceptionResponse;
+                BrokerError brokerError = er.Exception;
+
+                if(brokerError == null)
+                {
+                    throw new BrokerException();
+                }
+                else
+                {
+                    throw new BrokerException(brokerError);
+                }
+            }
+
+            return response;
+        }
+
+        protected override void OnCommand(ITransport sender, Command command)
+        {
+            if(command is Response)
+            {
+                Response response = (Response) command;
+                int correlationId = response.CorrelationId;
+                FutureResponse future = (FutureResponse) requestMap[correlationId];
+
+                if(future != null)
+                {
+                    requestMap.Remove(correlationId);
+                    future.Response = response;
+
+                    if(response is ExceptionResponse)
+                    {
+                        ExceptionResponse er = response as ExceptionResponse;
+                        BrokerError brokerError = er.Exception;
+                        BrokerException exception = new BrokerException(brokerError);
+                        this.exceptionHandler(this, exception);
+                    }
+                }
+                else
+                {
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("Unknown response ID: " + response.CommandId + " for response: " + response);
+                    }
+                }
+            }
+            else
+            {
+                this.commandHandler(sender, command);
+            }
+        }
+        
+        public override void Stop()
+        {
+            this.Dispose(new IOException("Stopped"));
+            base.Stop();
+        }
+        
+        private void Dispose(Exception error)
+        {
+            ArrayList requests = null;
+            
+            lock(requestMap.SyncRoot) 
+            {
+                if(this.error == null) 
+                {
+                    this.error = error;
+                    requests = new ArrayList(requestMap.Values);
+                    requestMap.Clear();
+                }
+            }
+            
+            if(requests != null)
+            {
+                foreach(FutureResponse future in requests)
+                {
+                    BrokerError brError = new BrokerError();
+                    brError.Message = error.Message;
+                    ExceptionResponse response = new ExceptionResponse();
+                    response.Exception = brError;
+                    future.Response = response;
+                }
+            }
+        }
 
+    }
+}
\ No newline at end of file

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Fri Jul  9 21:10:12 2010
@@ -39,7 +39,6 @@ namespace Apache.NMS.Stomp.Transport.Tcp
         private bool disposed = false;
         private Atomic<bool> closed = new Atomic<bool>(false);
         private volatile bool seenShutdown;
-        private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
         private Uri connectedUri;
 
         private CommandHandler commandHandler;
@@ -139,15 +138,6 @@ namespace Apache.NMS.Stomp.Transport.Tcp
             throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
         }
 
-        /// <summary>
-        /// Property RequestTimeout
-        /// </summary>
-        public TimeSpan RequestTimeout
-        {
-            get { return this.maxWait; }
-            set { this.maxWait = value; }
-        }
-
         public bool TcpNoDelayEnabled
         {
 #if !NETCF
@@ -234,18 +224,7 @@ namespace Apache.NMS.Stomp.Transport.Tcp
 #endif
 )
                         {
-                            TimeSpan waitTime;
-
-                            if(maxWait < MAX_THREAD_WAIT)
-                            {
-                                waitTime = maxWait;
-                            }
-                            else
-                            {
-                                waitTime = MAX_THREAD_WAIT;
-                            }
-
-                            if(!readThread.Join((int) waitTime.TotalMilliseconds))
+                            if(!readThread.Join((int) MAX_THREAD_WAIT.TotalMilliseconds))
                             {
                                 readThread.Abort();
                             }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Fri Jul  9 21:10:12 2010
@@ -92,13 +92,6 @@ namespace Apache.NMS.Stomp.Transport.Tcp
 			set { sendTimeout = value; }
 		}
 
-		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
-		public int RequestTimeout
-		{
-			get { return (int) requestTimeout.TotalMilliseconds; }
-			set { requestTimeout = TimeSpan.FromMilliseconds(value); }
-		}
-
 		#endregion
 
 		#region ITransportFactory Members
@@ -143,8 +136,6 @@ namespace Apache.NMS.Stomp.Transport.Tcp
                transport = new InactivityMonitor(transport);
             }
             
-			transport.RequestTimeout = this.requestTimeout;
-
 			if(setTransport != null)
 			{
 				setTransport(transport, location);
@@ -159,7 +150,6 @@ namespace Apache.NMS.Stomp.Transport.Tcp
 
 			transport = new MutexTransport(transport);
 			transport = new ResponseCorrelator(transport);
-			transport.RequestTimeout = this.requestTimeout;
 
 			return transport;
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs Fri Jul  9 21:10:12 2010
@@ -92,22 +92,13 @@ namespace Apache.NMS.Stomp.Transport
 		}
 
 		/// <summary>
-		/// Property RequestTimeout
-		/// </summary>
-		public TimeSpan RequestTimeout
-		{
-			get { return this.next.RequestTimeout; }
-			set { this.next.RequestTimeout = value; }
-		}
-
-		/// <summary>
 		/// Method Request
 		/// </summary>
 		/// <returns>A Response</returns>
 		/// <param name="command">A  Command</param>
 		public virtual Response Request(Command command)
 		{
-			return Request(command, RequestTimeout);
+            return Request(command, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
 		}
 
 		/// <summary>