You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/09/05 10:44:53 UTC

svn commit: r1381072 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/dataset/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/pro...

Author: davsclaus
Date: Wed Sep  5 08:44:51 2012
New Revision: 1381072

URL: http://svn.apache.org/viewvc?rev=1381072&view=rev
Log:
CAMEL-5563: Camel now shutdown thread pools graceful at first and then fallback to be aggresive as before. Added more logging details during shutdown, as well logging if the shutdown takes a while. As well if there was any thread pools when Camel shutdown that wasnt properly shutdown beforehand. The graceful shutdown uses a 30 sec timeout.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
    camel/trunk/camel-core/src/test/resources/log4j.properties
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
    camel/trunk/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
    camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
    camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
    camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
    camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java Wed Sep  5 08:44:51 2012
@@ -77,7 +77,7 @@ public class DataSetConsumer extends Def
         super.doStop();
 
         if (executorService != null) {
-            camelContext.getExecutorServiceManager().shutdownNow(executorService);
+            camelContext.getExecutorServiceManager().shutdown(executorService);
             executorService = null;
         }
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed Sep  5 08:44:51 2012
@@ -306,7 +306,7 @@ public class SedaConsumer extends Servic
     protected void doShutdown() throws Exception {
         // only shutdown thread pool when we shutdown
         if (executor != null) {
-            endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
             executor = null;
         }
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Wed Sep  5 08:44:51 2012
@@ -372,7 +372,7 @@ public class SedaEndpoint extends Defaul
         }
         // shutdown thread pool if it was in use
         if (multicastExecutor != null) {
-            getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
+            getCamelContext().getExecutorServiceManager().shutdown(multicastExecutor);
             multicastExecutor = null;
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java Wed Sep  5 08:44:51 2012
@@ -40,6 +40,8 @@ import org.apache.camel.spi.ThreadPoolFa
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.TimeUtils;
 import org.apache.camel.util.URISupport;
 import org.apache.camel.util.concurrent.CamelThreadFactory;
 import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
@@ -57,6 +59,7 @@ public class DefaultExecutorServiceManag
     private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory();
     private final List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
     private String threadNamePattern;
+    private long shutdownAwaitTermination = 30000;
     private String defaultThreadPoolProfileId = "defaultThreadPoolProfile";
     private final Map<String, ThreadPoolProfile> threadPoolProfiles = new HashMap<String, ThreadPoolProfile>();
     private ThreadPoolProfile builtIndefaultProfile;
@@ -126,7 +129,17 @@ public class DefaultExecutorServiceManag
         String name = threadNamePattern.replaceFirst("#camelId#", this.camelContext.getName());
         this.threadNamePattern = name;
     }
