You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/07/31 22:31:28 UTC

svn commit: r681474 - in /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io: IoReceiver.java IoSender.java IoTransport.java

Author: rhs
Date: Thu Jul 31 13:31:28 2008
New Revision: 681474

URL: http://svn.apache.org/viewvc?rev=681474&view=rev
Log:
QPID-1207: fixed io transport close to ensure threads shutdown properly

Modified:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=681474&r1=681473&r2=681474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Thu Jul 31 13:31:28 2008
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * IoReceiver
@@ -43,18 +44,50 @@
     private final Receiver<ByteBuffer> receiver;
     private final int bufferSize;
     private final Socket socket;
+    private final long timeout;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
-    public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, int bufferSize)
+    public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
+                      int bufferSize, long timeout)
     {
         this.transport = transport;
         this.receiver = receiver;
         this.bufferSize = bufferSize;
         this.socket = transport.getSocket();
+        this.timeout = timeout;
 
-        setName(String.format("IoReceive - %s", socket.getRemoteSocketAddress()));
+        setDaemon(true);
+        setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
         start();
     }
 
+    void close()
+    {
+        if (!closed.getAndSet(true))
+        {
+            try
+            {
+                socket.shutdownInput();
+                if (Thread.currentThread() != this)
+                {
+                    join(timeout);
+                    if (isAlive())
+                    {
+                        throw new TransportException("join timed out");
+                    }
+                }
+            }
+            catch (InterruptedException e)
+            {
+                throw new TransportException(e);
+            }
+            catch (IOException e)
+            {
+                throw new TransportException(e);
+            }
+        }
+    }
+
     public void run()
     {
         final int threshold = bufferSize / 2;
@@ -67,7 +100,7 @@
             InputStream in = socket.getInputStream();
             int read = 0;
             int offset = 0;
-            while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+            while (!closed.get() && (read = in.read(buffer, offset, bufferSize-offset)) != -1)
             {
                 if (read > 0)
                 {
@@ -84,22 +117,12 @@
         }
         catch (Throwable t)
         {
-            receiver.exception(new TransportException("error in read thread", t));
+            receiver.exception(t);
         }
         finally
         {
-            try
-            {
-                transport.getSender().close();
-            }
-            catch (TransportException e)
-            {
-                log.error(e, "error closing");
-            }
-            finally
-            {
-                receiver.closed();
-            }
+            receiver.closed();
+            transport.getSender().close();
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=681474&r1=681473&r2=681474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Thu Jul 31 13:31:28 2008
@@ -24,7 +24,6 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
@@ -55,7 +54,7 @@
     private final Object notEmpty = new Object();
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
-    private IOException exception = null;
+    private volatile Throwable exception = null;
 
 
     public IoSender(IoTransport transport, int bufferSize, long timeout)
@@ -74,6 +73,7 @@
             throw new TransportException("Error getting output stream for socket", e);
         }
 
+        setDaemon(true);
         setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
         start();
     }
@@ -159,6 +159,11 @@
 
     public void close()
     {
+        close(true);
+    }
+
+    void close(boolean reportException)
+    {
         if (!closed.getAndSet(true))
         {
             synchronized (notEmpty)
@@ -168,11 +173,15 @@
 
             try
             {
-                join(timeout);
-                if (isAlive())
+                if (Thread.currentThread() != this)
                 {
-                    throw new TransportException("join timed out");
+                    join(timeout);
+                    if (isAlive())
+                    {
+                        throw new TransportException("join timed out");
+                    }
                 }
+                transport.getReceiver().close();
                 socket.close();
             }
             catch (InterruptedException e)
@@ -184,7 +193,7 @@
                 throw new TransportException(e);
             }
 
-            if (exception != null)
+            if (reportException && exception != null)
             {
                 throw new TransportException(exception);
             }
@@ -246,6 +255,7 @@
             {
                 log.error(e, "error in write thread");
                 exception = e;
+                close(false);
                 break;
             }
             tail.getAndAdd(length);

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=681474&r1=681473&r2=681474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Thu Jul 31 13:31:28 2008
@@ -106,7 +106,8 @@
         Connection conn = new Connection
             (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
              delegate);
-        receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), 2*readBufferSize);
+        receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
+                                  2*readBufferSize, timeout);
 
         return conn;
     }