You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by da...@apache.org on 2012/09/07 13:58:22 UTC

svn commit: r1381985 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ broker/jmx/ transport/failover/ transport/fanout/ transport/nio/ transport/tcp/ transport/vm/ util/

Author: davsclaus
Date: Fri Sep  7 11:58:21 2012
New Revision: 1381985

URL: http://svn.apache.org/viewvc?rev=1381985&view=rev
Log:
AMQ-3451: Ensure thread pools is shutdown properly to avoid any leaks. Do not use the old @deprecated thread pool.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Sep  7 11:58:21 2012
@@ -52,7 +52,6 @@ import org.apache.activemq.state.Consume
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
 import org.apache.activemq.state.TransactionState;
-import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -117,6 +116,7 @@ public class TransportConnection impleme
     private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
     private DemandForwardingBridge duplexBridge;
     private final TaskRunnerFactory taskRunnerFactory;
+    private final TaskRunnerFactory stopTaskRunnerFactory;
     private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
     private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
     private String duplexNetworkConnectorId;
@@ -125,18 +125,20 @@ public class TransportConnection impleme
     /**
      * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
      *                          else commands are sent async.
+     * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
      */
     public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
-                               TaskRunnerFactory taskRunnerFactory) {
+                               TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
         this.connector = connector;
         this.broker = broker;
-        this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
         RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
         brokerConnectionStates = rb.getConnectionStates();
         if (connector != null) {
             this.statistics.setParent(connector.getStatistics());
+            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
         }
         this.taskRunnerFactory = taskRunnerFactory;
+        this.stopTaskRunnerFactory = stopTaskRunnerFactory;
         this.transport = transport;
         this.transport.setTransportListener(new DefaultTransportListener() {
             @Override
@@ -939,6 +941,9 @@ public class TransportConnection impleme
     }
 
     public void stop() throws Exception {
+        // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
+        // as their lifecycle is handled elsewhere
+
         stopAsync();
         while (!stopped.await(5, TimeUnit.SECONDS)) {
             LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
@@ -952,7 +957,7 @@ public class TransportConnection impleme
                 stopError = cause;
             }
             try {
-                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+                stopTaskRunnerFactory.execute(new Runnable() {
                     public void run() {
                         try {
                             Thread.sleep(waitTime);
@@ -961,9 +966,9 @@ public class TransportConnection impleme
                         } catch (InterruptedException e) {
                         }
                     }
-                }, "delayedStop:" + transport.getRemoteAddress());
+                });
             } catch (Throwable t) {
-                LOG.warn("cannot create stopAsync :", t);
+                LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
             }
         }
     }
@@ -988,7 +993,7 @@ public class TransportConnection impleme
                 }
             }
             try {
-                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+                stopTaskRunnerFactory.execute(new Runnable() {
                     public void run() {
                         serviceLock.writeLock().lock();
                         try {
@@ -1000,9 +1005,9 @@ public class TransportConnection impleme
                             serviceLock.writeLock().unlock();
                         }
                     }
-                }, "StopAsync:" + transport.getRemoteAddress());
+                });
             } catch (Throwable t) {
-                LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
+                LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
                 stopped.countDown();
             }
         }
@@ -1013,8 +1018,8 @@ public class TransportConnection impleme
         return "Transport Connection to: " + transport.getRemoteAddress();
     }
 
-    protected void doStop() throws Exception, InterruptedException {
-        LOG.debug("Stopping connection: " + transport.getRemoteAddress());
+    protected void doStop() throws Exception {
+        LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
         connector.onStopped(this);
         try {
             synchronized (this) {
@@ -1026,16 +1031,17 @@ public class TransportConnection impleme
                 }
             }
         } catch (Exception ignore) {
-            LOG.trace("Exception caught stopping", ignore);
+            LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
         }
         try {
             transport.stop();
             LOG.debug("Stopped transport: " + transport.getRemoteAddress());
         } catch (Exception e) {
-            LOG.debug("Could not stop transport: " + e, e);
+            LOG.debug("Could not stop transport to " + transport.getRemoteAddress() + ". This exception is ignored.", e);
         }
         if (taskRunner != null) {
             taskRunner.shutdown(1);
+            taskRunner = null;
         }
         active = false;
         // Run the MessageDispatch callbacks so that message references get
@@ -1063,14 +1069,14 @@ public class TransportConnection impleme
             for (TransportConnectionState cs : connectionStates) {
                 cs.getContext().getStopping().set(true);
                 try {
-                    LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
+                    LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
                     processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
                 } catch (Throwable ignore) {
                     ignore.printStackTrace();
                 }
             }
         }
