You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/12/19 20:50:12 UTC

svn commit: r605671 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java

Author: rajdavies
Date: Wed Dec 19 11:50:11 2007
New Revision: 605671

URL: http://svn.apache.org/viewvc?rev=605671&view=rev
Log:
close socket in a separate thread and only await stopLatch 
for 2 seconds - as it close could be called by InactivityMonitor

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=605671&r1=605670&r2=605671&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Dec 19 11:50:11 2007
@@ -30,6 +30,10 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
@@ -50,7 +54,7 @@
  */
 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
     private static final Log LOG = LogFactory.getLog(TcpTransport.class);
-
+    private static final ThreadPoolExecutor SOCKET_CLOSE;
     protected final URI remoteLocation;
     protected final URI localLocation;
     protected final WireFormat wireFormat;
@@ -427,7 +431,23 @@
         // is hung.. then this hangs the close.
         // closeStreams();
         if (socket != null) {
-            socket.close();
+            //closing the socket can hang also 
+            final CountDownLatch latch = new CountDownLatch(1);
+            SOCKET_CLOSE.execute(new Runnable() {
+
+                public void run() {
+                    try {
+                        socket.close();
+                    } catch (IOException e) {
+                        LOG.debug("Caught exception closing socket",e);
+                    }finally {
+                        latch.countDown();
+                    }
+                }
+                
+            });
+            latch.await(1,TimeUnit.SECONDS);
+           
         }
     }
 
@@ -439,7 +459,7 @@
         super.stop();
         CountDownLatch countDownLatch = stoppedLatch.get();
         if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
-            countDownLatch.await();
+            countDownLatch.await(1,TimeUnit.SECONDS);
         }
     }
 
@@ -478,4 +498,13 @@
         return super.narrow(target);
     }
 
+    static {
+        SOCKET_CLOSE =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+    }
 }