You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/12/19 20:50:12 UTC
svn commit: r605671 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Author: rajdavies
Date: Wed Dec 19 11:50:11 2007
New Revision: 605671
URL: http://svn.apache.org/viewvc?rev=605671&view=rev
Log:
Close the socket in a separate thread and only await stoppedLatch
for 1 second, since close could be called by the InactivityMonitor.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=605671&r1=605670&r2=605671&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Dec 19 11:50:11 2007
@@ -30,6 +30,10 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
@@ -50,7 +54,7 @@
*/
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
private static final Log LOG = LogFactory.getLog(TcpTransport.class);
-
+ private static final ThreadPoolExecutor SOCKET_CLOSE;
protected final URI remoteLocation;
protected final URI localLocation;
protected final WireFormat wireFormat;
@@ -427,7 +431,23 @@
// is hung.. then this hangs the close.
// closeStreams();
if (socket != null) {
- socket.close();
+ //closing the socket can hang also
+ final CountDownLatch latch = new CountDownLatch(1);
+ SOCKET_CLOSE.execute(new Runnable() {
+
+ public void run() {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ LOG.debug("Caught exception closing socket",e);
+ }finally {
+ latch.countDown();
+ }
+ }
+
+ });
+ latch.await(1,TimeUnit.SECONDS);
+
}
}
@@ -439,7 +459,7 @@
super.stop();
CountDownLatch countDownLatch = stoppedLatch.get();
if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
- countDownLatch.await();
+ countDownLatch.await(1,TimeUnit.SECONDS);
}
}
@@ -478,4 +498,13 @@
return super.narrow(target);
}
+ static {
+ SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ }
}