-        LOG.debug("Connection Stopped: " + getRemoteAddress());
+        LOG.debug("Connection Stopped: {}", getRemoteAddress());
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Fri Sep  7 11:58:21 2012
@@ -32,7 +32,6 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
-import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
@@ -220,7 +219,7 @@ public class TransportConnector implemen
         getServer().setAcceptListener(new TransportAcceptListener() {
             public void onAccept(final Transport transport) {
                 try {
-                    DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
                         public void run() {
                             try {
                                 Connection connection = createConnection(transport);
@@ -310,8 +309,10 @@ public class TransportConnector implemen
     // Implementation methods
     // -------------------------------------------------------------------------
     protected Connection createConnection(Transport transport) throws IOException {
+        // prefer to use task runner from broker service as stop task runner, as we can then
+        // tie it to the lifecycle of the broker service
         TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
-                : taskRunnerFactory);
+                : taskRunnerFactory, brokerService.getTaskRunnerFactory());
         boolean statEnabled = this.getStatistics().isEnabled();
         answer.getStatistics().setEnabled(statEnabled);
         answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Fri Sep  7 11:58:21 2012
@@ -49,9 +49,10 @@ public class ManagedTransportConnection 
     private final boolean populateUserName;
 
     public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker,
-                                      TaskRunnerFactory factory, ManagementContext context, ObjectName connectorName)
+                                      TaskRunnerFactory factory, TaskRunnerFactory stopFactory,
+                                      ManagementContext context, ObjectName connectorName)
         throws IOException {
-        super(connector, transport, broker, factory);
+        super(connector, transport, broker, factory, stopFactory);
         this.managementContext = context;
         this.connectorName = connectorName;
         this.mbean = new ConnectionView(this, managementContext);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java Fri Sep  7 11:58:21 2012
@@ -49,7 +49,10 @@ public class ManagedTransportConnector e
     }
 
     protected Connection createConnection(Transport transport) throws IOException {
-        return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), managementContext, connectorName);
+        // prefer to use task runner from broker service as stop task runner, as we can then
+        // tie it to the lifecycle of the broker service
+        return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(),
+                getBrokerService().getTaskRunnerFactory(), managementContext, connectorName);
     }
 
     protected static synchronized long getNextConnectionId() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Fri Sep  7 11:58:21 2012
@@ -46,9 +46,9 @@ import org.apache.activemq.command.Remov
 import org.apache.activemq.command.Response;
 import org.apache.activemq.state.ConnectionStateTracker;
 import org.apache.activemq.state.Tracked;
-import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.CompositeTransport;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
@@ -86,6 +86,7 @@ public class FailoverTransport implement
     private URI connectedTransportURI;
     private URI failedConnectTransportURI;
     private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
+    private final TaskRunnerFactory reconnectTaskFactory;
     private final TaskRunner reconnectTask;
     private boolean started;
     private boolean initialized;
@@ -128,7 +129,9 @@ public class FailoverTransport implement
         brokerSslContext = SslContext.getCurrentSslContext();
         stateTracker.setTrackTransactions(true);
         // Setup a task that is used to reconnect the a connection async.
