You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/09 23:10:13 UTC
svn commit: r962695 - in /activemq/activemq-dotnet:
Apache.NMS.ActiveMQ/trunk/src/main/csharp/
Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/
Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/
Apache.NMS.ActiveMQ/trunk/src/main/csharp/...
Author: tabish
Date: Fri Jul 9 21:10:12 2010
New Revision: 962695
URL: http://svn.apache.org/viewvc?rev=962695&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQNET-259
fix for: https://issues.apache.org/activemq/browse/AMQNET-260
Fixes both NMS.ActiveMQ and NMS.Stomp
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Fri Jul 9 21:10:12 2010
@@ -73,7 +73,6 @@ namespace Apache.NMS.ActiveMQ
public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
this.brokerUri = connectionUri;
- this.requestTimeout = transport.RequestTimeout;
this.clientIdGenerator = clientIdGenerator;
this.transport = transport;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Fri Jul 9 21:10:12 2010
@@ -49,6 +49,7 @@ namespace Apache.NMS.ActiveMQ
private bool alwaysSyncSend;
private bool sendAcksAsync=true;
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+ private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -240,6 +241,12 @@ namespace Apache.NMS.ActiveMQ
get { return this.dispatchAsync; }
set { this.dispatchAsync = value; }
}
+
+ public int RequestTimeout
+ {
+ get { return (int)this.requestTimeout.TotalMilliseconds; }
+ set { this.requestTimeout = TimeSpan.FromMilliseconds(value); }
+ }
public string AckMode
{
@@ -330,6 +337,7 @@ namespace Apache.NMS.ActiveMQ
connection.SendAcksAsync = this.SendAcksAsync;
connection.AcknowledgementMode = this.acknowledgementMode;
connection.UseCompression = this.useCompression;
+ connection.RequestTimeout = this.requestTimeout;
connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Fri Jul 9 21:10:12 2010
@@ -73,7 +73,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
private List<BackupTransport> backups = new List<BackupTransport>();
private int backupPoolSize = 1;
private bool trackMessages = false;
- private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
private int maxCacheSize = 256;
private volatile Exception failure;
private readonly object mutex = new object();
@@ -187,12 +186,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
set { this.timeout = value; }
}
- public TimeSpan RequestTimeout
- {
- get { return requestTimeout; }
- set { requestTimeout = value; }
- }
-
public int InitialReconnectDelay
{
get { return initialReconnectDelay; }
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs Fri Jul 9 21:10:12 2010
@@ -27,7 +27,7 @@ namespace Apache.NMS.ActiveMQ.Transport
/// </summary>
public class FutureResponse
{
- private static TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
+ private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
public TimeSpan ResponseTimeout
{
get { return maxWait; }
@@ -37,39 +37,26 @@ namespace Apache.NMS.ActiveMQ.Transport
private readonly CountDownLatch latch = new CountDownLatch(1);
private Response response;
- public WaitHandle AsyncWaitHandle
- {
- get { return latch.AsyncWaitHandle; }
- }
-
public Response Response
{
// Blocks the caller until a value has been set
get
{
- bool waitForResponse = false;
-
lock(latch)
{
- if(null == response)
+ if(null != response)
{
- waitForResponse = true;
+ return response;
}
}
- if(waitForResponse)
+ try
{
- try
- {
- if(!latch.await(maxWait))
- {
- // TODO: Throw timeout exception?
- }
- }
- catch (Exception e)
- {
- Tracer.Error("Caught while waiting on monitor: " + e);
- }
+ latch.await(maxWait);
+ }
+ catch (Exception e)
+ {
+ Tracer.Error("Caught while waiting on monitor: " + e);
}
lock(latch)
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Fri Jul 9 21:10:12 2010
@@ -97,16 +97,6 @@ namespace Apache.NMS.ActiveMQ.Transport
/// </returns>
Object Narrow(Type type);
- /// <value>
- /// The time that the Transport waits before considering a request to have
- /// failed and throwing an exception.
- /// </value>
- TimeSpan RequestTimeout
- {
- get;
- set;
- }
-
CommandHandler Command
{
get;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs Fri Jul 9 21:10:12 2010
@@ -52,7 +52,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
private ResumedHandler resumedHandler;
private bool disposed = false;
private bool started = false;
- private TimeSpan requestTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
private TaskRunner asyncResponseTask;
private Queue<Command> receiveQueue = new Queue<Command>();
private IResponseBuilder responseBuilder = new OpenWireResponseBuilder();
@@ -263,12 +262,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
}
#region Property Accessors
-
- public TimeSpan RequestTimeout
- {
- get{ return requestTimeout; }
- set{ this.requestTimeout = value; }
- }
public CommandHandler Command
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs Fri Jul 9 21:10:12 2010
@@ -32,13 +32,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
#region Properties
- private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
- public int RequestTimeout
- {
- get { return (int) requestTimeout.TotalMilliseconds; }
- set { requestTimeout = TimeSpan.FromMilliseconds(value); }
- }
-
private bool useLogging = false;
public bool UseLogging
{
@@ -89,7 +82,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
- transport.RequestTimeout = this.requestTimeout;
return transport;
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Fri Jul 9 21:10:12 2010
@@ -29,6 +29,7 @@ namespace Apache.NMS.ActiveMQ.Transport
{
private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
private int nextCommandId;
+ private Exception error;
public ResponseCorrelator(ITransport next) : base(next)
{
@@ -36,20 +37,8 @@ namespace Apache.NMS.ActiveMQ.Transport
protected override void OnException(ITransport sender, Exception command)
{
+ Dispose(command);
base.OnException(sender, command);
-
- foreach(DictionaryEntry entry in requestMap)
- {
- FutureResponse value = (FutureResponse) entry.Value;
- ExceptionResponse response = new ExceptionResponse();
- BrokerError error = new BrokerError();
-
- error.Message = command.Message;
- response.Exception = error;
- value.Response = response;
- }
-
- requestMap.Clear();
}
internal int GetNextCommandId()
@@ -59,11 +48,9 @@ namespace Apache.NMS.ActiveMQ.Transport
public override void Oneway(Command command)
{
- if(0 == command.CommandId)
- {
- command.CommandId = GetNextCommandId();
- }
-
+ command.CommandId = GetNextCommandId();
+ command.ResponseRequired = false;
+
next.Oneway(command);
}
@@ -74,9 +61,29 @@ namespace Apache.NMS.ActiveMQ.Transport
command.CommandId = commandId;
command.ResponseRequired = true;
FutureResponse future = new FutureResponse();
- requestMap[commandId] = future;
+ Exception priorError = null;
+ lock(requestMap.SyncRoot)
+ {
+ priorError = this.error;
+ if(priorError == null)
+ {
+ requestMap[commandId] = future;
+ }
+ }
+
+ if(priorError != null)
+ {
+ BrokerError brError = new BrokerError();
+ brError.Message = priorError.Message;
+ ExceptionResponse response = new ExceptionResponse();
+ response.Exception = brError;
+ future.Response = response;
+ throw priorError;
+ }
+
next.Oneway(command);
- return future;
+
+ return future;
}
public override Response Request(Command command, TimeSpan timeout)
@@ -87,10 +94,10 @@ namespace Apache.NMS.ActiveMQ.Transport
if(response != null && response is ExceptionResponse)
{
- ExceptionResponse er = (ExceptionResponse) response;
+ ExceptionResponse er = response as ExceptionResponse;
BrokerError brokerError = er.Exception;
- if (brokerError == null)
+ if(brokerError == null)
{
throw new BrokerException();
}
@@ -118,7 +125,7 @@ namespace Apache.NMS.ActiveMQ.Transport
if(response is ExceptionResponse)
{
- ExceptionResponse er = (ExceptionResponse) response;
+ ExceptionResponse er = response as ExceptionResponse;
BrokerError brokerError = er.Exception;
BrokerException exception = new BrokerException(brokerError);
this.exceptionHandler(this, exception);
@@ -132,16 +139,45 @@ namespace Apache.NMS.ActiveMQ.Transport
}
}
}
- else if(command is ShutdownInfo)
- {
- // lets shutdown
- this.commandHandler(sender, command);
- }
else
{
this.commandHandler(sender, command);
}
}
+
+ public override void Stop()
+ {
+ this.Dispose(new IOException("Stopped"));
+ base.Stop();
+ }
+
+ private void Dispose(Exception error)
+ {
+ ArrayList requests = null;
+
+ lock(requestMap.SyncRoot)
+ {
+ if(this.error == null)
+ {
+ this.error = error;
+ requests = new ArrayList(requestMap.Values);
+ requestMap.Clear();
+ }
+ }
+
+ if(requests != null)
+ {
+ foreach(FutureResponse future in requests)
+ {
+ BrokerError brError = new BrokerError();
+ brError.Message = error.Message;
+ ExceptionResponse response = new ExceptionResponse();
+ response.Exception = brError;
+ future.Response = response;
+ }
+ }
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Fri Jul 9 21:10:12 2010
@@ -39,7 +39,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
private bool disposed = false;
private Atomic<bool> closed = new Atomic<bool>(false);
private volatile bool seenShutdown;
- private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
private Uri connectedUri;
private CommandHandler commandHandler;
@@ -139,15 +138,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
}
- /// <summary>
- /// Property RequestTimeout
- /// </summary>
- public TimeSpan RequestTimeout
- {
- get { return this.maxWait; }
- set { this.maxWait = value; }
- }
-
public bool TcpNoDelayEnabled
{
#if !NETCF
@@ -228,29 +218,14 @@ namespace Apache.NMS.ActiveMQ.Transport.
if(null != readThread)
{
- if(Thread.CurrentThread != readThread
-#if !NETCF
- && readThread.IsAlive
-#endif
-)
+ if(Thread.CurrentThread != readThread && readThread.IsAlive)
{
- TimeSpan waitTime;
-
- if(maxWait < MAX_THREAD_WAIT)
- {
- waitTime = maxWait;
- }
- else
- {
- waitTime = MAX_THREAD_WAIT;
- }
-
- if(!readThread.Join((int) waitTime.TotalMilliseconds))
+ if(!readThread.Join((int) MAX_THREAD_WAIT.TotalMilliseconds))
{
readThread.Abort();
}
}
-
+
readThread = null;
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Fri Jul 9 21:10:12 2010
@@ -92,13 +92,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
set { sendTimeout = value; }
}
- private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
- public int RequestTimeout
- {
- get { return (int) requestTimeout.TotalMilliseconds; }
- set { requestTimeout = TimeSpan.FromMilliseconds(value); }
- }
-
#endregion
#region ITransportFactory Members
@@ -163,7 +156,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
}
transport = new WireFormatNegotiator(transport, wireformat);
- transport.RequestTimeout = this.requestTimeout;
if(setTransport != null)
{
setTransport(transport, location);
@@ -178,7 +170,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
- transport.RequestTimeout = this.requestTimeout;
return transport;
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Fri Jul 9 21:10:12 2010
@@ -92,22 +92,13 @@ namespace Apache.NMS.ActiveMQ.Transport
}
/// <summary>
- /// Property RequestTimeout
- /// </summary>
- public TimeSpan RequestTimeout
- {
- get { return this.next.RequestTimeout; }
- set { this.next.RequestTimeout = value; }
- }
-
- /// <summary>
/// Method Request
/// </summary>
/// <returns>A Response</returns>
/// <param name="command">A Command</param>
public virtual Response Request(Command command)
- {
- return Request(command, RequestTimeout);
+ {
+ return Request(command, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
}
/// <summary>
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Fri Jul 9 21:10:12 2010
@@ -48,7 +48,7 @@ namespace Apache.NMS.Stomp
private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
private bool userSpecifiedClientID;
- private TimeSpan requestTimeout;
+ private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
private readonly object myLock = new object();
@@ -67,7 +67,6 @@ namespace Apache.NMS.Stomp
public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
this.brokerUri = connectionUri;
- this.requestTimeout = transport.RequestTimeout;
this.clientIdGenerator = clientIdGenerator;
this.transport = transport;
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs Fri Jul 9 21:10:12 2010
@@ -46,6 +46,7 @@ namespace Apache.NMS.Stomp
private bool alwaysSyncSend;
private bool sendAcksAsync=true;
private bool dispatchAsync=true;
+ private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
@@ -231,6 +232,12 @@ namespace Apache.NMS.Stomp
set { this.dispatchAsync = value; }
}
+ public int RequestTimeout
+ {
+ get { return (int) this.requestTimeout.TotalMilliseconds; }
+ set { this.requestTimeout = TimeSpan.FromMilliseconds(value); }
+ }
+
public AcknowledgementMode AcknowledgementMode
{
get { return acknowledgementMode; }
@@ -301,6 +308,7 @@ namespace Apache.NMS.Stomp
connection.SendAcksAsync = this.SendAcksAsync;
connection.DispatchAsync = this.DispatchAsync;
connection.AcknowledgementMode = this.acknowledgementMode;
+ connection.RequestTimeout = this.requestTimeout;
connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Fri Jul 9 21:10:12 2010
@@ -66,7 +66,6 @@ namespace Apache.NMS.Stomp.Transport.Fai
private int reconnectDelay = 10;
private Exception connectionFailure;
private bool firstConnection = true;
- private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
private volatile Exception failure;
private readonly object mutex = new object();
@@ -156,12 +155,6 @@ namespace Apache.NMS.Stomp.Transport.Fai
set { this.timeout = value; }
}
- public TimeSpan RequestTimeout
- {
- get { return requestTimeout; }
- set { requestTimeout = value; }
- }
-
public int InitialReconnectDelay
{
get { return initialReconnectDelay; }
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs Fri Jul 9 21:10:12 2010
@@ -28,7 +28,7 @@ namespace Apache.NMS.Stomp.Transport
/// </summary>
public class FutureResponse
{
- private static TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
+ private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
public TimeSpan ResponseTimeout
{
get { return maxWait; }
@@ -48,30 +48,25 @@ namespace Apache.NMS.Stomp.Transport
// Blocks the caller until a value has been set
get
{
- bool waitForResponse = false;
-
lock(latch)
{
- if(null == response)
+ if(null != response)
{
- waitForResponse = true;
+ return response;
}
}
- if(waitForResponse)
+ try
{
- try
- {
- if(!latch.await(maxWait))
- {
- // TODO: Throw timeout exception?
- }
- }
- catch (Exception e)
+ if(!latch.await(maxWait))
{
- Tracer.Error("Caught while waiting on monitor: " + e);
+ // TODO: Throw timeout exception?
}
}
+ catch (Exception e)
+ {
+ Tracer.Error("Caught while waiting on monitor: " + e);
+ }
lock(latch)
{
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs Fri Jul 9 21:10:12 2010
@@ -96,16 +96,6 @@ namespace Apache.NMS.Stomp.Transport
/// A <see cref="System.Object"/>
/// </returns>
Object Narrow(Type type);
-
- /// <value>
- /// The time that the Transport waits before considering a request to have
- /// failed and throwing an exception.
- /// </value>
- TimeSpan RequestTimeout
- {
- get;
- set;
- }
CommandHandler Command
{
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Fri Jul 9 21:10:12 2010
@@ -28,121 +28,158 @@ namespace Apache.NMS.Stomp.Transport
/// <summary>
/// A Transport that correlates asynchronous send/receive messages into single request/response.
/// </summary>
- public class ResponseCorrelator : TransportFilter
- {
- private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
- private int nextCommandId;
-
- public ResponseCorrelator(ITransport next) : base(next)
- {
- }
-
- protected override void OnException(ITransport sender, Exception command)
- {
- base.OnException(sender, command);
-
- foreach(DictionaryEntry entry in requestMap)
- {
- FutureResponse value = (FutureResponse) entry.Value;
- ExceptionResponse response = new ExceptionResponse();
- BrokerError error = new BrokerError();
-
- error.Message = command.Message;
- response.Exception = error;
- value.Response = response;
- }
+ public class ResponseCorrelator : TransportFilter
+ {
+ private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+ private int nextCommandId;
+ private Exception error;
+
+ public ResponseCorrelator(ITransport next) : base(next)
+ {
+ }
+
+ protected override void OnException(ITransport sender, Exception command)
+ {
+ Dispose(command);
+ base.OnException(sender, command);
+ }
- requestMap.Clear();
- }
-
- internal int GetNextCommandId()
- {
+ internal int GetNextCommandId()
+ {
return Interlocked.Increment(ref nextCommandId);
- }
-
- public override void Oneway(Command command)
- {
- if(0 == command.CommandId)
- {
- command.CommandId = GetNextCommandId();
- }
-
- next.Oneway(command);
- }
-
- public override FutureResponse AsyncRequest(Command command)
- {
- int commandId = GetNextCommandId();
-
- command.CommandId = commandId;
- command.ResponseRequired = true;
- FutureResponse future = new FutureResponse();
- requestMap[commandId] = future;
- next.Oneway(command);
- return future;
- }
-
- public override Response Request(Command command, TimeSpan timeout)
- {
- FutureResponse future = AsyncRequest(command);
- future.ResponseTimeout = timeout;
- Response response = future.Response;
-
- if(response != null && response is ExceptionResponse)
- {
- ExceptionResponse er = (ExceptionResponse) response;
- BrokerError brokerError = er.Exception;
-
- if (brokerError == null)
- {
- throw new BrokerException();
- }
- else
- {
- throw new BrokerException(brokerError);
- }
- }
-
- return response;
- }
-
- protected override void OnCommand(ITransport sender, Command command)
- {
- if(command is Response)
- {
- Response response = (Response) command;
- int correlationId = response.CorrelationId;
- FutureResponse future = (FutureResponse) requestMap[correlationId];
-
- if(future != null)
- {
- requestMap.Remove(correlationId);
- future.Response = response;
-
- if(response is ExceptionResponse)
- {
- ExceptionResponse er = (ExceptionResponse) response;
- BrokerError brokerError = er.Exception;
- BrokerException exception = new BrokerException(brokerError);
- this.exceptionHandler(this, exception);
- }
- }
- else
- {
- Tracer.Error("Unknown response ID: " + response.CommandId + " for response: " + response);
- }
- }
- else if(command is ShutdownInfo)
- {
- // lets shutdown
- this.commandHandler(sender, command);
- }
- else
- {
- this.commandHandler(sender, command);
- }
- }
- }
-}
+ }
+ public override void Oneway(Command command)
+ {
+ command.CommandId = GetNextCommandId();
+ command.ResponseRequired = false;
+
+ next.Oneway(command);
+ }
+
+ public override FutureResponse AsyncRequest(Command command)
+ {
+ int commandId = GetNextCommandId();
+
+ command.CommandId = commandId;
+ command.ResponseRequired = true;
+ FutureResponse future = new FutureResponse();
+ Exception priorError = null;
+ lock(requestMap.SyncRoot)
+ {
+ priorError = this.error;
+ if(priorError == null)
+ {
+ requestMap[commandId] = future;
+ }
+ }
+
+ if(priorError != null)
+ {
+ BrokerError brError = new BrokerError();
+ brError.Message = priorError.Message;
+ ExceptionResponse response = new ExceptionResponse();
+ response.Exception = brError;
+ future.Response = response;
+ throw priorError;
+ }
+
+ next.Oneway(command);
+
+ return future;
+ }
+
+ public override Response Request(Command command, TimeSpan timeout)
+ {
+ FutureResponse future = AsyncRequest(command);
+ future.ResponseTimeout = timeout;
+ Response response = future.Response;
+
+ if(response != null && response is ExceptionResponse)
+ {
+ ExceptionResponse er = response as ExceptionResponse;
+ BrokerError brokerError = er.Exception;
+
+ if(brokerError == null)
+ {
+ throw new BrokerException();
+ }
+ else
+ {
+ throw new BrokerException(brokerError);
+ }
+ }
+
+ return response;
+ }
+
+ protected override void OnCommand(ITransport sender, Command command)
+ {
+ if(command is Response)
+ {
+ Response response = (Response) command;
+ int correlationId = response.CorrelationId;
+ FutureResponse future = (FutureResponse) requestMap[correlationId];
+
+ if(future != null)
+ {
+ requestMap.Remove(correlationId);
+ future.Response = response;
+
+ if(response is ExceptionResponse)
+ {
+ ExceptionResponse er = response as ExceptionResponse;
+ BrokerError brokerError = er.Exception;
+ BrokerException exception = new BrokerException(brokerError);
+ this.exceptionHandler(this, exception);
+ }
+ }
+ else
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Unknown response ID: " + response.CommandId + " for response: " + response);
+ }
+ }
+ }
+ else
+ {
+ this.commandHandler(sender, command);
+ }
+ }
+
+ public override void Stop()
+ {
+ this.Dispose(new IOException("Stopped"));
+ base.Stop();
+ }
+
+ private void Dispose(Exception error)
+ {
+ ArrayList requests = null;
+
+ lock(requestMap.SyncRoot)
+ {
+ if(this.error == null)
+ {
+ this.error = error;
+ requests = new ArrayList(requestMap.Values);
+ requestMap.Clear();
+ }
+ }
+
+ if(requests != null)
+ {
+ foreach(FutureResponse future in requests)
+ {
+ BrokerError brError = new BrokerError();
+ brError.Message = error.Message;
+ ExceptionResponse response = new ExceptionResponse();
+ response.Exception = brError;
+ future.Response = response;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Fri Jul 9 21:10:12 2010
@@ -39,7 +39,6 @@ namespace Apache.NMS.Stomp.Transport.Tcp
private bool disposed = false;
private Atomic<bool> closed = new Atomic<bool>(false);
private volatile bool seenShutdown;
- private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
private Uri connectedUri;
private CommandHandler commandHandler;
@@ -139,15 +138,6 @@ namespace Apache.NMS.Stomp.Transport.Tcp
throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
}
- /// <summary>
- /// Property RequestTimeout
- /// </summary>
- public TimeSpan RequestTimeout
- {
- get { return this.maxWait; }
- set { this.maxWait = value; }
- }
-
public bool TcpNoDelayEnabled
{
#if !NETCF
@@ -234,18 +224,7 @@ namespace Apache.NMS.Stomp.Transport.Tcp
#endif
)
{
- TimeSpan waitTime;
-
- if(maxWait < MAX_THREAD_WAIT)
- {
- waitTime = maxWait;
- }
- else
- {
- waitTime = MAX_THREAD_WAIT;
- }
-
- if(!readThread.Join((int) waitTime.TotalMilliseconds))
+ if(!readThread.Join((int) MAX_THREAD_WAIT.TotalMilliseconds))
{
readThread.Abort();
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Fri Jul 9 21:10:12 2010
@@ -92,13 +92,6 @@ namespace Apache.NMS.Stomp.Transport.Tcp
set { sendTimeout = value; }
}
- private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
- public int RequestTimeout
- {
- get { return (int) requestTimeout.TotalMilliseconds; }
- set { requestTimeout = TimeSpan.FromMilliseconds(value); }
- }
-
#endregion
#region ITransportFactory Members
@@ -143,8 +136,6 @@ namespace Apache.NMS.Stomp.Transport.Tcp
transport = new InactivityMonitor(transport);
}
- transport.RequestTimeout = this.requestTimeout;
-
if(setTransport != null)
{
setTransport(transport, location);
@@ -159,7 +150,6 @@ namespace Apache.NMS.Stomp.Transport.Tcp
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
- transport.RequestTimeout = this.requestTimeout;
return transport;
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=962695&r1=962694&r2=962695&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs Fri Jul 9 21:10:12 2010
@@ -92,22 +92,13 @@ namespace Apache.NMS.Stomp.Transport
}
/// <summary>
- /// Property RequestTimeout
- /// </summary>
- public TimeSpan RequestTimeout
- {
- get { return this.next.RequestTimeout; }
- set { this.next.RequestTimeout = value; }
- }
-
- /// <summary>
/// Method Request
/// </summary>
/// <returns>A Response</returns>
/// <param name="command">A Command</param>
public virtual Response Request(Command command)
{
- return Request(command, RequestTimeout);
+ return Request(command, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
}
/// <summary>