You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/09/05 11:53:25 UTC
svn commit: r1381120 - in /camel/branches/camel-2.10.x: ./
camel-core/src/main/java/org/apache/camel/component/dataset/
camel-core/src/main/java/org/apache/camel/component/seda/
camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/o...
Author: davsclaus
Date: Wed Sep 5 09:53:23 2012
New Revision: 1381120
URL: http://svn.apache.org/viewvc?rev=1381120&view=rev
Log:
CAMEL-5563: Camel now shuts down thread pools gracefully at first and then falls back to being aggressive as before. Added more logging details during shutdown, as well as logging if the shutdown takes a while, and logging if there were any thread pools that weren't properly shut down beforehand when Camel shuts down. The graceful shutdown uses a 30 sec timeout.
Modified:
camel/branches/camel-2.10.x/ (props changed)
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
camel/branches/camel-2.10.x/camel-core/src/test/resources/log4j.properties
camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
camel/branches/camel-2.10.x/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
camel/branches/camel-2.10.x/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/branches/camel-2.10.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Merged /camel/trunk:r1381072
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java Wed Sep 5 09:53:23 2012
@@ -77,7 +77,7 @@ public class DataSetConsumer extends Def
super.doStop();
if (executorService != null) {
- camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ camelContext.getExecutorServiceManager().shutdown(executorService);
executorService = null;
}
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed Sep 5 09:53:23 2012
@@ -306,7 +306,7 @@ public class SedaConsumer extends Servic
protected void doShutdown() throws Exception {
// only shutdown thread pool when we shutdown
if (executor != null) {
- endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
executor = null;
}
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Wed Sep 5 09:53:23 2012
@@ -372,7 +372,7 @@ public class SedaEndpoint extends Defaul
}
// shutdown thread pool if it was in use
if (multicastExecutor != null) {
- getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
+ getCamelContext().getExecutorServiceManager().shutdown(multicastExecutor);
multicastExecutor = null;
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java Wed Sep 5 09:53:23 2012
@@ -40,6 +40,8 @@ import org.apache.camel.spi.ThreadPoolFa
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.TimeUtils;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.concurrent.CamelThreadFactory;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
@@ -57,6 +59,7 @@ public class DefaultExecutorServiceManag
private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory();
private final List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
private String threadNamePattern;
+ private long shutdownAwaitTermination = 30000;
private String defaultThreadPoolProfileId = "defaultThreadPoolProfile";
private final Map<String, ThreadPoolProfile> threadPoolProfiles = new HashMap<String, ThreadPoolProfile>();
private ThreadPoolProfile builtIndefaultProfile;
@@ -126,7 +129,17 @@ public class DefaultExecutorServiceManag
String name = threadNamePattern.replaceFirst("#camelId#", this.camelContext.getName());
this.threadNamePattern = name;
}
-
+
+ @Override
+ public long getShutdownAwaitTermination() {
+ return shutdownAwaitTermination;
+ }
+
+ @Override
+ public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
+ this.shutdownAwaitTermination = shutdownAwaitTermination;
+ }
+
@Override
public String resolveThreadName(String name) {
return ThreadHelper.resolveThreadName(threadNamePattern, name);
@@ -244,11 +257,50 @@ public class DefaultExecutorServiceManag
@Override
public void shutdown(ExecutorService executorService) {
ObjectHelper.notNull(executorService, "executorService");
+ shutdown(executorService, shutdownAwaitTermination);
+ }
+ @Override
+ public void shutdown(ExecutorService executorService, long shutdownAwaitTermination) {
+ ObjectHelper.notNull(executorService, "executorService");
+ if (shutdownAwaitTermination <= 0) {
+ throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination);
+ }
+
+
+ // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
+ // and try shutting down again. In both cases we wait at most the given shutdown timeout value given
+ // (total wait could then be 2 x shutdownAwaitTermination)
+ boolean warned = false;
+ StopWatch watch = new StopWatch();
if (!executorService.isShutdown()) {
- LOG.debug("Shutdown ExecutorService: {}", executorService);
+ LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
executorService.shutdown();
- LOG.trace("Shutdown ExecutorService: {} complete.", executorService);
+ try {
+ if (!awaitTermination(executorService, shutdownAwaitTermination)) {
+ warned = true;
+ LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
+ executorService.shutdownNow();
+ // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
+ if (!awaitTermination(executorService, shutdownAwaitTermination)) {
+ LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
+ }
+ }
+ } catch (InterruptedException e) {
+ warned = true;
+ LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
+ // we were interrupted during shutdown, so force shutdown
+ executorService.shutdownNow();
+ }
+
+ // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
+ if (warned) {
+ LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
+ new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
+ new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
+ }
}
if (executorService instanceof ThreadPoolExecutor) {
@@ -262,19 +314,56 @@ public class DefaultExecutorServiceManag
executorServices.remove(executorService);
}
+ /**
+ * Awaits the termination of the thread pool.
+ * <p/>
+ * This implementation will log every 5th second at INFO level that we are waiting, so the end user
+ * can see we are not hanging in case it takes longer time to shutdown the pool.
+ *
+ * @param executorService the thread pool
+ * @param shutdownAwaitTermination time in millis to use as timeout
+ * @return <tt>true</tt> if the pool is terminated, or <tt>false</tt> if we timed out
+ * @throws InterruptedException is thrown if we are interrupted during the waiting
+ */
+ private static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
+ // log progress every 5th second so end user is aware of we are shutting down
+ StopWatch watch = new StopWatch();
+ long interval = Math.min(5000, shutdownAwaitTermination);
+ boolean done = false;
+ while (!done && interval > 0) {
+ if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
+ done = true;
+ } else {
+ LOG.info("Waited {} for ExecutorService: {} to shutdown...", TimeUtils.printDuration(watch.taken()), executorService);
+ // recalculate interval
+ interval = Math.min(5000, shutdownAwaitTermination - watch.taken());
+ }
+ }
+
+ return done;
+ }
+
@Override
public List<Runnable> shutdownNow(ExecutorService executorService) {
- return doShutdownNow(executorService, true);
+ return doShutdownNow(executorService, false);
}
- private List<Runnable> doShutdownNow(ExecutorService executorService, boolean remove) {
+ private List<Runnable> doShutdownNow(ExecutorService executorService, boolean failSafe) {
ObjectHelper.notNull(executorService, "executorService");
List<Runnable> answer = null;
if (!executorService.isShutdown()) {
- LOG.debug("ShutdownNow ExecutorService: {}", executorService);
+ if (failSafe) {
+ // log as warn, as we shutdown as fail-safe, so end user should see more details in the log.
+ LOG.warn("Forcing shutdown of ExecutorService: {}", executorService);
+ } else {
+ LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
+ }
answer = executorService.shutdownNow();
- LOG.trace("ShutdownNow ExecutorService: {} complete.", executorService);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
+ new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
+ }
}
if (executorService instanceof ThreadPoolExecutor) {
@@ -285,7 +374,7 @@ public class DefaultExecutorServiceManag
}
// remove reference as its shutdown
- if (remove) {
+ if (!failSafe) {
executorServices.remove(executorService);
}
@@ -316,17 +405,23 @@ public class DefaultExecutorServiceManag
@Override
protected void doShutdown() throws Exception {
- // shutdown all executor services by looping
- for (ExecutorService executorService : executorServices) {
- // only log if something goes wrong as we want to shutdown them all
- try {
- // must not remove during looping, as we clear the list afterwards
- doShutdownNow(executorService, false);
- } catch (Throwable e) {
- LOG.warn("Error occurred during shutdown of ExecutorService: "
- + executorService + ". This exception will be ignored.", e);
+ // shutdown all remainder executor services by looping and doing this aggressively
+ // as by normal all threads pool should have been shutdown using proper lifecycle
+ // by their EIPs, components etc. This is acting as a fail-safe during shutdown
+ // of CamelContext itself.
+ if (!executorServices.isEmpty()) {
+ LOG.warn("Shutting down {} ExecutorService's which has not been shutdown properly (acting as fail-safe)", executorServices.size());
+ for (ExecutorService executorService : executorServices) {
+ // only log if something goes wrong as we want to shutdown them all
+ try {
+ doShutdownNow(executorService, true);
+ } catch (Throwable e) {
+ LOG.warn("Error occurred during shutdown of ExecutorService: "
+ + executorService + ". This exception will be ignored.", e);
+ }
}
}
+ // clear list
executorServices.clear();
// do not clear the default profile as we could potential be restarted
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Wed Sep 5 09:53:23 2012
@@ -355,7 +355,7 @@ public class DefaultShutdownStrategy ext
@Override
protected void doShutdown() throws Exception {
if (executor != null) {
- camelContext.getExecutorServiceManager().shutdownNow(executor);
+ camelContext.getExecutorServiceManager().shutdown(executor);
// should clear executor so we can restart by creating a new thread pool
executor = null;
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Wed Sep 5 09:53:23 2012
@@ -360,7 +360,7 @@ public abstract class ScheduledPollConsu
@Override
protected void doShutdown() throws Exception {
if (shutdownExecutor && scheduledExecutorService != null) {
- getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduledExecutorService);
scheduledExecutorService = null;
future = null;
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Wed Sep 5 09:53:23 2012
@@ -232,7 +232,7 @@ public abstract class DelayProcessorSupp
@Override
protected void doShutdown() throws Exception {
if (shutdownExecutorService && executorService != null) {
- camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ camelContext.getExecutorServiceManager().shutdown(executorService);
}
super.doShutdown();
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Sep 5 09:53:23 2012
@@ -965,7 +965,7 @@ public class MulticastProcessor extends
errorHandlers.clear();
if (shutdownExecutorService && executorService != null) {
- getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+ getCamelContext().getExecutorServiceManager().shutdown(executorService);
}
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Wed Sep 5 09:53:23 2012
@@ -80,7 +80,7 @@ public class OnCompletionProcessor exten
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownService(processor);
if (shutdownExecutorService) {
- getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+ getCamelContext().getExecutorServiceManager().shutdown(executorService);
}
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Wed Sep 5 09:53:23 2012
@@ -171,7 +171,7 @@ public class ThreadsProcessor extends Se
protected void doShutdown() throws Exception {
if (shutdownExecutorService) {
- camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ camelContext.getExecutorServiceManager().shutdown(executorService);
}
super.doShutdown();
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java Wed Sep 5 09:53:23 2012
@@ -157,7 +157,7 @@ public class ThroughputLogger extends Se
@Override
public void doStop() throws Exception {
if (logSchedulerService != null) {
- camelContext.getExecutorServiceManager().shutdownNow(logSchedulerService);
+ camelContext.getExecutorServiceManager().shutdown(logSchedulerService);
logSchedulerService = null;
}
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Wed Sep 5 09:53:23 2012
@@ -217,7 +217,7 @@ public class WireTapProcessor extends Se
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownService(processor);
if (shutdownExecutorService) {
- destination.getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+ destination.getCamelContext().getExecutorServiceManager().shutdown(executorService);
}
}
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Sep 5 09:53:23 2012
@@ -917,7 +917,7 @@ public class AggregateProcessor extends
// and is better suited for preparing to shutdown than this doStop method is
if (recoverService != null) {
- camelContext.getExecutorServiceManager().shutdownNow(recoverService);
+ camelContext.getExecutorServiceManager().shutdown(recoverService);
}
ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate);
@@ -970,10 +970,10 @@ public class AggregateProcessor extends
inProgressCompleteExchanges.clear();
if (shutdownExecutorService) {
- camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ camelContext.getExecutorServiceManager().shutdown(executorService);
}
if (shutdownTimeoutCheckerExecutorService) {
- camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
+ camelContext.getExecutorServiceManager().shutdown(timeoutCheckerExecutorService);
timeoutCheckerExecutorService = null;
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java Wed Sep 5 09:53:23 2012
@@ -43,6 +43,11 @@ import org.apache.camel.ShutdownableServ
* If you use the <tt>newXXX</tt> methods to create thread pools, then Camel will by default take care of
* shutting down those created pools when {@link org.apache.camel.CamelContext} is shutting down.
* <p/>
+ * For more information about shutting down thread pools see the {@link #shutdown(java.util.concurrent.ExecutorService)}
+ * and {@link #shutdownNow(java.util.concurrent.ExecutorService)}, and {@link #getShutdownAwaitTermination()} methods.
+ * Notice the details about using a graceful shutdown at first, and then falling back to aggressive shutdown in case
+ * the await termination timeout occurred.
+ *
* @see ThreadPoolFactory
*/
public interface ExecutorServiceManager extends ShutdownableService {
@@ -121,6 +126,26 @@ public interface ExecutorServiceManager
String getThreadNamePattern();
/**
+ * Sets the time to wait for thread pools to shutdown orderly, when invoking the
+ * {@link #shutdown()} method.
+ * <p/>
+ * The default value is <tt>30000</tt> millis.
+ *
+ * @param timeInMillis time in millis.
+ */
+ void setShutdownAwaitTermination(long timeInMillis);
+
+ /**
+ * Gets the time to wait for thread pools to shutdown orderly, when invoking the
+ * {@link #shutdown()} method.
+ * <p/>
+ * The default value is <tt>30000</tt> millis.
+ *
+ * @return the timeout value
+ */
+ long getShutdownAwaitTermination();
+
+ /**
* Creates a new thread pool using the default thread pool profile.
*
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
@@ -246,15 +271,41 @@ public interface ExecutorServiceManager
ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId);
/**
- * Shutdown the given executor service.
+ * Shutdown the given executor service graceful at first, and then aggressively
+ * if the await termination timeout was hit.
+ * <p/>
+ * Will try to perform an orderly shutdown by giving the running threads
+ * time to complete tasks, before going more aggressively by doing a
+ * {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
+ * forces a shutdown. The {@link #getShutdownAwaitTermination()}
+ * is used as timeout value waiting for orderly shutdown to
+ * complete normally, before going aggressively.
*
* @param executorService the executor service to shutdown
* @see java.util.concurrent.ExecutorService#shutdown()
+ * @see #getShutdownAwaitTermination()
*/
void shutdown(ExecutorService executorService);
/**
- * Shutdown now the given executor service.
+ * Shutdown the given executor service graceful at first, and then aggressively
+ * if the await termination timeout was hit.
+ * <p/>
+ * Will try to perform an orderly shutdown by giving the running threads
+ * time to complete tasks, before going more aggressively by doing a
+ * {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
+ * forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
+ * is used as timeout value waiting for orderly shutdown to
+ * complete normally, before going aggressively.
+ *
+ * @param executorService the executor service to shutdown
+ * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
+ * @see java.util.concurrent.ExecutorService#shutdown()
+ */
+ void shutdown(ExecutorService executorService, long shutdownAwaitTermination);
+
+ /**
+ * Shutdown now the given executor service aggressively.
*
* @param executorService the executor service to shutdown now
* @return list of tasks that never commenced execution
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java Wed Sep 5 09:53:23 2012
@@ -46,6 +46,10 @@ public final class CamelThreadFactory im
return answer;
}
+ public String getName() {
+ return name;
+ }
+
public String toString() {
return "CamelThreadFactory[" + name + "]";
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java Wed Sep 5 09:53:23 2012
@@ -78,4 +78,15 @@ public class RejectableScheduledThreadPo
return new RejectableFutureTask<T>(callable);
}
+ @Override
+ public String toString() {
+ // the thread factory often has more precise details about what the thread pool is used for
+ if (getThreadFactory() instanceof CamelThreadFactory) {
+ String name = ((CamelThreadFactory) getThreadFactory()).getName();
+ return super.toString() + "[" + name + "]";
+ } else {
+ return super.toString();
+ }
+ }
+
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java Wed Sep 5 09:53:23 2012
@@ -84,4 +84,15 @@ public class RejectableThreadPoolExecuto
return new RejectableFutureTask<T>(callable);
}
+ @Override
+ public String toString() {
+ // the thread factory often has more precise details about what the thread pool is used for
+ if (getThreadFactory() instanceof CamelThreadFactory) {
+ String name = ((CamelThreadFactory) getThreadFactory()).getName();
+ return super.toString() + "[" + name + "]";
+ } else {
+ return super.toString();
+ }
+ }
+
}
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java Wed Sep 5 09:53:23 2012
@@ -303,6 +303,12 @@ public class SizedScheduledExecutorServi
@Override
public String toString() {
- return delegate.toString();
+ // the thread factory often has more precise details about what the thread pool is used for
+ if (delegate.getThreadFactory() instanceof CamelThreadFactory) {
+ String name = ((CamelThreadFactory) delegate.getThreadFactory()).getName();
+ return super.toString() + "[" + name + "]";
+ } else {
+ return super.toString();
+ }
}
}
Modified: camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java Wed Sep 5 09:53:23 2012
@@ -16,6 +16,7 @@
*/
package org.apache.camel.impl;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -456,4 +457,34 @@ public class DefaultExecutorServiceManag
assertTrue(tp.isShutdown());
}
+ // this is a manual test (disabled by the xxx prefix); verify the outcome by inspecting the logs
+ public void xxxTestLongShutdownOfThreadPool() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ ExecutorService pool = context.getExecutorServiceManager().newSingleThreadExecutor(this, "Cool");
+
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ log.info("Starting thread");
+
+ // this should take a long time to shut down
+ try {
+ latch.await(42, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+ log.info("Existing thread");
+ }
+ });
+
+ // sleep a bit before shutting down
+ Thread.sleep(3000);
+
+ context.getExecutorServiceManager().shutdown(pool);
+
+ assertTrue(pool.isShutdown());
+ assertTrue(pool.isTerminated());
+ }
+
}
Modified: camel/branches/camel-2.10.x/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/test/resources/log4j.properties?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/test/resources/log4j.properties (original)
+++ camel/branches/camel-2.10.x/camel-core/src/test/resources/log4j.properties Wed Sep 5 09:53:23 2012
@@ -33,6 +33,7 @@ log4j.logger.org.apache.camel.impl.Defau
#log4j.logger.org.apache.camel.component.seda=TRACE
#log4j.logger.org.apache.camel.component.file=TRACE
#log4j.logger.org.apache.camel.impl.DefaultUnitOfWork=TRACE
+#log4j.logger.org.apache.camel.impl.DefaultExecutorServiceManager=TRACE
#log4j.logger.org.apache.camel.component.mock=DEBUG
#log4j.logger.org.apache.camel.component.file=TRACE
#log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE
Modified: camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Wed Sep 5 09:53:23 2012
@@ -222,7 +222,7 @@ public class SqsConsumer extends Schedul
protected void doShutdown() throws Exception {
super.doShutdown();
if (scheduledExecutor != null) {
- getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutor);
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduledExecutor);
scheduledExecutor = null;
}
}
Modified: camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java Wed Sep 5 09:53:23 2012
@@ -62,7 +62,7 @@ public class HazelcastSedaConsumer exten
@Override
protected void doStop() throws Exception {
if (executor != null) {
- endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
executor = null;
}
super.doStop();
Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java Wed Sep 5 09:53:23 2012
@@ -148,7 +148,7 @@ public class HdfsProducer extends Defaul
protected void doStop() throws Exception {
super.doStop();
if (scheduler != null) {
- getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduler);
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduler);
scheduler = null;
}
if (ostream != null) {
Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Wed Sep 5 09:53:23 2012
@@ -405,7 +405,7 @@ public class JmsComponent extends Defaul
@Override
protected void doShutdown() throws Exception {
if (asyncStartStopExecutorService != null) {
- getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
+ getCamelContext().getExecutorServiceManager().shutdown(asyncStartStopExecutorService);
asyncStartStopExecutorService = null;
}
super.doShutdown();
Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (original)
+++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Wed Sep 5 09:53:23 2012
@@ -238,7 +238,7 @@ public abstract class ReplyManagerSuppor
// must also stop executor service
if (executorService != null) {
- camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ camelContext.getExecutorServiceManager().shutdown(executorService);
executorService = null;
}
}
Modified: camel/branches/camel-2.10.x/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java Wed Sep 5 09:53:23 2012
@@ -86,10 +86,12 @@ public class KestrelConsumer extends Def
log.info("Stopping consumer for " + endpoint.getEndpointUri());
if (pollerExecutor != null) {
- endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(pollerExecutor);
+ endpoint.getCamelContext().getExecutorServiceManager().shutdown(pollerExecutor);
+ pollerExecutor = null;
}
if (handlerExecutor != null) {
- endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(handlerExecutor);
+ endpoint.getCamelContext().getExecutorServiceManager().shutdown(handlerExecutor);
+ handlerExecutor = null;
}
super.doStop();
Modified: camel/branches/camel-2.10.x/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ camel/branches/camel-2.10.x/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Wed Sep 5 09:53:23 2012
@@ -110,7 +110,7 @@ public class MinaEndpoint extends Defaul
protected void doShutdown() throws Exception {
// shutdown thread pools
for (ExecutorService executor : executors) {
- getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ getCamelContext().getExecutorServiceManager().shutdown(executor);
}
executors.clear();
super.doShutdown();
Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Wed Sep 5 09:53:23 2012
@@ -101,10 +101,12 @@ public class NettyConsumer extends Defau
// and then shutdown the thread pools
if (bossExecutor != null) {
- context.getExecutorServiceManager().shutdownNow(bossExecutor);
+ context.getExecutorServiceManager().shutdown(bossExecutor);
+ bossExecutor = null;
}
if (workerExecutor != null) {
- context.getExecutorServiceManager().shutdownNow(workerExecutor);
+ context.getExecutorServiceManager().shutdown(workerExecutor);
+ workerExecutor = null;
}
LOG.info("Netty consumer unbound from: " + configuration.getAddress());
Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Wed Sep 5 09:53:23 2012
@@ -128,10 +128,12 @@ public class NettyProducer extends Defau
// and then shutdown the thread pools
if (bossExecutor != null) {
- context.getExecutorServiceManager().shutdownNow(bossExecutor);
+ context.getExecutorServiceManager().shutdown(bossExecutor);
+ bossExecutor = null;
}
if (workerExecutor != null) {
- context.getExecutorServiceManager().shutdownNow(workerExecutor);
+ context.getExecutorServiceManager().shutdown(workerExecutor);
+ workerExecutor = null;
}
super.doStop();
Modified: camel/branches/camel-2.10.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Wed Sep 5 09:53:23 2012
@@ -85,7 +85,7 @@ public class StreamConsumer extends Defa
// important: do not close the stream as it will close the standard
// system.in etc.
if (executor != null) {
- endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
executor = null;
}
lines.clear();