-        reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
+        reconnectTaskFactory = new TaskRunnerFactory();
+        reconnectTaskFactory.init();
+        reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
             public boolean iterate() {
                 boolean result = false;
                 if (!started) {
@@ -345,26 +348,31 @@ public class FailoverTransport implement
         Transport transportToStop = null;
         List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
 
-        synchronized (reconnectMutex) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Stopped " + this);
-            }
-            if (!started) {
-                return;
-            }
-            started = false;
-            disposed = true;
-            connected = false;
+        try {
+            synchronized (reconnectMutex) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Stopped " + this);
+                }
+                if (!started) {
+                    return;
+                }
+                started = false;
+                disposed = true;
+                connected = false;
 
-            if (connectedTransport.get() != null) {
-                transportToStop = connectedTransport.getAndSet(null);
+                if (connectedTransport.get() != null) {
+                    transportToStop = connectedTransport.getAndSet(null);
+                }
+                reconnectMutex.notifyAll();
             }
-            reconnectMutex.notifyAll();
-        }
-        synchronized (sleepMutex) {
-            sleepMutex.notifyAll();
+            synchronized (sleepMutex) {
+                sleepMutex.notifyAll();
+            }
+        } finally {
+            reconnectTask.shutdown();
+            reconnectTaskFactory.shutdownNow();
         }
-        reconnectTask.shutdown();
+
         synchronized(backupMutex) {
             for (BackupTransport backup : backups) {
                 backup.setDisposed(true);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Fri Sep  7 11:58:21 2012
@@ -30,9 +30,9 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.state.ConnectionStateTracker;
-import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.CompositeTransport;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
@@ -63,6 +63,7 @@ public class FanoutTransport implements 
     private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
     private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
 
+    private final TaskRunnerFactory reconnectTaskFactory;
     private final TaskRunner reconnectTask;
     private boolean started;
 
@@ -157,7 +158,9 @@ public class FanoutTransport implements 
 
     public FanoutTransport() throws InterruptedIOException {
         // Setup a task that is used to reconnect the a connection async.
-        reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
+        reconnectTaskFactory = new TaskRunnerFactory();
+        reconnectTaskFactory.init();
+        reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
             public boolean iterate() {
                 return doConnect();
             }
@@ -291,27 +294,31 @@ public class FanoutTransport implements 
     }
 
     public void stop() throws Exception {
-        synchronized (reconnectMutex) {
-            ServiceStopper ss = new ServiceStopper();
+        try {
+            synchronized (reconnectMutex) {
+                ServiceStopper ss = new ServiceStopper();
 
-            if (!started) {
-                return;
-            }
-            started = false;
-            disposed = true;
-            connected=false;
+                if (!started) {
+                    return;
+                }
+                started = false;
+                disposed = true;
+                connected=false;
 
-            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
-                FanoutTransportHandler th = iter.next();
-                if (th.transport != null) {
-                    ss.stop(th.transport);
+                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
+                    FanoutTransportHandler th = iter.next();
+                    if (th.transport != null) {
+                        ss.stop(th.transport);
+                    }
                 }
-            }
 
-            LOG.debug("Stopped: " + this);
-            ss.throwFirstException();
+                LOG.debug("Stopped: " + this);
+                ss.throwFirstException();
+            }
+        } finally {
+            reconnectTask.shutdown();
+            reconnectTaskFactory.shutdownNow();
         }
-        reconnectTask.shutdown();
     }
 
 	public int getMinAckCount() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java Fri Sep  7 11:58:21 2012
@@ -37,7 +37,7 @@ import javax.net.ssl.SSLSession;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
@@ -52,9 +52,10 @@ public class NIOSSLTransport extends NIO
     protected SSLEngine sslEngine;
     protected SSLSession sslSession;
 
-    protected boolean handshakeInProgress = false;
+    protected volatile boolean handshakeInProgress = false;
     protected SSLEngineResult.Status status = null;
     protected SSLEngineResult.HandshakeStatus handshakeStatus = null;
+    protected TaskRunnerFactory taskRunnerFactory;
 
     public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -259,7 +260,7 @@ public class NIOSSLTransport extends NIO
                 case NEED_TASK:
                     Runnable task;
                     while ((task = sslEngine.getDelegatedTask()) != null) {
-                        DefaultThreadPools.getDefaultTaskRunnerFactory().execute(task);
+                        taskRunnerFactory.execute(task);
                     }
                     break;
                 case NEED_WRAP:
@@ -274,7 +275,18 @@ public class NIOSSLTransport extends NIO
     }
 
     @Override
+    protected void doStart() throws Exception {
+        taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task");
+        // no need to init as we can delay that until demand (eg in doHandshake)
+        super.doStart();
+    }
+
+    @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
+        if (taskRunnerFactory != null) {
+            taskRunnerFactory.shutdownNow();
+            taskRunnerFactory = null;
+        }
         if (channel != null) {
             channel.close();
             channel = null;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Fri Sep  7 11:58:21 2012
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.Atomi
 import javax.net.SocketFactory;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportLoggerFactory;
 import org.apache.activemq.transport.TransportThreadSupport;
@@ -536,13 +536,17 @@ public class TcpTransport extends Transp
                 //closing the socket can hang also
                 final CountDownLatch latch = new CountDownLatch(1);
 
-                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+                // need a async task for this
+                final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
+                taskRunnerFactory.execute(new Runnable() {
                     public void run() {
+                        LOG.trace("Closing socket {}", socket);
                         try {
                             socket.close();
+                            LOG.debug("Closed socket {}", socket);
                         } catch (IOException e) {
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug("Caught exception closing socket", e);
+                                LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
                             }
                         } finally {
                             latch.countDown();
@@ -554,14 +558,20 @@ public class TcpTransport extends Transp
                     latch.await(1,TimeUnit.SECONDS);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
+                } finally {
+                    taskRunnerFactory.shutdownNow();
                 }
 
             } else {
-
+                // close synchronously
+                LOG.trace("Closing socket {}", socket);
                 try {
                     socket.close();
+                    LOG.debug("Closed socket {}", socket);
                 } catch (IOException e) {
-                    LOG.debug("Caught exception closing socket",e);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
+                    }
                 }
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Sep  7 11:58:21 2012
@@ -26,9 +26,9 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
@@ -55,6 +55,7 @@ public class VMTransport implements Tran
 
     // Implementation
     private LinkedBlockingQueue<Object> messageQueue;
+    private TaskRunnerFactory taskRunnerFactory;
     private TaskRunner taskRunner;
 
     // Transport State
@@ -188,6 +189,7 @@ public class VMTransport implements Tran
                     tr.shutdown(TimeUnit.SECONDS.toMillis(1));
                 } catch(Exception e) {
                 }
+                taskRunner = null;
             }
 
             // let the peer know that we are disconnecting after attempting
@@ -197,6 +199,12 @@ public class VMTransport implements Tran
                 peer.transportListener.onCommand(new ShutdownInfo());
             } catch (Exception ignore) {
             }