-    
+
+    @Override
+    public long getShutdownAwaitTermination() {
+        return shutdownAwaitTermination;
+    }
+
+    @Override
+    public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
+        this.shutdownAwaitTermination = shutdownAwaitTermination;
+    }
+
     @Override
     public String resolveThreadName(String name) {
         return ThreadHelper.resolveThreadName(threadNamePattern, name);
@@ -244,11 +257,50 @@ public class DefaultExecutorServiceManag
     @Override
     public void shutdown(ExecutorService executorService) {
         ObjectHelper.notNull(executorService, "executorService");
+        shutdown(executorService, shutdownAwaitTermination);
+    }
 
+    @Override
+    public void shutdown(ExecutorService executorService, long shutdownAwaitTermination) {
+        ObjectHelper.notNull(executorService, "executorService");
+        if (shutdownAwaitTermination <= 0) {
+            throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination);
+        }
+
+
+        // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
+        // and try shutting down again. In both cases we wait at most the given shutdown timeout value given
+        // (total wait could then be 2 x shutdownAwaitTermination)
+        boolean warned = false;
+        StopWatch watch = new StopWatch();
         if (!executorService.isShutdown()) {
-            LOG.debug("Shutdown ExecutorService: {}", executorService);
+            LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
             executorService.shutdown();
-            LOG.trace("Shutdown ExecutorService: {} complete.", executorService);
+            try {
+                if (!awaitTermination(executorService, shutdownAwaitTermination)) {
+                    warned = true;
+                    LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
+                    executorService.shutdownNow();
+                    // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
+                    if (!awaitTermination(executorService, shutdownAwaitTermination)) {
+                        LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
+                    }
+                }
+            } catch (InterruptedException e) {
+                warned = true;
+                LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
+                // we were interrupted during shutdown, so force shutdown
+                executorService.shutdownNow();
+            }
+
+            // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
+            if (warned) {
+                LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
+                        new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
+            } else if (LOG.isDebugEnabled()) {
+                LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
+                    new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
+            }
         }
 
         if (executorService instanceof ThreadPoolExecutor) {
@@ -262,19 +314,56 @@ public class DefaultExecutorServiceManag
         executorServices.remove(executorService);
     }
 
+    /**
+     * Awaits the termination of the thread pool.
+     * <p/>
+     * This implementation will log every 5th second at INFO level that we are waiting, so the end user
+     * can see we are not hanging in case it takes longer time to shutdown the pool.
+     *
+     * @param executorService            the thread pool
+     * @param shutdownAwaitTermination   time in millis to use as timeout
+     * @return <tt>true</tt> if the pool is terminated, or <tt>false</tt> if we timed out
+     * @throws InterruptedException is thrown if we are interrupted during the waiting
+     */
+    private static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
+        // log progress every 5th second so end user is aware of we are shutting down
+        StopWatch watch = new StopWatch();
+        long interval = Math.min(5000, shutdownAwaitTermination);
+        boolean done = false;
+        while (!done && interval > 0) {
+            if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
+                done = true;
+            } else {
+                LOG.info("Waited {} for ExecutorService: {} to shutdown...", TimeUtils.printDuration(watch.taken()), executorService);
+                // recalculate interval
+                interval = Math.min(5000, shutdownAwaitTermination - watch.taken());
+            }
+        }
+
+        return done;
+    }
+
     @Override
     public List<Runnable> shutdownNow(ExecutorService executorService) {
-        return doShutdownNow(executorService, true);
+        return doShutdownNow(executorService, false);
     }
 
