You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by da...@apache.org on 2012/09/06 19:50:44 UTC
svn commit: r1381695 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/
broker/region/ network/ network/jms/ store/journal/ store/kahadb/ thread/
transport/ transport/discovery/multicast/ transport/mqtt/ util/
Author: davsclaus
Date: Thu Sep 6 17:50:43 2012
New Revision: 1381695
URL: http://svn.apache.org/viewvc?rev=1381695&view=rev
Log:
AMQ-4026: Using ThreadPoolUtils to shutdown thread pool. Use thread pool from broker service where applicable.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Sep 6 17:50:43 2012
@@ -103,6 +103,7 @@ import org.apache.activemq.util.Introspe
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -692,10 +693,10 @@ public class ActiveMQConnection implemen
} finally {
try {
if (executor != null) {
- executor.shutdown();
+ ThreadPoolUtils.shutdown(executor);
}
} catch (Throwable e) {
- LOG.error("Error shutting down thread pool " + e, e);
+ LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
}
ServiceSupport.dispose(this.transport);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Sep 6 17:50:43 2012
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -61,6 +60,7 @@ import org.apache.activemq.transaction.S
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -769,12 +769,8 @@ public class ActiveMQMessageConsumer imp
}
}
if (executorService != null) {
- executorService.shutdown();
- try {
- executorService.awaitTermination(60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
+ executorService = null;
}
if (session.isClientAcknowledge()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Sep 6 17:50:43 2012
@@ -111,6 +111,7 @@ import org.apache.activemq.util.IOHelper
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -769,7 +770,7 @@ public class BrokerService implements Se
this.taskRunnerFactory = null;
}
if (this.executor != null) {
- this.executor.shutdownNow();
+ ThreadPoolUtils.shutdownNow(executor);
this.executor = null;
}
@@ -2410,8 +2411,7 @@ public class BrokerService implements Se
}
if (networkConnectorStartExecutor != null) {
// executor done when enqueued tasks are complete
- networkConnectorStartExecutor.shutdown();
- networkConnectorStartExecutor = null;
+ ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
}
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
@@ -2755,7 +2755,7 @@ public class BrokerService implements Se
/**
* Sets whether Authenticated User Name information is shown in MBeans that support this field.
- * @param true if MBeans should expose user name information.
+ * @param value if MBeans should expose user name information.
*/
public void setPopulateUserNameInMBeans(boolean value) {
this.populateUserNameInMBeans = value;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Sep 6 17:50:43 2012
@@ -86,6 +86,7 @@ import org.apache.activemq.transaction.S
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.BrokerSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -883,7 +884,8 @@ public class Queue extends BaseDestinati
taskRunner.shutdown();
}
if (this.executor != null) {
- this.executor.shutdownNow();
+ ThreadPoolUtils.shutdownNow(executor);
+ executor = null;
}
scheduler.cancel(expireMessagesTask);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Sep 6 17:50:43 2012
@@ -70,7 +70,6 @@ import org.apache.activemq.command.Shutd
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
@@ -92,7 +91,6 @@ import org.slf4j.LoggerFactory;
*/
public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
- private TaskRunnerFactory asyncTaskRunner;
protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
protected final Transport localBroker;
protected final Transport remoteBroker;
@@ -156,8 +154,10 @@ public abstract class DemandForwardingBr
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
- asyncTaskRunner = new TaskRunnerFactory("ActiveMQ ForwardingBridge Task");
- asyncTaskRunner.init();
+
+ if (brokerService == null) {
+ throw new IllegalArgumentException("BrokerService is null on " + this);
+ }
localBroker.setTransportListener(new DefaultTransportListener() {
@@ -201,7 +201,7 @@ public abstract class DemandForwardingBr
}
protected void triggerLocalStartBridge() throws IOException {
- asyncTaskRunner.execute(new Runnable() {
+ brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
@@ -217,7 +217,7 @@ public abstract class DemandForwardingBr
}
protected void triggerRemoteStartBridge() throws IOException {
- asyncTaskRunner.execute(new Runnable() {
+ brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
@@ -350,7 +350,8 @@ public abstract class DemandForwardingBr
try {
remoteBridgeStarted.set(false);
final CountDownLatch sendShutdown = new CountDownLatch(1);
- asyncTaskRunner.execute(new Runnable() {
+
+ brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
try {
localBroker.oneway(new ShutdownInfo());
@@ -363,7 +364,8 @@ public abstract class DemandForwardingBr
}
}
- });
+ }, "ActiveMQ ForwardingBridge StopTask");
+
if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
LOG.info("Network Could not shutdown in a timely manner");
}
@@ -377,9 +379,6 @@ public abstract class DemandForwardingBr
startedLatch.countDown();
localStartedLatch.countDown();
- // stop task runner
- asyncTaskRunner.shutdown();
- asyncTaskRunner = null;
ss.throwFirstException();
}
}
@@ -399,7 +398,7 @@ public abstract class DemandForwardingBr
LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
}
LOG.debug("The remote Exception was: " + error, error);
- asyncTaskRunner.execute(new Runnable() {
+ brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
ServiceSupport.dispose(getControllingService());
}
@@ -632,7 +631,7 @@ public abstract class DemandForwardingBr
if (!disposed.get()) {
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
LOG.debug("The local Exception was:" + error, error);
- asyncTaskRunner.execute(new Runnable() {
+ brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
ServiceSupport.dispose(getControllingService());
}
@@ -660,7 +659,7 @@ public abstract class DemandForwardingBr
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses
- asyncTaskRunner.execute(new Runnable() {
+ brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
sub.waitForCompletion();
try {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java Thu Sep 6 17:50:43 2012
@@ -35,6 +35,7 @@ import org.apache.activemq.ActiveMQConne
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jndi.JndiTemplate;
@@ -166,7 +167,8 @@ public abstract class JmsConnector imple
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
- this.connectionSerivce.shutdown();
+ ThreadPoolUtils.shutdown(connectionSerivce);
+ connectionSerivce = null;
for (DestinationBridge bridge : inboundBridges) {
bridge.stop();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu Sep 6 17:50:43 2012
@@ -69,6 +69,7 @@ import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -287,7 +288,8 @@ public class JournalPersistenceAdapter i
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true, true);
checkpointTask.shutdown();
- checkpointExecutor.shutdown();
+ ThreadPoolUtils.shutdown(checkpointExecutor);
+ checkpointExecutor = null;
queues.clear();
topics.clear();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Sep 6 17:50:43 2012
@@ -64,6 +64,7 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -237,10 +238,12 @@ public class KahaDBStore extends Message
this.globalTopicSemaphore.drainPermits();
}
if (this.queueExecutor != null) {
- this.queueExecutor.shutdownNow();
+ ThreadPoolUtils.shutdownNow(queueExecutor);
+ queueExecutor = null;
}
if (this.topicExecutor != null) {
- this.topicExecutor.shutdownNow();
+ ThreadPoolUtils.shutdownNow(topicExecutor);
+ topicExecutor = null;
}
LOG.info("Stopped KahaDB");
super.doStop(stopper);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Thu Sep 6 17:50:43 2012
@@ -92,9 +92,40 @@ public class TaskRunnerFactory implement
}
}
+ /**
+ * Performs a plain shutdown only, i.e. the thread pool is shut down neither gracefully nor aggressively.
+ *
+ * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService)
+ */
public void shutdown() {
if (executor != null) {
- ThreadPoolUtils.shutdown(executor, shutdownAwaitTermination);
+ ThreadPoolUtils.shutdown(executor);
+ executor = null;
+ }
+ initDone.set(false);
+ }
+
+ /**
+ * Performs a shutdown now (aggressively) on the thread pool.
+ *
+ * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService)
+ */
+ public void shutdownNow() {
+ if (executor != null) {
+ ThreadPoolUtils.shutdownNow(executor);
+ executor = null;
+ }
+ initDone.set(false);
+ }
+
+ /**
+ * Performs a graceful shutdown.
+ *
+ * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService)
+ */
+ public void shutdownGraceful() {
+ if (executor != null) {
+ ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination);
executor = null;
}
initDone.set(false);
@@ -119,10 +150,19 @@ public class TaskRunnerFactory implement
if (executor != null) {
executor.execute(runnable);
} else {
- new Thread(runnable, name + "-" + id.incrementAndGet()).start();
+ doExecuteNewThread(runnable, name);
}
}
+ private void doExecuteNewThread(Runnable runnable, String name) {
+ String threadName = name + "-" + id.incrementAndGet();
+ Thread thread = new Thread(runnable, threadName);
+ thread.setDaemon(daemon);
+
+ LOG.trace("Created and running thread[{}]: {}", threadName, thread);
+ thread.start();
+ }
+
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java Thu Sep 6 17:50:43 2012
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Reentr
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -364,7 +365,7 @@ public abstract class AbstractInactivity
READ_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null;
- ASYNC_TASKS.shutdown();
+ ThreadPoolUtils.shutdown(ASYNC_TASKS);
ASYNC_TASKS = null;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Thu Sep 6 17:50:43 2012
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -348,7 +349,10 @@ public class MulticastDiscoveryAgent imp
if (runner != null) {
runner.interrupt();
}
- getExecutor().shutdownNow();
+ if (executor != null) {
+ ThreadPoolUtils.shutdownNow(executor);
+ executor = null;
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Thu Sep 6 17:50:43 2012
@@ -33,6 +33,7 @@ import org.apache.activemq.transport.Abs
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -276,7 +277,7 @@ public class MQTTInactivityMonitor exten
if (CHECKER_COUNTER == 0) {
READ_CHECK_TIMER.cancel();
READ_CHECK_TIMER = null;
- ASYNC_TASKS.shutdown();
+ ThreadPoolUtils.shutdown(ASYNC_TASKS);
ASYNC_TASKS = null;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java Thu Sep 6 17:50:43 2012
@@ -24,27 +24,53 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
+ * Utility methods for working with thread pools {@link ExecutorService}.
*/
-public class ThreadPoolUtils {
+public final class ThreadPoolUtils {
private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class);
- // TODO: Should be 30 sec
- // but lowered due some unit tests dont yet properly shutdown, so want to run these a bit faster
- public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L;
+ public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 30 * 1000L;
+
+ /**
+ * Shutdown the given executor service only (ie not graceful shutdown).
+ *
+ * @see java.util.concurrent.ExecutorService#shutdown()
+ */
+ public static void shutdown(ExecutorService executorService) {
+ doShutdown(executorService, -1, true);
+ }
+
+ /**
+ * Shutdown now the given executor service aggressively.
+ *
+ * @param executorService the executor service to shutdown now
+ * @return list of tasks that never commenced execution
+ * @see java.util.concurrent.ExecutorService#shutdownNow()
+ */
+ public static List<Runnable> shutdownNow(ExecutorService executorService) {
+ List<Runnable> answer = null;
+ if (!executorService.isShutdown()) {
+ LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
+ answer = executorService.shutdownNow();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
+ new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
+ }
+ }
+
+ return answer;
+ }
/**
* Shutdown the given executor service graceful at first, and then aggressively
* if the await termination timeout was hit.
* <p/>
- * This implementation invokes the {@link #shutdown(java.util.concurrent.ExecutorService, long)}
+ * This implementation invokes the {@link #shutdownGraceful(java.util.concurrent.ExecutorService, long)}
* with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis.
- *
- * @see #shutdown(java.util.concurrent.ExecutorService, long)
*/
- public void shutdown(ExecutorService executorService) {
- shutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION);
+ public static void shutdownGraceful(ExecutorService executorService) {
+ doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION, false);
}
/**
@@ -57,14 +83,35 @@ public class ThreadPoolUtils {
* forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
* is used as timeout value waiting for orderly shutdown to
* complete normally, before going aggressively.
+ * <p/>
+ * Notice if the given parameter <tt>shutdownAwaitTermination</tt> is negative, then a quick shutdown
+ * is commenced, by invoking the {@link java.util.concurrent.ExecutorService#shutdown()} method
+ * and then exit from this method (ie. no graceful shutdown is performed).
*
* @param executorService the executor service to shutdown
- * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
- * @see java.util.concurrent.ExecutorService#shutdown()
+ * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown, if the value is negative
+ * then the thread pool is <b>not</b> gracefully shut down, but a regular shutdown
+ * is commenced.
*/
- public static void shutdown(ExecutorService executorService, long shutdownAwaitTermination) {
+ public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
+ doShutdown(executorService, shutdownAwaitTermination, false);
+ }
+
+ private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean quick) {
// code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager
+ if (executorService == null) {
+ return;
+ }
+
+ if (quick) {
+ // do not shutdown graceful, but just quick shutdown on the thread pool
+ executorService.shutdown();
+ LOG.debug("Quick shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
+ new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
+ return;
+ }
+
if (shutdownAwaitTermination <= 0) {
throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination);
}
@@ -106,27 +153,6 @@ public class ThreadPoolUtils {
}
/**
- * Shutdown now the given executor service aggressively.
- *
- * @param executorService the executor service to shutdown now
- * @return list of tasks that never commenced execution
- * @see java.util.concurrent.ExecutorService#shutdownNow()
- */
- public static List<Runnable> shutdownNow(ExecutorService executorService) {
- List<Runnable> answer = null;
- if (!executorService.isShutdown()) {
- LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
- answer = executorService.shutdownNow();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
- new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
- }
- }
-
- return answer;
- }
-
- /**
* Awaits the termination of the thread pool.
* <p/>
* This implementation will log every 5th second at INFO level that we are waiting, so the end user