+
+            // shutdown task runner factory
+            if (taskRunnerFactory != null) {
+                taskRunnerFactory.shutdownNow();
+                taskRunnerFactory = null;
+            }
         }
     }
 
@@ -280,7 +288,11 @@ public class VMTransport implements Tran
                         throw new TransportDisposedIOException("The Transport has been disposed");
                     }
 
-                    taskRunner = result = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
+                    if (taskRunnerFactory == null) {
+                        taskRunnerFactory = new TaskRunnerFactory("ActiveMQ VMTransport: " + toString());
+                        taskRunnerFactory.init();
+                    }
+                    taskRunner = result = taskRunnerFactory.createTaskRunner(this, "VMTransport: " + toString());
                 }
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java Fri Sep  7 11:58:21 2012
@@ -30,7 +30,7 @@ public final class ThreadPoolUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class);
 
-    public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 30 * 1000L;
+    public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L;
 
     /**
      * Shutdown the given executor service only (ie not graceful shutdown).
@@ -38,7 +38,7 @@ public final class ThreadPoolUtils {
      * @see java.util.concurrent.ExecutorService#shutdown()
      */
     public static void shutdown(ExecutorService executorService) {
-        doShutdown(executorService, -1, true);
+        doShutdown(executorService, 0);
     }
 
     /**
@@ -70,7 +70,7 @@ public final class ThreadPoolUtils {
      * with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis.
      */
     public static void shutdownGraceful(ExecutorService executorService) {
-        doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION, false);
+        doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION);
     }
 
     /**
@@ -83,62 +83,49 @@ public final class ThreadPoolUtils {
      * forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
      * is used as timeout value waiting for orderly shutdown to
      * complete normally, before going aggressively.
-     * <p/>
-     * Notice if the given parameter <tt>shutdownAwaitTermination</tt> is negative, then a quick shutdown
-     * is commenced, by invoking the {@link java.util.concurrent.ExecutorService#shutdown()} method
-     * and then exit from this method (ie. no graceful shutdown is performed).
      *
      * @param executorService the executor service to shutdown
-     * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown, if the value if negative
-     *                                 then the thread pool is <b>not</b> graceful shutdown, but a regular shutdown
-     *                                 is commenced.
+     * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
      */
     public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
