You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2013/11/25 10:16:11 UTC

svn commit: r1545184 - in /qpid/branches/0.26/qpid: ./ java/ java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java

Author: kwall
Date: Mon Nov 25 09:16:10 2013
New Revision: 1545184

URL: http://svn.apache.org/r1545184
Log:
QPID-5282: Change IoSender to cause the socket to be closed after a sender timeout.

Merged from trunk with the following command:

svn merge -c1543721  https://svn.apache.org/repos/asf/qpid/trunk/qpid


Modified:
    qpid/branches/0.26/qpid/   (props changed)
    qpid/branches/0.26/qpid/java/   (props changed)
    qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java

Propchange: qpid/branches/0.26/qpid/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid:r1543721

Propchange: qpid/branches/0.26/qpid/java/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java:r1543721

Modified: qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1545184&r1=1545183&r2=1545184&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Mon Nov 25 09:16:10 2013
@@ -60,6 +60,7 @@ public final class IoSender implements R
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Thread senderThread;
     private final List<Closeable> _listeners = new ArrayList<Closeable>();
+    private final String _remoteSocketAddress;
 
     private volatile Throwable exception = null;
 
@@ -68,6 +69,7 @@ public final class IoSender implements R
         this.socket = socket;
         this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
         this.timeout = timeout;
+        _remoteSocketAddress = socket.getRemoteSocketAddress().toString();
 
         try
         {
@@ -89,7 +91,7 @@ public final class IoSender implements R
         }
 
         senderThread.setDaemon(true);
-        senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+        senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
     }
 
     public void initiate()
@@ -109,13 +111,11 @@ public final class IoSender implements R
 
     public void send(ByteBuffer buf)
     {
-        if (closed.get())
-        {
-            throw new SenderClosedException("sender is closed", exception);
-        }
+        checkNotAlreadyClosed();
+
         if(!senderThread.isAlive())
         {
-            throw new SenderException("sender thread not alive");
+            throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress));
         }
 
         final int size = buffer.length;
@@ -131,7 +131,7 @@ public final class IoSender implements R
                 flush();
                 synchronized (notFull)
                 {
-                    long start = System.currentTimeMillis();
+                    final long start = System.currentTimeMillis();
                     long elapsed = 0;
                     while (!closed.get() && head - tail >= size && elapsed < timeout)
                     {
@@ -146,14 +146,19 @@ public final class IoSender implements R
                         elapsed += System.currentTimeMillis() - start;
                     }
 
-                    if (closed.get())
-                    {
-                        throw new SenderClosedException("sender is closed", exception);
-                    }
+                    checkNotAlreadyClosed();
 
                     if (head - tail >= size)
                     {
-                        throw new SenderException(String.format("write timed out: %s, %s", head, tail));
+                        try
+                        {
+                            log.error("write timed out for socket %s: head %d, tail %d", _remoteSocketAddress, head, tail);
+                            throw new SenderException(String.format("write timed out for socket %s: head %d, tail %d",  _remoteSocketAddress, head, tail));
+                        }
+                        finally
+                        {
+                            close(false, false);
+                        }
                     }
                 }
                 continue;
@@ -191,10 +196,10 @@ public final class IoSender implements R
 
     public void close()
     {
-        close(true);
+        close(true, true);
     }
 
-    void close(boolean reportException)
+    private void close(boolean awaitSenderBeforeClose, boolean reportException)
     {
         if (!closed.getAndSet(true))
         {
@@ -210,21 +215,11 @@ public final class IoSender implements R
 
             try
             {
-                if (Thread.currentThread() != senderThread)
+                if (awaitSenderBeforeClose)
                 {
-                    senderThread.join(timeout);
-                    if (senderThread.isAlive())
-                    {
-                        log.error("join timed out");
-                        throw new SenderException("join timed out");
-                    }
+                    awaitSenderThreadShutdown();
                 }
             }
-            catch (InterruptedException e)
-            {
-                log.error("interrupted whilst waiting for sender thread to stop");
-                throw new SenderException(e);
-            }
             finally
             {
                 closeListeners();
@@ -247,7 +242,7 @@ public final class IoSender implements R
             }
             catch(Exception e)
             {
-                log.error("Exception closing listener: " + e.getMessage());
+                log.error(e, "Exception closing listener for socket %s", _remoteSocketAddress);
                 ex = e;
             }
         }
@@ -316,7 +311,7 @@ public final class IoSender implements R
             {
                 log.error(e, "error in write thread");
                 exception = e;
-                close(false);
+                close(false, false);
                 break;
             }
             tail += length;
@@ -346,4 +341,33 @@ public final class IoSender implements R
     {
         _listeners.add(listener);
     }
+
+    private void awaitSenderThreadShutdown()
+    {
+        if (Thread.currentThread() != senderThread)
+        {
+            try
+            {
+                senderThread.join(timeout);
+                if (senderThread.isAlive())
+                {
+                    log.error("join timed out for socket %s to stop", _remoteSocketAddress);
+                    throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress));
+                }
+            }
+            catch (InterruptedException e)
+            {
+                log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress);
+                throw new SenderException(e);
+            }
+        }
+    }
+
+    private void checkNotAlreadyClosed()
+    {
+        if (closed.get())
+        {
+            throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception);
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org