You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/10/27 19:30:27 UTC

svn commit: r468468 - /incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs

Author: jstrachan
Date: Fri Oct 27 10:30:27 2006
New Revision: 468468

URL: http://svn.apache.org/viewvc?view=rev&rev=468468
Log:
applied patch for AMQ-995 to fix the exception handling and close logic of the TCP transport

Modified:
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs?view=diff&rev=468468&r1=468467&r2=468468
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Fri Oct 27 10:30:27 2006
@@ -39,7 +39,7 @@
         private BinaryWriter socketWriter;
         private Thread readThread;
         private bool started;
-        volatile private bool closed;
+        private Util.AtomicBoolean closed = new Util.AtomicBoolean(false);
         
         private CommandHandler commandHandler;
         private ExceptionHandler exceptionHandler;
@@ -89,37 +89,65 @@
         {
             throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
         }
-        
+
+        public void Close()
+        {
+            if (closed.compareAndSet(false, true))
+            {
+                socket.Close();
+                if (System.Threading.Thread.CurrentThread != readThread)
+                    readThread.Join();
+                socketWriter.Close();
+                socketReader.Close();
+            }
+        }
+
         public void Dispose()
         {
-            closed = true;
-            socket.Close();
-            readThread.Join();
-            socketWriter.Close();
-            socketReader.Close();
+            Close();
         }
         
         public void ReadLoop()
         {
-            while (!closed)
+            // This is the thread function for the reader thread. This runs continuously
+            // performing a blokcing read on the socket and dispatching all commands
+            // received.
+            //
+            // Exception Handling
+            // ------------------
+            // If an Exception occurs during the reading/marshalling, then the connection
+            // is effectively broken because psoition cannot be re-established to the next
+            // message.  This is reported to the app via the exceptionHandler and the socket
+            // is closed to prevent further communication attempts.
+            //
+            // An exception in the command handler may not be fatal to the transport, so
+            // these are simply reported to the exceptionHandler.
+            //
+            while (!closed.Value)
             {
+                Command command = null;
                 try
                 {
-                    Command command = (Command) Wireformat.Unmarshal(socketReader);
-                    this.commandHandler(this, command);
+                    command = (Command) Wireformat.Unmarshal(socketReader);
                 }
-                catch (ObjectDisposedException)
+                catch(Exception ex)
                 {
-                    break;
-                }
-                catch ( Exception e) {
-                    if( e.GetBaseException() is ObjectDisposedException ) {
+                    if( !closed.Value ) 
+                    {
+                        this.exceptionHandler(this, ex);
+                        // Close the socket as there's little that can be done with this transport now.
+                        Close();
                         break;
                     }
-                    if( !closed ) {
-                        this.exceptionHandler(this,e);
-                    }
-                    break;
+                }
+
+                try
+                {
+                    this.commandHandler(this, command);
+                }
+                catch ( Exception e) 
+                {
+                    this.exceptionHandler(this, e);
                 }
             }
         }