-        doShutdown(executorService, shutdownAwaitTermination, false);
+        doShutdown(executorService, shutdownAwaitTermination);
     }
 
-    private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean quick) {
+    private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination) {
         // code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager
 
         if (executorService == null) {
             return;
         }
 
-        if (quick) {
-            // do not shutdown graceful, but just quick shutdown on the thread pool
-            executorService.shutdown();
-            LOG.debug("Quick shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
-                    new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
-            return;
-        }
-
-        if (shutdownAwaitTermination <= 0) {
-            throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination);
-        }
-
         // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
         // and try shutting down again. In both cases we wait at most the given shutdown timeout value given
-        // (total wait could then be 2 x shutdownAwaitTermination)
-        boolean warned = false;
-        StopWatch watch = new StopWatch();
+        // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus
+        // we ought to shutdown much faster)
         if (!executorService.isShutdown()) {
+            boolean warned = false;
+            StopWatch watch = new StopWatch();
+
             LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
             executorService.shutdown();
-            try {
-                if (!awaitTermination(executorService, shutdownAwaitTermination)) {
-                    warned = true;
-                    LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
-                    executorService.shutdownNow();
-                    // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
+
+            if (shutdownAwaitTermination > 0) {
+                try {
                     if (!awaitTermination(executorService, shutdownAwaitTermination)) {
-                        LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
+                        warned = true;
+                        LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
+                        executorService.shutdownNow();
+                        // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
+                        if (!awaitTermination(executorService, shutdownAwaitTermination)) {
+                            LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
+                        }
                     }
+                } catch (InterruptedException e) {
+                    warned = true;
+                    LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
+                    // we were interrupted during shutdown, so force shutdown
+                    executorService.shutdownNow();
                 }
-            } catch (InterruptedException e) {
-                warned = true;
-                LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
-                // we were interrupted during shutdown, so force shutdown
-                executorService.shutdownNow();
             }
 
             // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
@@ -155,8 +142,8 @@ public final class ThreadPoolUtils {
     /**
      * Awaits the termination of the thread pool.
      * <p/>
-     * This implementation will log every 5th second at INFO level that we are waiting, so the end user
-     * can see we are not hanging in case it takes longer time to shutdown the pool.
+     * This implementation will log every 2nd second at INFO level that we are waiting, so the end user
+     * can see we are not hanging in case it takes longer time to terminate the pool.
      *
      * @param executorService            the thread pool
      * @param shutdownAwaitTermination   time in millis to use as timeout
@@ -166,15 +153,15 @@ public final class ThreadPoolUtils {
     public static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
         // log progress every 5th second so end user is aware of we are shutting down
         StopWatch watch = new StopWatch();
-        long interval = Math.min(5000, shutdownAwaitTermination);
+        long interval = Math.min(2000, shutdownAwaitTermination);
         boolean done = false;
         while (!done && interval > 0) {
             if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
                 done = true;
             } else {
-                LOG.info("Waited {} for ExecutorService: {} to shutdown...", TimeUtils.printDuration(watch.taken()), executorService);
+                LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
                 // recalculate interval
-                interval = Math.min(5000, shutdownAwaitTermination - watch.taken());
+                interval = Math.min(2000, shutdownAwaitTermination - watch.taken());
             }
         }