You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by da...@apache.org on 2012/09/06 19:50:44 UTC

svn commit: r1381695 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/ broker/region/ network/ network/jms/ store/journal/ store/kahadb/ thread/ transport/ transport/discovery/multicast/ transport/mqtt/ util/

Author: davsclaus
Date: Thu Sep  6 17:50:43 2012
New Revision: 1381695

URL: http://svn.apache.org/viewvc?rev=1381695&view=rev
Log:
AMQ-4026: Using ThreadPoolUtils to shutdown thread pool. Use thread pool from broker service where applicable.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Sep  6 17:50:43 2012
@@ -103,6 +103,7 @@ import org.apache.activemq.util.Introspe
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -692,10 +693,10 @@ public class ActiveMQConnection implemen
         } finally {
             try {
                 if (executor != null) {
-                    executor.shutdown();
+                    ThreadPoolUtils.shutdown(executor);
                 }
             } catch (Throwable e) {
-                LOG.error("Error shutting down thread pool " + e, e);
+                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
             }
 
             ServiceSupport.dispose(this.transport);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Sep  6 17:50:43 2012
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -61,6 +60,7 @@ import org.apache.activemq.transaction.S
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -769,12 +769,8 @@ public class ActiveMQMessageConsumer imp
                 }
             }
             if (executorService != null) {
-                executorService.shutdown();
-                try {
-                    executorService.awaitTermination(60, TimeUnit.SECONDS);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
+                ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
+                executorService = null;
             }
 
             if (session.isClientAcknowledge()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Sep  6 17:50:43 2012
@@ -111,6 +111,7 @@ import org.apache.activemq.util.IOHelper
 import org.apache.activemq.util.InetAddressUtil;
 import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.apache.activemq.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -769,7 +770,7 @@ public class BrokerService implements Se
             this.taskRunnerFactory = null;
         }
         if (this.executor != null) {
-            this.executor.shutdownNow();
+            ThreadPoolUtils.shutdownNow(executor);
             this.executor = null;
         }
 
@@ -2410,8 +2411,7 @@ public class BrokerService implements Se
                 }
                 if (networkConnectorStartExecutor != null) {
                     // executor done when enqueued tasks are complete
-                    networkConnectorStartExecutor.shutdown();
-                    networkConnectorStartExecutor = null;
+                    ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
                 }
 
                 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
@@ -2755,7 +2755,7 @@ public class BrokerService implements Se
 
     /**
      * Sets whether Authenticated User Name information is shown in MBeans that support this field.
-     * @param true if MBeans should expose user name information.
+     * @param value if MBeans should expose user name information.
      */
     public void setPopulateUserNameInMBeans(boolean value) {
         this.populateUserNameInMBeans = value;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Sep  6 17:50:43 2012
@@ -86,6 +86,7 @@ import org.apache.activemq.transaction.S
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
 import org.apache.activemq.util.BrokerSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -883,7 +884,8 @@ public class Queue extends BaseDestinati
             taskRunner.shutdown();
         }
         if (this.executor != null) {
-            this.executor.shutdownNow();
+            ThreadPoolUtils.shutdownNow(executor);
+            executor = null;
         }
 
         scheduler.cancel(expireMessagesTask);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Sep  6 17:50:43 2012
@@ -70,7 +70,6 @@ import org.apache.activemq.command.Shutd
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
@@ -92,7 +91,6 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
     private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
-    private TaskRunnerFactory asyncTaskRunner;
     protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
     protected final Transport localBroker;
     protected final Transport remoteBroker;
