You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/10/27 19:30:27 UTC
svn commit: r468468 -
/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
Author: jstrachan
Date: Fri Oct 27 10:30:27 2006
New Revision: 468468
URL: http://svn.apache.org/viewvc?view=rev&rev=468468
Log:
applied patch for AMQ-995 to fix the exception handling and close logic of the TCP transport
Modified:
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs?view=diff&rev=468468&r1=468467&r2=468468
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Fri Oct 27 10:30:27 2006
@@ -39,7 +39,7 @@
private BinaryWriter socketWriter;
private Thread readThread;
private bool started;
- volatile private bool closed;
+ private Util.AtomicBoolean closed = new Util.AtomicBoolean(false);
private CommandHandler commandHandler;
private ExceptionHandler exceptionHandler;
@@ -89,37 +89,65 @@
{
throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
}
-
+
+ public void Close()
+ {
+ if (closed.compareAndSet(false, true))
+ {
+ socket.Close();
+ if (System.Threading.Thread.CurrentThread != readThread)
+ readThread.Join();
+ socketWriter.Close();
+ socketReader.Close();
+ }
+ }
+
public void Dispose()
{
- closed = true;
- socket.Close();
- readThread.Join();
- socketWriter.Close();
- socketReader.Close();
+ Close();
}
public void ReadLoop()
{
- while (!closed)
+ // This is the thread function for the reader thread. This runs continuously
+ // performing a blocking read on the socket and dispatching all commands
+ // received.
+ //
+ // Exception Handling
+ // ------------------
+ // If an Exception occurs during the reading/marshalling, then the connection
+ // is effectively broken because the read position cannot be re-established at the next
+ // message. This is reported to the app via the exceptionHandler and the socket
+ // is closed to prevent further communication attempts.
+ //
+ // An exception in the command handler may not be fatal to the transport, so
+ // these are simply reported to the exceptionHandler.
+ //
+ while (!closed.Value)
{
+ Command command = null;
try
{
- Command command = (Command) Wireformat.Unmarshal(socketReader);
- this.commandHandler(this, command);
+ command = (Command) Wireformat.Unmarshal(socketReader);
}
- catch (ObjectDisposedException)
+ catch(Exception ex)
{
- break;
- }
- catch ( Exception e) {
- if( e.GetBaseException() is ObjectDisposedException ) {
+ if( !closed.Value )
+ {
+ this.exceptionHandler(this, ex);
+ // Close the socket as there's little that can be done with this transport now.
+ Close();
break;
}
- if( !closed ) {
- this.exceptionHandler(this,e);
- }
- break;
+ }
+
+ try
+ {
+ this.commandHandler(this, command);
+ }
+ catch ( Exception e)
+ {
+ this.exceptionHandler(this, e);
}
}
}