-    private List<Runnable> doShutdownNow(ExecutorService executorService, boolean remove) {
+    private List<Runnable> doShutdownNow(ExecutorService executorService, boolean failSafe) {
         ObjectHelper.notNull(executorService, "executorService");
 
         List<Runnable> answer = null;
         if (!executorService.isShutdown()) {
-            LOG.debug("ShutdownNow ExecutorService: {}", executorService);
+            if (failSafe) {
+                // log as warn, as we shutdown as fail-safe, so end user should see more details in the log.
+                LOG.warn("Forcing shutdown of ExecutorService: {}", executorService);
+            } else {
+                LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
+            }
             answer = executorService.shutdownNow();
-            LOG.trace("ShutdownNow ExecutorService: {} complete.", executorService);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
+                        new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
+            }
         }
 
         if (executorService instanceof ThreadPoolExecutor) {
@@ -285,7 +374,7 @@ public class DefaultExecutorServiceManag
         }
 
         // remove reference as its shutdown
-        if (remove) {
+        if (!failSafe) {
             executorServices.remove(executorService);
         }
 
@@ -316,17 +405,23 @@ public class DefaultExecutorServiceManag
 
     @Override
     protected void doShutdown() throws Exception {
-        // shutdown all executor services by looping
-        for (ExecutorService executorService : executorServices) {
-            // only log if something goes wrong as we want to shutdown them all
-            try {
-                // must not remove during looping, as we clear the list afterwards
-                doShutdownNow(executorService, false);
-            } catch (Throwable e) {
-                LOG.warn("Error occurred during shutdown of ExecutorService: "
-                        + executorService + ". This exception will be ignored.", e);
+        // shutdown all remainder executor services by looping and doing this aggressively
+        // as by normal all threads pool should have been shutdown using proper lifecycle
+        // by their EIPs, components etc. This is acting as a fail-safe during shutdown
+        // of CamelContext itself.
+        if (!executorServices.isEmpty()) {
+            LOG.warn("Shutting down {} ExecutorService's which has not been shutdown properly (acting as fail-safe)", executorServices.size());
+            for (ExecutorService executorService : executorServices) {
+                // only log if something goes wrong as we want to shutdown them all
+                try {
+                    doShutdownNow(executorService, true);
+                } catch (Throwable e) {
+                    LOG.warn("Error occurred during shutdown of ExecutorService: "
+                            + executorService + ". This exception will be ignored.", e);
+                }
             }
         }
+        // clear list
         executorServices.clear();
 
         // do not clear the default profile as we could potential be restarted

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Wed Sep  5 08:44:51 2012
@@ -355,7 +355,7 @@ public class DefaultShutdownStrategy ext
     @Override
     protected void doShutdown() throws Exception {
         if (executor != null) {
-            camelContext.getExecutorServiceManager().shutdownNow(executor);
+            camelContext.getExecutorServiceManager().shutdown(executor);
             // should clear executor so we can restart by creating a new thread pool
             executor = null;
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Wed Sep  5 08:44:51 2012
@@ -360,7 +360,7 @@ public abstract class ScheduledPollConsu
     @Override
     protected void doShutdown() throws Exception {
         if (shutdownExecutor && scheduledExecutorService != null) {
-            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduledExecutorService);
             scheduledExecutorService = null;
             future = null;
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Wed Sep  5 08:44:51 2012
@@ -232,7 +232,7 @@ public abstract class DelayProcessorSupp
     @Override
     protected void doShutdown() throws Exception {
         if (shutdownExecutorService && executorService != null) {
-            camelContext.getExecutorServiceManager().shutdownNow(executorService);
+            camelContext.getExecutorServiceManager().shutdown(executorService);
         }
         super.doShutdown();
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Sep  5 08:44:51 2012
@@ -965,7 +965,7 @@ public class MulticastProcessor extends 
         errorHandlers.clear();
 
         if (shutdownExecutorService && executorService != null) {
-            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            getCamelContext().getExecutorServiceManager().shutdown(executorService);
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Wed Sep  5 08:44:51 2012
@@ -80,7 +80,7 @@ public class OnCompletionProcessor exten
     protected void doShutdown() throws Exception {
         ServiceHelper.stopAndShutdownService(processor);
         if (shutdownExecutorService) {
-            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            getCamelContext().getExecutorServiceManager().shutdown(executorService);
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Wed Sep  5 08:44:51 2012
@@ -171,7 +171,7 @@ public class ThreadsProcessor extends Se
 
     protected void doShutdown() throws Exception {
         if (shutdownExecutorService) {
-            camelContext.getExecutorServiceManager().shutdownNow(executorService);
+            camelContext.getExecutorServiceManager().shutdown(executorService);
         }
         super.doShutdown();
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java Wed Sep  5 08:44:51 2012
@@ -157,7 +157,7 @@ public class ThroughputLogger extends Se
     @Override
     public void doStop() throws Exception {
         if (logSchedulerService != null) {
-            camelContext.getExecutorServiceManager().shutdownNow(logSchedulerService);
+            camelContext.getExecutorServiceManager().shutdown(logSchedulerService);
             logSchedulerService = null;
         }
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Wed Sep  5 08:44:51 2012
@@ -217,7 +217,7 @@ public class WireTapProcessor extends Se
     protected void doShutdown() throws Exception {
         ServiceHelper.stopAndShutdownService(processor);
         if (shutdownExecutorService) {
-            destination.getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            destination.getCamelContext().getExecutorServiceManager().shutdown(executorService);
         }
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Sep  5 08:44:51 2012
@@ -917,7 +917,7 @@ public class AggregateProcessor extends 
         // and is better suited for preparing to shutdown than this doStop method is
 
         if (recoverService != null) {
-            camelContext.getExecutorServiceManager().shutdownNow(recoverService);
+            camelContext.getExecutorServiceManager().shutdown(recoverService);
         }
         ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate);
 
@@ -970,10 +970,10 @@ public class AggregateProcessor extends 
         inProgressCompleteExchanges.clear();
 
         if (shutdownExecutorService) {
-            camelContext.getExecutorServiceManager().shutdownNow(executorService);
+            camelContext.getExecutorServiceManager().shutdown(executorService);
         }
         if (shutdownTimeoutCheckerExecutorService) {
-            camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
+            camelContext.getExecutorServiceManager().shutdown(timeoutCheckerExecutorService);
             timeoutCheckerExecutorService = null;
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java Wed Sep  5 08:44:51 2012
@@ -43,6 +43,11 @@ import org.apache.camel.ShutdownableServ
  * If you use the <tt>newXXX</tt> methods to create thread pools, then Camel will by default take care of
  * shutting down those created pools when {@link org.apache.camel.CamelContext} is shutting down.
  * <p/>
+ * For more information about shutting down thread pools see the {@link #shutdown(java.util.concurrent.ExecutorService)}
+ * and {@link #shutdownNow(java.util.concurrent.ExecutorService)}, and {@link #getShutdownAwaitTermination()} methods.
+ * Notice the details about using a graceful shutdown at fist, and then falling back to aggressive shutdown in case
+ * of await termination timeout occurred.
+ *
  * @see ThreadPoolFactory
  */
 public interface ExecutorServiceManager extends ShutdownableService {
@@ -121,6 +126,26 @@ public interface ExecutorServiceManager 
     String getThreadNamePattern();
 
     /**
+     * Sets the time to wait for thread pools to shutdown orderly, when invoking the
+     * {@link #shutdown()} method.
+     * <p/>
+     * The default value is <tt>30000</tt> millis.
+     *
+     * @param timeInMillis time in millis.
+     */
+    void setShutdownAwaitTermination(long timeInMillis);
+
+    /**
+     * Gets the time to wait for thread pools to shutdown orderly, when invoking the
+     * {@link #shutdown()} method.
+     * <p/>
+     * The default value is <tt>30000</tt> millis.
+     *
+     * @return the timeout value
+     */
+    long getShutdownAwaitTermination();
+
+    /**
      * Creates a new thread pool using the default thread pool profile.
      *
      * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
@@ -246,15 +271,41 @@ public interface ExecutorServiceManager 
     ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId);
 
     /**
-     * Shutdown the given executor service.
+     * Shutdown the given executor service graceful at first, and then aggressively
+     * if the await termination timeout was hit.
+     * <p/>
+     * Will try to perform an orderly shutdown by giving the running threads
+     * time to complete tasks, before going more aggressively by doing a
+     * {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
+     * forces a shutdown. The {@link #getShutdownAwaitTermination()}
+     * is used as timeout value waiting for orderly shutdown to
+     * complete normally, before going aggressively.
      *
      * @param executorService the executor service to shutdown
      * @see java.util.concurrent.ExecutorService#shutdown()
+     * @see #getShutdownAwaitTermination()
      */
     void shutdown(ExecutorService executorService);
 
     /**
-     * Shutdown now the given executor service.
+     * Shutdown the given executor service graceful at first, and then aggressively
+     * if the await termination timeout was hit.
+     * <p/>
+     * Will try to perform an orderly shutdown by giving the running threads
+     * time to complete tasks, before going more aggressively by doing a
+     * {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
+     * forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
+     * is used as timeout value waiting for orderly shutdown to
+     * complete normally, before going aggressively.
+     *
+     * @param executorService the executor service to shutdown
+     * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
+     * @see java.util.concurrent.ExecutorService#shutdown()
+     */
+    void shutdown(ExecutorService executorService, long shutdownAwaitTermination);
+
+    /**
+     * Shutdown now the given executor service aggressively.
      *
      * @param executorService the executor service to shutdown now
      * @return list of tasks that never commenced execution

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java Wed Sep  5 08:44:51 2012
@@ -46,6 +46,10 @@ public final class CamelThreadFactory im
         return answer;
     }
 
+    public String getName() {
+        return name;
+    }
+
     public String toString() {
         return "CamelThreadFactory[" + name + "]";
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java Wed Sep  5 08:44:51 2012
@@ -78,4 +78,15 @@ public class RejectableScheduledThreadPo
         return new RejectableFutureTask<T>(callable);
     }
 
+    @Override
+    public String toString() {
+        // the thread factory often have more precise details what the thread pool is used for
+        if (getThreadFactory() instanceof CamelThreadFactory) {
+            String name = ((CamelThreadFactory) getThreadFactory()).getName();
+            return super.toString() + "[" + name + "]";
+        } else {
+            return super.toString();
+        }
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java Wed Sep  5 08:44:51 2012
@@ -84,4 +84,15 @@ public class RejectableThreadPoolExecuto
         return new RejectableFutureTask<T>(callable);
     }
 
+    @Override
+    public String toString() {
+        // the thread factory often have more precise details what the thread pool is used for
+        if (getThreadFactory() instanceof CamelThreadFactory) {
+            String name = ((CamelThreadFactory) getThreadFactory()).getName();
+            return super.toString() + "[" + name + "]";
+        } else {
+            return super.toString();
+        }
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java Wed Sep  5 08:44:51 2012
@@ -303,6 +303,12 @@ public class SizedScheduledExecutorServi
 
     @Override
     public String toString() {
-        return delegate.toString();
+        // the thread factory often have more precise details what the thread pool is used for
+        if (delegate.getThreadFactory() instanceof CamelThreadFactory) {
+            String name = ((CamelThreadFactory) delegate.getThreadFactory()).getName();
+            return super.toString() + "[" + name + "]";
+        } else {
+            return super.toString();
+        }
     }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java Wed Sep  5 08:44:51 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.impl;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -456,4 +457,34 @@ public class DefaultExecutorServiceManag
         assertTrue(tp.isShutdown());
     }
 
+    // this is a manual test, by looking at the logs
+    public void xxxTestLongShutdownOfThreadPool() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        ExecutorService pool = context.getExecutorServiceManager().newSingleThreadExecutor(this, "Cool");
+
+        pool.execute(new Runnable() {
+            @Override
+            public void run() {
+                log.info("Starting thread");
+
+                // this should take a long time to shutdown
+                try {
+                    latch.await(42, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+
+                log.info("Existing thread");
+            }
+        });
+
+        // sleep a bit before shutting down
+        Thread.sleep(3000);
+
+        context.getExecutorServiceManager().shutdown(pool);
+
+        assertTrue(pool.isShutdown());
+        assertTrue(pool.isTerminated());
+    }
+
 }

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Wed Sep  5 08:44:51 2012
@@ -33,6 +33,7 @@ log4j.logger.org.apache.camel.impl.Defau
 #log4j.logger.org.apache.camel.component.seda=TRACE
 #log4j.logger.org.apache.camel.component.file=TRACE
 #log4j.logger.org.apache.camel.impl.DefaultUnitOfWork=TRACE
+#log4j.logger.org.apache.camel.impl.DefaultExecutorServiceManager=TRACE
 #log4j.logger.org.apache.camel.component.mock=DEBUG
 #log4j.logger.org.apache.camel.component.file=TRACE
 #log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE

Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Wed Sep  5 08:44:51 2012
@@ -222,7 +222,7 @@ public class SqsConsumer extends Schedul
     protected void doShutdown() throws Exception {
         super.doShutdown();
         if (scheduledExecutor != null) {
-            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutor);
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduledExecutor);
             scheduledExecutor = null;
         }
     }

Modified: camel/trunk/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java (original)
+++ camel/trunk/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java Wed Sep  5 08:44:51 2012
@@ -52,7 +52,7 @@ public class CouchDbConsumer extends Def
             task.stop();
         }
         if (executor != null) {
-            endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
             executor = null;
         }
     }

Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java (original)
+++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java Wed Sep  5 08:44:51 2012
@@ -62,7 +62,7 @@ public class HazelcastSedaConsumer exten
     @Override
     protected void doStop() throws Exception {
         if (executor != null) {
-            endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
             executor = null;
         }
         super.doStop();

Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java Wed Sep  5 08:44:51 2012
@@ -148,7 +148,7 @@ public class HdfsProducer extends Defaul
     protected void doStop() throws Exception {
         super.doStop();
         if (scheduler != null) {
-            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduler);
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduler);
             scheduler = null;
         }
         if (ostream != null) {

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Wed Sep  5 08:44:51 2012
@@ -405,7 +405,7 @@ public class JmsComponent extends Defaul
     @Override
     protected void doShutdown() throws Exception {
         if (asyncStartStopExecutorService != null) {
-            getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
+            getCamelContext().getExecutorServiceManager().shutdown(asyncStartStopExecutorService);
             asyncStartStopExecutorService = null;
         }
         super.doShutdown();

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Wed Sep  5 08:44:51 2012
@@ -238,7 +238,7 @@ public abstract class ReplyManagerSuppor
 
         // must also stop executor service
         if (executorService != null) {
-            camelContext.getExecutorServiceManager().shutdownNow(executorService);
+            camelContext.getExecutorServiceManager().shutdown(executorService);
             executorService = null;
         }
     }

Modified: camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java (original)
+++ camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java Wed Sep  5 08:44:51 2012
@@ -86,10 +86,12 @@ public class KestrelConsumer extends Def
         log.info("Stopping consumer for " + endpoint.getEndpointUri());
 
         if (pollerExecutor != null) {
-            endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(pollerExecutor);
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(pollerExecutor);
+            pollerExecutor = null;
         }
         if (handlerExecutor != null) {
-            endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(handlerExecutor);
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(handlerExecutor);
+            handlerExecutor = null;
         }
 
         super.doStop();

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Wed Sep  5 08:44:51 2012
@@ -110,7 +110,7 @@ public class MinaEndpoint extends Defaul
     protected void doShutdown() throws Exception {
         // shutdown thread pools
         for (ExecutorService executor : executors) {
-            getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            getCamelContext().getExecutorServiceManager().shutdown(executor);
         }
         executors.clear();
         super.doShutdown();

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Wed Sep  5 08:44:51 2012
@@ -101,10 +101,12 @@ public class NettyConsumer extends Defau
 
         // and then shutdown the thread pools
         if (bossExecutor != null) {
-            context.getExecutorServiceManager().shutdownNow(bossExecutor);
+            context.getExecutorServiceManager().shutdown(bossExecutor);
+            bossExecutor = null;
         }
         if (workerExecutor != null) {
-            context.getExecutorServiceManager().shutdownNow(workerExecutor);
+            context.getExecutorServiceManager().shutdown(workerExecutor);
+            workerExecutor = null;
         }
 
         LOG.info("Netty consumer unbound from: " + configuration.getAddress());

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Wed Sep  5 08:44:51 2012
@@ -128,10 +128,12 @@ public class NettyProducer extends Defau
 
         // and then shutdown the thread pools
         if (bossExecutor != null) {
-            context.getExecutorServiceManager().shutdownNow(bossExecutor);
+            context.getExecutorServiceManager().shutdown(bossExecutor);
+            bossExecutor = null;
         }
         if (workerExecutor != null) {
-            context.getExecutorServiceManager().shutdownNow(workerExecutor);
+            context.getExecutorServiceManager().shutdown(workerExecutor);
+            workerExecutor = null;
         }
 
         super.doStop();

Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Wed Sep  5 08:44:51 2012
@@ -85,7 +85,7 @@ public class StreamConsumer extends Defa
         // important: do not close the stream as it will close the standard
         // system.in etc.
         if (executor != null) {
-            endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
             executor = null;
         }
         lines.clear();

Modified: camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java?rev=1381072&r1=1381071&r2=1381072&view=diff
==============================================================================
--- camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java (original)
+++ camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java Wed Sep  5 08:44:51 2012
@@ -177,7 +177,7 @@ public class XmppConsumer extends Defaul
             connection.disconnect();
         }
         if (scheduledExecutor != null) {
-            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutor);
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduledExecutor);
             scheduledExecutor = null;
         }
     }