You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by da...@apache.org on 2012/09/07 13:58:22 UTC
svn commit: r1381985 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/
broker/jmx/ transport/failover/ transport/fanout/ transport/nio/
transport/tcp/ transport/vm/ util/
Author: davsclaus
Date: Fri Sep 7 11:58:21 2012
New Revision: 1381985
URL: http://svn.apache.org/viewvc?rev=1381985&view=rev
Log:
AMQ-3451: Ensure thread pools are shut down properly to avoid any leaks. Do not use the old @deprecated thread pool.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Sep 7 11:58:21 2012
@@ -52,7 +52,6 @@ import org.apache.activemq.state.Consume
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
-import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -117,6 +116,7 @@ public class TransportConnection impleme
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private DemandForwardingBridge duplexBridge;
private final TaskRunnerFactory taskRunnerFactory;
+ private final TaskRunnerFactory stopTaskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId;
@@ -125,18 +125,20 @@ public class TransportConnection impleme
/**
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
* else commands are sent async.
+ * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
*/
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
- TaskRunnerFactory taskRunnerFactory) {
+ TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
this.connector = connector;
this.broker = broker;
- this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
brokerConnectionStates = rb.getConnectionStates();
if (connector != null) {
this.statistics.setParent(connector.getStatistics());
+ this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
}
this.taskRunnerFactory = taskRunnerFactory;
+ this.stopTaskRunnerFactory = stopTaskRunnerFactory;
this.transport = transport;
this.transport.setTransportListener(new DefaultTransportListener() {
@Override
@@ -939,6 +941,9 @@ public class TransportConnection impleme
}
public void stop() throws Exception {
+ // do not stop the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
+ // as their lifecycle is handled elsewhere
+
stopAsync();
while (!stopped.await(5, TimeUnit.SECONDS)) {
LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
@@ -952,7 +957,7 @@ public class TransportConnection impleme
stopError = cause;
}
try {
- DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+ stopTaskRunnerFactory.execute(new Runnable() {
public void run() {
try {
Thread.sleep(waitTime);
@@ -961,9 +966,9 @@ public class TransportConnection impleme
} catch (InterruptedException e) {
}
}
- }, "delayedStop:" + transport.getRemoteAddress());
+ });
} catch (Throwable t) {
- LOG.warn("cannot create stopAsync :", t);
+ LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
}
}
}
@@ -988,7 +993,7 @@ public class TransportConnection impleme
}
}
try {
- DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+ stopTaskRunnerFactory.execute(new Runnable() {
public void run() {
serviceLock.writeLock().lock();
try {
@@ -1000,9 +1005,9 @@ public class TransportConnection impleme
serviceLock.writeLock().unlock();
}
}
- }, "StopAsync:" + transport.getRemoteAddress());
+ });
} catch (Throwable t) {
- LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
+ LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
stopped.countDown();
}
}
@@ -1013,8 +1018,8 @@ public class TransportConnection impleme
return "Transport Connection to: " + transport.getRemoteAddress();
}
- protected void doStop() throws Exception, InterruptedException {
- LOG.debug("Stopping connection: " + transport.getRemoteAddress());
+ protected void doStop() throws Exception {
+ LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
connector.onStopped(this);
try {
synchronized (this) {
@@ -1026,16 +1031,17 @@ public class TransportConnection impleme
}
}
} catch (Exception ignore) {
- LOG.trace("Exception caught stopping", ignore);
+ LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
}
try {
transport.stop();
LOG.debug("Stopped transport: " + transport.getRemoteAddress());
} catch (Exception e) {
- LOG.debug("Could not stop transport: " + e, e);
+ LOG.debug("Could not stop transport to " + transport.getRemoteAddress() + ". This exception is ignored.", e);
}
if (taskRunner != null) {
taskRunner.shutdown(1);
+ taskRunner = null;
}
active = false;
// Run the MessageDispatch callbacks so that message references get
@@ -1063,14 +1069,14 @@ public class TransportConnection impleme
for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true);
try {
- LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
+ LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
} catch (Throwable ignore) {
ignore.printStackTrace();
}
}
}
- LOG.debug("Connection Stopped: " + getRemoteAddress());
+ LOG.debug("Connection Stopped: {}", getRemoteAddress());
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Fri Sep 7 11:58:21 2012
@@ -32,7 +32,6 @@ import org.apache.activemq.broker.region
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.security.MessageAuthorizationPolicy;
-import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
@@ -220,7 +219,7 @@ public class TransportConnector implemen
getServer().setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
- DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+ brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
try {
Connection connection = createConnection(transport);
@@ -310,8 +309,10 @@ public class TransportConnector implemen
// Implementation methods
// -------------------------------------------------------------------------
protected Connection createConnection(Transport transport) throws IOException {
+ // prefer to use task runner from broker service as stop task runner, as we can then
+ // tie it to the lifecycle of the broker service
TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
- : taskRunnerFactory);
+ : taskRunnerFactory, brokerService.getTaskRunnerFactory());
boolean statEnabled = this.getStatistics().isEnabled();
answer.getStatistics().setEnabled(statEnabled);
answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Fri Sep 7 11:58:21 2012
@@ -49,9 +49,10 @@ public class ManagedTransportConnection
private final boolean populateUserName;
public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker,
- TaskRunnerFactory factory, ManagementContext context, ObjectName connectorName)
+ TaskRunnerFactory factory, TaskRunnerFactory stopFactory,
+ ManagementContext context, ObjectName connectorName)
throws IOException {
- super(connector, transport, broker, factory);
+ super(connector, transport, broker, factory, stopFactory);
this.managementContext = context;
this.connectorName = connectorName;
this.mbean = new ConnectionView(this, managementContext);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java Fri Sep 7 11:58:21 2012
@@ -49,7 +49,10 @@ public class ManagedTransportConnector e
}
protected Connection createConnection(Transport transport) throws IOException {
- return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), managementContext, connectorName);
+ // prefer to use task runner from broker service as stop task runner, as we can then
+ // tie it to the lifecycle of the broker service
+ return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(),
+ getBrokerService().getTaskRunnerFactory(), managementContext, connectorName);
}
protected static synchronized long getNextConnectionId() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Fri Sep 7 11:58:21 2012
@@ -46,9 +46,9 @@ import org.apache.activemq.command.Remov
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.state.Tracked;
-import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
@@ -86,6 +86,7 @@ public class FailoverTransport implement
private URI connectedTransportURI;
private URI failedConnectTransportURI;
private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
+ private final TaskRunnerFactory reconnectTaskFactory;
private final TaskRunner reconnectTask;
private boolean started;
private boolean initialized;
@@ -128,7 +129,9 @@ public class FailoverTransport implement
brokerSslContext = SslContext.getCurrentSslContext();
stateTracker.setTrackTransactions(true);
// Setup a task that is used to reconnect the a connection async.
- reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
+ reconnectTaskFactory = new TaskRunnerFactory();
+ reconnectTaskFactory.init();
+ reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
public boolean iterate() {
boolean result = false;
if (!started) {
@@ -345,26 +348,31 @@ public class FailoverTransport implement
Transport transportToStop = null;
List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
- synchronized (reconnectMutex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopped " + this);
- }
- if (!started) {
- return;
- }
- started = false;
- disposed = true;
- connected = false;
+ try {
+ synchronized (reconnectMutex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopped " + this);
+ }
+ if (!started) {
+ return;
+ }
+ started = false;
+ disposed = true;
+ connected = false;
- if (connectedTransport.get() != null) {
- transportToStop = connectedTransport.getAndSet(null);
+ if (connectedTransport.get() != null) {
+ transportToStop = connectedTransport.getAndSet(null);
+ }
+ reconnectMutex.notifyAll();
}
- reconnectMutex.notifyAll();
- }
- synchronized (sleepMutex) {
- sleepMutex.notifyAll();
+ synchronized (sleepMutex) {
+ sleepMutex.notifyAll();
+ }
+ } finally {
+ reconnectTask.shutdown();
+ reconnectTaskFactory.shutdownNow();
}
- reconnectTask.shutdown();
+
synchronized(backupMutex) {
for (BackupTransport backup : backups) {
backup.setDisposed(true);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Fri Sep 7 11:58:21 2012
@@ -30,9 +30,9 @@ import org.apache.activemq.command.Messa
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
-import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
@@ -63,6 +63,7 @@ public class FanoutTransport implements
private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
+ private final TaskRunnerFactory reconnectTaskFactory;
private final TaskRunner reconnectTask;
private boolean started;
@@ -157,7 +158,9 @@ public class FanoutTransport implements
public FanoutTransport() throws InterruptedIOException {
// Setup a task that is used to reconnect the a connection async.
- reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
+ reconnectTaskFactory = new TaskRunnerFactory();
+ reconnectTaskFactory.init();
+ reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
public boolean iterate() {
return doConnect();
}
@@ -291,27 +294,31 @@ public class FanoutTransport implements
}
public void stop() throws Exception {
- synchronized (reconnectMutex) {
- ServiceStopper ss = new ServiceStopper();
+ try {
+ synchronized (reconnectMutex) {
+ ServiceStopper ss = new ServiceStopper();
- if (!started) {
- return;
- }
- started = false;
- disposed = true;
- connected=false;
+ if (!started) {
+ return;
+ }
+ started = false;
+ disposed = true;
+ connected=false;
- for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
- FanoutTransportHandler th = iter.next();
- if (th.transport != null) {
- ss.stop(th.transport);
+ for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
+ FanoutTransportHandler th = iter.next();
+ if (th.transport != null) {
+ ss.stop(th.transport);
+ }
}
- }
- LOG.debug("Stopped: " + this);
- ss.throwFirstException();
+ LOG.debug("Stopped: " + this);
+ ss.throwFirstException();
+ }
+ } finally {
+ reconnectTask.shutdown();
+ reconnectTaskFactory.shutdownNow();
}
- reconnectTask.shutdown();
}
public int getMinAckCount() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java Fri Sep 7 11:58:21 2012
@@ -37,7 +37,7 @@ import javax.net.ssl.SSLSession;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
@@ -52,9 +52,10 @@ public class NIOSSLTransport extends NIO
protected SSLEngine sslEngine;
protected SSLSession sslSession;
- protected boolean handshakeInProgress = false;
+ protected volatile boolean handshakeInProgress = false;
protected SSLEngineResult.Status status = null;
protected SSLEngineResult.HandshakeStatus handshakeStatus = null;
+ protected TaskRunnerFactory taskRunnerFactory;
public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -259,7 +260,7 @@ public class NIOSSLTransport extends NIO
case NEED_TASK:
Runnable task;
while ((task = sslEngine.getDelegatedTask()) != null) {
- DefaultThreadPools.getDefaultTaskRunnerFactory().execute(task);
+ taskRunnerFactory.execute(task);
}
break;
case NEED_WRAP:
@@ -274,7 +275,18 @@ public class NIOSSLTransport extends NIO
}
@Override
+ protected void doStart() throws Exception {
+ taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task");
+ // no need to init as we can delay that until demand (eg in doHandshake)
+ super.doStart();
+ }
+
+ @Override
protected void doStop(ServiceStopper stopper) throws Exception {
+ if (taskRunnerFactory != null) {
+ taskRunnerFactory.shutdownNow();
+ taskRunnerFactory = null;
+ }
if (channel != null) {
channel.close();
channel = null;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Fri Sep 7 11:58:21 2012
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.Atomi
import javax.net.SocketFactory;
import org.apache.activemq.Service;
-import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportThreadSupport;
@@ -536,13 +536,17 @@ public class TcpTransport extends Transp
//closing the socket can hang also
final CountDownLatch latch = new CountDownLatch(1);
- DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+ // need an async task for this
+ final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
+ taskRunnerFactory.execute(new Runnable() {
public void run() {
+ LOG.trace("Closing socket {}", socket);
try {
socket.close();
+ LOG.debug("Closed socket {}", socket);
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Caught exception closing socket", e);
+ LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
}
} finally {
latch.countDown();
@@ -554,14 +558,20 @@ public class TcpTransport extends Transp
latch.await(1,TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ } finally {
+ taskRunnerFactory.shutdownNow();
}
} else {
-
+ // close synchronously
+ LOG.trace("Closing socket {}", socket);
try {
socket.close();
+ LOG.debug("Closed socket {}", socket);
} catch (IOException e) {
- LOG.debug("Caught exception closing socket",e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
+ }
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Sep 7 11:58:21 2012
@@ -26,9 +26,9 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
@@ -55,6 +55,7 @@ public class VMTransport implements Tran
// Implementation
private LinkedBlockingQueue<Object> messageQueue;
+ private TaskRunnerFactory taskRunnerFactory;
private TaskRunner taskRunner;
// Transport State
@@ -188,6 +189,7 @@ public class VMTransport implements Tran
tr.shutdown(TimeUnit.SECONDS.toMillis(1));
} catch(Exception e) {
}
+ taskRunner = null;
}
// let the peer know that we are disconnecting after attempting
@@ -197,6 +199,12 @@ public class VMTransport implements Tran
peer.transportListener.onCommand(new ShutdownInfo());
} catch (Exception ignore) {
}
+
+ // shutdown task runner factory
+ if (taskRunnerFactory != null) {
+ taskRunnerFactory.shutdownNow();
+ taskRunnerFactory = null;
+ }
}
}
@@ -280,7 +288,11 @@ public class VMTransport implements Tran
throw new TransportDisposedIOException("The Transport has been disposed");
}
- taskRunner = result = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
+ if (taskRunnerFactory == null) {
+ taskRunnerFactory = new TaskRunnerFactory("ActiveMQ VMTransport: " + toString());
+ taskRunnerFactory.init();
+ }
+ taskRunner = result = taskRunnerFactory.createTaskRunner(this, "VMTransport: " + toString());
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java?rev=1381985&r1=1381984&r2=1381985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java Fri Sep 7 11:58:21 2012
@@ -30,7 +30,7 @@ public final class ThreadPoolUtils {
private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class);
- public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 30 * 1000L;
+ public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L;
/**
* Shutdown the given executor service only (ie not graceful shutdown).
@@ -38,7 +38,7 @@ public final class ThreadPoolUtils {
* @see java.util.concurrent.ExecutorService#shutdown()
*/
public static void shutdown(ExecutorService executorService) {
- doShutdown(executorService, -1, true);
+ doShutdown(executorService, 0);
}
/**
@@ -70,7 +70,7 @@ public final class ThreadPoolUtils {
* with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis.
*/
public static void shutdownGraceful(ExecutorService executorService) {
- doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION, false);
+ doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION);
}
/**
@@ -83,62 +83,49 @@ public final class ThreadPoolUtils {
* forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
* is used as timeout value waiting for orderly shutdown to
* complete normally, before going aggressively.
- * <p/>
- * Notice if the given parameter <tt>shutdownAwaitTermination</tt> is negative, then a quick shutdown
- * is commenced, by invoking the {@link java.util.concurrent.ExecutorService#shutdown()} method
- * and then exit from this method (ie. no graceful shutdown is performed).
*
* @param executorService the executor service to shutdown
- * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown, if the value if negative
- * then the thread pool is <b>not</b> graceful shutdown, but a regular shutdown
- * is commenced.
+ * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
*/
public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
- doShutdown(executorService, shutdownAwaitTermination, false);
+ doShutdown(executorService, shutdownAwaitTermination);
}
- private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean quick) {
+ private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination) {
// code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager
if (executorService == null) {
return;
}
- if (quick) {
- // do not shutdown graceful, but just quick shutdown on the thread pool
- executorService.shutdown();
- LOG.debug("Quick shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
- new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
- return;
- }
-
- if (shutdownAwaitTermination <= 0) {
- throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination);
- }
-
// shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
// and try shutting down again. In both cases we wait at most the given shutdown timeout value given
- // (total wait could then be 2 x shutdownAwaitTermination)
- boolean warned = false;
- StopWatch watch = new StopWatch();
+ // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus
+ // we ought to shutdown much faster)
if (!executorService.isShutdown()) {
+ boolean warned = false;
+ StopWatch watch = new StopWatch();
+
LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
executorService.shutdown();
- try {
- if (!awaitTermination(executorService, shutdownAwaitTermination)) {
- warned = true;
- LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
- executorService.shutdownNow();
- // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
+
+ if (shutdownAwaitTermination > 0) {
+ try {
if (!awaitTermination(executorService, shutdownAwaitTermination)) {
- LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
+ warned = true;
+ LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
+ executorService.shutdownNow();
+ // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
+ if (!awaitTermination(executorService, shutdownAwaitTermination)) {
+ LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
+ }
}
+ } catch (InterruptedException e) {
+ warned = true;
+ LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
+ // we were interrupted during shutdown, so force shutdown
+ executorService.shutdownNow();
}
- } catch (InterruptedException e) {
- warned = true;
- LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
- // we were interrupted during shutdown, so force shutdown
- executorService.shutdownNow();
}
// if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
@@ -155,8 +142,8 @@ public final class ThreadPoolUtils {
/**
* Awaits the termination of the thread pool.
* <p/>
- * This implementation will log every 5th second at INFO level that we are waiting, so the end user
- * can see we are not hanging in case it takes longer time to shutdown the pool.
+ * This implementation will log every 2nd second at INFO level that we are waiting, so the end user
+ * can see we are not hanging in case it takes longer time to terminate the pool.
*
* @param executorService the thread pool
* @param shutdownAwaitTermination time in millis to use as timeout
@@ -166,15 +153,15 @@ public final class ThreadPoolUtils {
public static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
// log progress every 5th second so end user is aware of we are shutting down
StopWatch watch = new StopWatch();
- long interval = Math.min(5000, shutdownAwaitTermination);
+ long interval = Math.min(2000, shutdownAwaitTermination);
boolean done = false;
while (!done && interval > 0) {
if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
done = true;
} else {
- LOG.info("Waited {} for ExecutorService: {} to shutdown...", TimeUtils.printDuration(watch.taken()), executorService);
+ LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
// recalculate interval
- interval = Math.min(5000, shutdownAwaitTermination - watch.taken());
+ interval = Math.min(2000, shutdownAwaitTermination - watch.taken());
}
}