@@ -156,8 +154,10 @@ public abstract class DemandForwardingBr
 
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
-            asyncTaskRunner = new TaskRunnerFactory("ActiveMQ ForwardingBridge Task");
-            asyncTaskRunner.init();
+
+            if (brokerService == null) {
+                throw new IllegalArgumentException("BrokerService is null on " + this);
+            }
 
             localBroker.setTransportListener(new DefaultTransportListener() {
 
@@ -201,7 +201,7 @@ public abstract class DemandForwardingBr
     }
 
     protected void triggerLocalStartBridge() throws IOException {
-        asyncTaskRunner.execute(new Runnable() {
+        brokerService.getTaskRunnerFactory().execute(new Runnable() {
             public void run() {
                 final String originalName = Thread.currentThread().getName();
                 Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
@@ -217,7 +217,7 @@ public abstract class DemandForwardingBr
     }
 
     protected void triggerRemoteStartBridge() throws IOException {
-        asyncTaskRunner.execute(new Runnable() {
+        brokerService.getTaskRunnerFactory().execute(new Runnable() {
             public void run() {
                 final String originalName = Thread.currentThread().getName();
                 Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
@@ -350,7 +350,8 @@ public abstract class DemandForwardingBr
                 try {
                     remoteBridgeStarted.set(false);
                     final CountDownLatch sendShutdown = new CountDownLatch(1);
-                    asyncTaskRunner.execute(new Runnable() {
+
+                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
                         public void run() {
                             try {
                                 localBroker.oneway(new ShutdownInfo());
@@ -363,7 +364,8 @@ public abstract class DemandForwardingBr
                             }
 
                         }
-                    });
+                    }, "ActiveMQ ForwardingBridge StopTask");
+
                     if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
                         LOG.info("Network Could not shutdown in a timely manner");
                     }
@@ -377,9 +379,6 @@ public abstract class DemandForwardingBr
                     startedLatch.countDown();
                     localStartedLatch.countDown();
 
-                    // stop task runner
-                    asyncTaskRunner.shutdown();
-                    asyncTaskRunner = null;
                     ss.throwFirstException();
                 }
             }
@@ -399,7 +398,7 @@ public abstract class DemandForwardingBr
                 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
             }
             LOG.debug("The remote Exception was: " + error, error);
-            asyncTaskRunner.execute(new Runnable() {
+            brokerService.getTaskRunnerFactory().execute(new Runnable() {
                 public void run() {
                     ServiceSupport.dispose(getControllingService());
                 }
@@ -632,7 +631,7 @@ public abstract class DemandForwardingBr
         if (!disposed.get()) {
             LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
             LOG.debug("The local Exception was:" + error, error);
-            asyncTaskRunner.execute(new Runnable() {
+            brokerService.getTaskRunnerFactory().execute(new Runnable() {
                 public void run() {
                     ServiceSupport.dispose(getControllingService());
                 }
@@ -660,7 +659,7 @@ public abstract class DemandForwardingBr
             subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
 
             // continue removal in separate thread to free up this thread for outstanding responses
-            asyncTaskRunner.execute(new Runnable() {
+            brokerService.getTaskRunnerFactory().execute(new Runnable() {
                 public void run() {
                     sub.waitForCompletion();
                     try {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java Thu Sep  6 17:50:43 2012
@@ -35,6 +35,7 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jndi.JndiTemplate;
@@ -166,7 +167,8 @@ public abstract class JmsConnector imple
     public void stop() throws Exception {
         if (started.compareAndSet(true, false)) {
 
-            this.connectionSerivce.shutdown();
+            ThreadPoolUtils.shutdown(connectionSerivce);
+            connectionSerivce = null;
 
             for (DestinationBridge bridge : inboundBridges) {
                 bridge.stop();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu Sep  6 17:50:43 2012
@@ -69,6 +69,7 @@ import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -287,7 +288,8 @@ public class JournalPersistenceAdapter i
         // Take one final checkpoint and stop checkpoint processing.
         checkpoint(true, true);
         checkpointTask.shutdown();
-        checkpointExecutor.shutdown();
+        ThreadPoolUtils.shutdown(checkpointExecutor);
+        checkpointExecutor = null;
 
         queues.clear();
         topics.clear();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Sep  6 17:50:43 2012
@@ -64,6 +64,7 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -237,10 +238,12 @@ public class KahaDBStore extends Message
             this.globalTopicSemaphore.drainPermits();
         }
         if (this.queueExecutor != null) {
-            this.queueExecutor.shutdownNow();
+            ThreadPoolUtils.shutdownNow(queueExecutor);
+            queueExecutor = null;
         }
         if (this.topicExecutor != null) {
-            this.topicExecutor.shutdownNow();
+            ThreadPoolUtils.shutdownNow(topicExecutor);
+            topicExecutor = null;
         }
         LOG.info("Stopped KahaDB");
         super.doStop(stopper);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Thu Sep  6 17:50:43 2012
@@ -92,9 +92,40 @@ public class TaskRunnerFactory implement
         }
     }
 
+    /**
+     * Performs a plain shutdown only, i.e. the thread pool is shut down neither gracefully nor aggressively.
+     *
+     * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService)
+     */
     public void shutdown() {
         if (executor != null) {
-            ThreadPoolUtils.shutdown(executor, shutdownAwaitTermination);
+            ThreadPoolUtils.shutdown(executor);
+            executor = null;
+        }
+        initDone.set(false);
+    }
+
+    /**
+     * Performs a shutdown now (aggressively) on the thread pool.
+     *
+     * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService)
+     */
+    public void shutdownNow() {
+        if (executor != null) {
+            ThreadPoolUtils.shutdownNow(executor);
+            executor = null;
+        }
+        initDone.set(false);
+    }
+
+    /**
+     * Performs a graceful shutdown.
+     *
+     * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService)
+     */
+    public void shutdownGraceful() {
+        if (executor != null) {
+            ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination);
             executor = null;
         }
         initDone.set(false);
@@ -119,10 +150,19 @@ public class TaskRunnerFactory implement
         if (executor != null) {
             executor.execute(runnable);
         } else {
-            new Thread(runnable, name + "-" + id.incrementAndGet()).start();
+            doExecuteNewThread(runnable, name);
         }
     }
 
+    private void doExecuteNewThread(Runnable runnable, String name) {
+        String threadName = name + "-" + id.incrementAndGet();
+        Thread thread = new Thread(runnable, threadName);
+        thread.setDaemon(daemon);
+
+        LOG.trace("Created and running thread[{}]: {}", threadName, thread);
+        thread.start();
+    }
+
     protected ExecutorService createDefaultExecutor() {
         ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java Thu Sep  6 17:50:43 2012
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Reentr
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -364,7 +365,7 @@ public abstract class AbstractInactivity
                   READ_CHECK_TIMER.cancel();
                     WRITE_CHECK_TIMER = null;
                     READ_CHECK_TIMER = null;
-                    ASYNC_TASKS.shutdown();
+                    ThreadPoolUtils.shutdown(ASYNC_TASKS);
                     ASYNC_TASKS = null;
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Thu Sep  6 17:50:43 2012
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
 import org.apache.activemq.transport.discovery.DiscoveryListener;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -348,7 +349,10 @@ public class MulticastDiscoveryAgent imp
             if (runner != null) {
                 runner.interrupt();
             }
-            getExecutor().shutdownNow();
+            if (executor != null) {
+                ThreadPoolUtils.shutdownNow(executor);
+                executor = null;
+            }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Thu Sep  6 17:50:43 2012
@@ -33,6 +33,7 @@ import org.apache.activemq.transport.Abs
 import org.apache.activemq.transport.InactivityIOException;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -276,7 +277,7 @@ public class MQTTInactivityMonitor exten
                 if (CHECKER_COUNTER == 0) {
                     READ_CHECK_TIMER.cancel();
                     READ_CHECK_TIMER = null;
-                    ASYNC_TASKS.shutdown();
+                    ThreadPoolUtils.shutdown(ASYNC_TASKS);
                     ASYNC_TASKS = null;
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java?rev=1381695&r1=1381694&r2=1381695&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java Thu Sep  6 17:50:43 2012
@@ -24,27 +24,53 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- *
+ * Utility methods for working with thread pools {@link ExecutorService}.
  */
-public class ThreadPoolUtils {
+public final class ThreadPoolUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class);
 
-    // TODO: Should be 30 sec
-    // but lowered due some unit tests dont yet properly shutdown, so want to run these a bit faster
-    public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L;
+    public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 30 * 1000L;
+
+    /**
+     * Shuts down the given executor service only (i.e. not a graceful shutdown).
+     *
+     * @see java.util.concurrent.ExecutorService#shutdown()
+     */
+    public static void shutdown(ExecutorService executorService) {
+        doShutdown(executorService, -1, true);
+    }
+
+    /**
+     * Shutdown now the given executor service aggressively.
+     *
+     * @param executorService the executor service to shutdown now
+     * @return list of tasks that never commenced execution
+     * @see java.util.concurrent.ExecutorService#shutdownNow()
+     */
+    public static List<Runnable> shutdownNow(ExecutorService executorService) {
+        List<Runnable> answer = null;
+        if (!executorService.isShutdown()) {
+            LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
+            answer = executorService.shutdownNow();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
+                        new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
+            }
+        }
+
+        return answer;
+    }
 
     /**
      * Shutdown the given executor service graceful at first, and then aggressively
      * if the await termination timeout was hit.
      * <p/>
-     * This implementation invokes the {@link #shutdown(java.util.concurrent.ExecutorService, long)}
+     * This implementation invokes the {@link #shutdownGraceful(java.util.concurrent.ExecutorService, long)}
      * with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis.
-     *
-     * @see #shutdown(java.util.concurrent.ExecutorService, long)
      */
-    public void shutdown(ExecutorService executorService) {
-        shutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION);
+    public static void shutdownGraceful(ExecutorService executorService) {
+        doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION, false);
     }
 
     /**
@@ -57,14 +83,35 @@ public class ThreadPoolUtils {
      * forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
      * is used as timeout value waiting for orderly shutdown to
      * complete normally, before going aggressively.
+     * <p/>
+     * Notice if the given parameter <tt>shutdownAwaitTermination</tt> is negative, then a quick shutdown
+     * is commenced, by invoking the {@link java.util.concurrent.ExecutorService#shutdown()} method
+     * and then exit from this method (ie. no graceful shutdown is performed).
      *
      * @param executorService the executor service to shutdown
-     * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
-     * @see java.util.concurrent.ExecutorService#shutdown()
+     * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown, if the value is negative
+     *                                 then the thread pool is <b>not</b> gracefully shut down, but a regular shutdown
+     *                                 is commenced.
      */
-    public static void shutdown(ExecutorService executorService, long shutdownAwaitTermination) {
+    public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
+        doShutdown(executorService, shutdownAwaitTermination, false);
+    }
+
+    private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean quick) {
         // code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager
 
+        if (executorService == null) {
+            return;
+        }
+
+        if (quick) {
+            // do not shutdown graceful, but just quick shutdown on the thread pool
+            executorService.shutdown();
+            LOG.debug("Quick shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
+                    new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
+            return;
+        }
+
         if (shutdownAwaitTermination <= 0) {
             throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination);
         }
@@ -106,27 +153,6 @@ public class ThreadPoolUtils {
     }
 
     /**
-     * Shutdown now the given executor service aggressively.
-     *
-     * @param executorService the executor service to shutdown now
-     * @return list of tasks that never commenced execution
-     * @see java.util.concurrent.ExecutorService#shutdownNow()
-     */
-    public static List<Runnable> shutdownNow(ExecutorService executorService) {
-        List<Runnable> answer = null;
-        if (!executorService.isShutdown()) {
-            LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
-            answer = executorService.shutdownNow();
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
-                        new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
-            }
-        }
-
-        return answer;
-    }
-
-    /**
      * Awaits the termination of the thread pool.
      * <p/>
      * This implementation will log every 5th second at INFO level that we are waiting, so the end user