You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/04/08 19:06:26 UTC
svn commit: r646000 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport:
FutureResponse.cs Tcp/TcpTransport.cs
Author: jgomes
Date: Tue Apr 8 10:06:05 2008
New Revision: 646000
URL: http://svn.apache.org/viewvc?rev=646000&view=rev
Log:
[AMQNET-81] FutureResponse transport correlator does not handle transport timeout correctly.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs?rev=646000&r1=645999&r2=646000&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs Tue Apr 8 10:06:05 2008
@@ -22,57 +22,74 @@
namespace Apache.NMS.ActiveMQ.Transport
{
-
+
/// <summary>
/// Handles asynchronous responses
/// </summary>
- public class FutureResponse
- {
-
- private static int maxWait = -1;
- public int Timeout
- {
- get { return maxWait; }
- set { maxWait = value; }
- }
-
- private readonly CountDownLatch latch = new CountDownLatch(1);
- private Response response;
-
- public WaitHandle AsyncWaitHandle
- {
- get { return latch.AsyncWaitHandle; }
- }
-
- public Response Response
- {
- // Blocks the caller until a value has been set
- get {
- while (response == null)
- {
- try
+ public class FutureResponse
+ {
+
+ private static int maxWait = -1;
+ public int Timeout
+ {
+ get { return maxWait; }
+ set { maxWait = value; }
+ }
+
+ private readonly CountDownLatch latch = new CountDownLatch(1);
+ private Response response;
+
+ public WaitHandle AsyncWaitHandle
+ {
+ get { return latch.AsyncWaitHandle; }
+ }
+
+ public Response Response
+ {
+ // Blocks the caller until a value has been set
+ get
+ {
+ bool waitForResponse = false;
+
+ lock(latch)
+ {
+ if(null == response)
{
- latch.await(maxWait);
- }
- catch (Exception e)
+ waitForResponse = true;
+ }
+ }
+
+ if(waitForResponse)
+ {
+ try
+ {
+ if(!latch.await(maxWait))
+ {
+ // TODO: Throw timeout exception?
+ }
+ }
+ catch (Exception e)
{
- Tracer.Error("Caught while waiting on monitor: " + e);
- }
- }
- lock (latch)
- {
- return response;
- }
- }
-
- set {
- lock (latch)
- {
- response = value;
- }
- latch.countDown();
- }
- }
- }
+ Tracer.Error("Caught while waiting on monitor: " + e);
+ }
+ }
+
+ lock(latch)
+ {
+ return response;
+ }
+ }
+
+ set
+ {
+ lock(latch)
+ {
+ response = value;
+ }
+
+ latch.countDown();
+ }
+ }
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=646000&r1=645999&r2=646000&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Tue Apr 8 10:06:05 2008
@@ -34,7 +34,6 @@
private readonly Socket socket;
private IWireFormat wireformat;
private BinaryReader socketReader;
- private readonly object socketReaderLock = new object();
private BinaryWriter socketWriter;
private readonly object socketWriterLock = new object();
private Thread readThread;
@@ -93,6 +92,11 @@
{
try
{
+ if(closed.Value)
+ {
+ throw new Exception("Error writing to broker. Transport connection is closed.");
+ }
+
Wireformat.Marshal(command, socketWriter);
socketWriter.Flush();
}
@@ -139,25 +143,46 @@
{
}
- lock(socketWriterLock)
+ try
{
- if(null != socketWriter)
+ lock(socketWriterLock)
{
- socketWriter.Close();
- socketWriter = null;
+ if(null != socketWriter)
+ {
+ socketWriter.Close();
+ }
}
}
+ catch
+ {
+ }
+ finally
+ {
+ socketWriter = null;
+ }
- lock(socketReaderLock)
+ try
{
if(null != socketReader)
{
socketReader.Close();
- socketReader = null;
}
}
+ catch
+ {
+ }
+ finally
+ {
+ socketReader = null;
+ }
- socket.Close();
+ try
+ {
+ socket.Close();
+ }
+ catch
+ {
+ }
if(null != readThread)
{