You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/03/13 17:20:45 UTC
svn commit: r1300218 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/seda/
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/java/org/apache/camel/processor/...
Author: davsclaus
Date: Tue Mar 13 16:20:43 2012
New Revision: 1300218
URL: http://svn.apache.org/viewvc?rev=1300218&view=rev
Log:
CAMEL-5079: EIPs using thread pools will now eagerly shut down their thread pool if the thread pool was created only for the EIP. This avoids leaks when adding and removing a lot of routes etc.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
- copied, changed from r1300065, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java
Removed:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateTimeoutWithExecutorServiceRefTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateTimeoutWithExecutorServiceRefTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Tue Mar 13 16:20:43 2012
@@ -133,7 +133,7 @@ public class SedaEndpoint extends Defaul
}
// create multicast processor
multicastStarted = false;
- consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, 0, null, false);
+ consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, false, 0, null, false);
} else {
// not needed
consumerMulticastProcessor = null;
@@ -337,6 +337,11 @@ public class SedaEndpoint extends Defaul
if (getComponent() != null) {
getComponent().onShutdownEndpoint(this);
}
+ // shutdown thread pool if it was in use
+ if (multicastExecutor != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
+ multicastExecutor = null;
+ }
super.doShutdown();
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Tue Mar 13 16:20:43 2012
@@ -159,26 +159,41 @@ public class AggregateDefinition extends
Expression correlation = getExpression().createExpression(routeContext);
AggregationStrategy strategy = createAggregationStrategy(routeContext);
- executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, isParallelProcessing());
- if (executorService == null && !isParallelProcessing()) {
+ boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
+ ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, isParallelProcessing());
+ if (threadPool == null && !isParallelProcessing()) {
// executor service is mandatory for the Aggregator
// we do not run in parallel mode, but use a synchronous executor, so we run in current thread
- executorService = new SynchronousExecutorService();
+ threadPool = new SynchronousExecutorService();
+ shutdownThreadPool = true;
}
-
- if (timeoutCheckerExecutorServiceRef != null && timeoutCheckerExecutorService == null) {
- timeoutCheckerExecutorService = getConfiguredScheduledExecutorService(routeContext);
- }
- AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, correlation, strategy, executorService);
+
+ AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor,
+ correlation, strategy, threadPool, shutdownThreadPool);
AggregationRepository repository = createAggregationRepository(routeContext);
if (repository != null) {
answer.setAggregationRepository(repository);
}
-
- if (getTimeoutCheckerExecutorService() != null) {
- answer.setTimeoutCheckerExecutorService(timeoutCheckerExecutorService);
+
+ // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool
+ boolean shutdownTimeoutThreadPool = false;
+ ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService;
+ if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) {
+ // lookup existing thread pool
+ timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookup(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class);
+ if (timeoutThreadPool == null) {
+ // then create a thread pool assuming the ref is a thread pool profile id
+ timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
+ AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef);
+ if (timeoutThreadPool == null) {
+ throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef + " not found in registry or as a thread pool profile.");
+ }
+ shutdownTimeoutThreadPool = true;
+ }
}
+ answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
+ answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);
// set other options
answer.setParallelProcessing(isParallelProcessing());
@@ -225,28 +240,6 @@ public class AggregateDefinition extends
return answer;
}
- private ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext) {
- // TODO: maybe rather than this one-off method to support an executorService & scheduledExecutorService for the aggregator,
- // create ScheduledExecutorServiceAwareDefinition and the change other definitions that currently use ScheduledExecutorServices to
- // use that one instead of the more generic ExecutorServiceAwareDefinition
- ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class);
- if (answer == null) {
- ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
- // then create a thread pool assuming the ref is a thread pool profile id
- ThreadPoolProfile profile = manager.getThreadPoolProfile(timeoutCheckerExecutorServiceRef);
- if (profile != null) {
- // okay we need to grab the pool size from the ref
- Integer poolSize = profile.getPoolSize();
- if (poolSize == null) {
- // fallback and use the default pool size, if none was set on the profile
- poolSize = manager.getDefaultThreadPoolProfile().getPoolSize();
- }
- answer = manager.newScheduledThreadPool(this, "Aggregator", poolSize);
- }
- }
- return answer;
- }
-
@Override
protected void configureChild(ProcessorDefinition<?> output) {
if (expression != null && expression instanceof ExpressionClause) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java Tue Mar 13 16:20:43 2012
@@ -78,9 +78,10 @@ public class DelayDefinition extends Exp
Processor childProcessor = this.createChildProcessor(routeContext, false);
Expression delay = createAbsoluteTimeDelayExpression(routeContext);
- ScheduledExecutorService scheduled = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", this, isAsyncDelayed());
+ boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isAsyncDelayed());
+ ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", this, isAsyncDelayed());
- Delayer answer = new Delayer(childProcessor, delay, scheduled);
+ Delayer answer = new Delayer(routeContext.getCamelContext(), childProcessor, delay, threadPool, shutdownThreadPool);
if (getAsyncDelayed() != null) {
answer.setAsyncDelayed(getAsyncDelayed());
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Tue Mar 13 16:20:43 2012
@@ -216,7 +216,8 @@ public class MulticastDefinition extends
aggregationStrategy = new UseLatestAggregationStrategy();
}
- executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing());
+ boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
+ ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing());
long timeout = getTimeout() != null ? getTimeout() : 0;
if (timeout > 0 && !isParallelProcessing()) {
@@ -227,7 +228,7 @@ public class MulticastDefinition extends
}
MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, aggregationStrategy, isParallelProcessing(),
- executorService, isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork());
+ threadPool, shutdownThreadPool, isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork());
if (isShareUnitOfWork()) {
// wrap answer in a sub unit of work, since we share the unit of work
return new SubUnitOfWorkProcessor(answer);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Tue Mar 13 16:20:43 2012
@@ -130,12 +130,13 @@ public class OnCompletionDefinition exte
}
// executor service is mandatory for on completion
- executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true);
+ boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
+ ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true);
// should be false by default
boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false;
OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor,
- executorService, isOnCompleteOnly(), isOnFailureOnly(), when, original);
+ threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original);
return answer;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java Tue Mar 13 16:20:43 2012
@@ -206,6 +206,69 @@ public final class ProcessorDefinitionHe
}
/**
+ * Determines whether a new thread pool will be created or not.
+ * <p/>
+ * This is used to know if a new thread pool will be created, in which case it is not shared by others and is
+ * exclusive to the definition.
+ *
+ * @param routeContext the route context
+ * @param definition the node definition which may leverage executor service.
+ * @param useDefault whether to fall back and use a default thread pool, if none is explicitly configured
+ * @return <tt>true</tt> if a new thread pool will be created, <tt>false</tt> if not
+ * @see #getConfiguredExecutorService(org.apache.camel.spi.RouteContext, String, ExecutorServiceAwareDefinition, boolean)
+ */
+ public static boolean willCreateNewThreadPool(RouteContext routeContext, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) {
+ ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
+ ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
+
+ if (definition.getExecutorService() != null) {
+ // no, a custom thread pool is already configured
+ return false;
+ } else if (definition.getExecutorServiceRef() != null) {
+ ExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(definition.getExecutorServiceRef(), ExecutorService.class);
+ // if no existing thread pool, then we will have to create a new thread pool
+ return answer == null;
+ } else if (useDefault) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ExecutorService} registered with the given
+ * <tt>executorServiceRef</tt> name.
+ * <p/>
+ * This method will lookup for configured thread pool in the following order
+ * <ul>
+ * <li>from the {@link org.apache.camel.spi.Registry} if found</li>
+ * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
+ * <li>if none found, then <tt>null</tt> is returned.</li>
+ * </ul>
+ * @param routeContext the route context
+ * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
+ * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
+ * @param source the source to use the thread pool
+ * @param executorServiceRef reference name of the thread pool
+ * @return the executor service, or <tt>null</tt> if none was found.
+ */
+ public static ExecutorService lookupExecutorServiceRef(RouteContext routeContext, String name,
+ Object source, String executorServiceRef) {
+
+ ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
+ ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
+ ObjectHelper.notNull(executorServiceRef, "executorServiceRef");
+
+ // lookup in registry first and use existing thread pool if exists
+ ExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(executorServiceRef, ExecutorService.class);
+ if (answer == null) {
+ // then create a thread pool assuming the ref is a thread pool profile id
+ answer = manager.newThreadPool(source, name, executorServiceRef);
+ }
+ return answer;
+ }
+
+ /**
* Will lookup and get the configured {@link java.util.concurrent.ExecutorService} from the given definition.
* <p/>
* This method will lookup for configured thread pool in the following order
@@ -237,15 +300,10 @@ public final class ProcessorDefinitionHe
return definition.getExecutorService();
} else if (definition.getExecutorServiceRef() != null) {
// lookup in registry first and use existing thread pool if exists
- ExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(definition.getExecutorServiceRef(), ExecutorService.class);
- if (answer == null) {
- // then create a thread pool assuming the ref is a thread pool profile id
- answer = manager.newThreadPool(definition, name, definition.getExecutorServiceRef());
- }
+ ExecutorService answer = lookupExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef());
if (answer == null) {
throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile.");
}
- return answer;
} else if (useDefault) {
return manager.newDefaultThreadPool(definition, name);
}
@@ -254,6 +312,39 @@ public final class ProcessorDefinitionHe
}
/**
+ * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ScheduledExecutorService} registered with the given
+ * <tt>executorServiceRef</tt> name.
+ * <p/>
+ * This method will lookup for configured thread pool in the following order
+ * <ul>
+ * <li>from the {@link org.apache.camel.spi.Registry} if found</li>
+ * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
+ * <li>if none found, then <tt>null</tt> is returned.</li>
+ * </ul>
+ * @param routeContext the route context
+ * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ScheduledExecutorService}
+ * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
+ * @param source the source to use the thread pool
+ * @param executorServiceRef reference name of the thread pool
+ * @return the executor service, or <tt>null</tt> if none was found.
+ */
+ public static ScheduledExecutorService lookupScheduledExecutorServiceRef(RouteContext routeContext, String name,
+ Object source, String executorServiceRef) {
+
+ ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
+ ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
+ ObjectHelper.notNull(executorServiceRef, "executorServiceRef");
+
+ // lookup in registry first and use existing thread pool if exists
+ ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(executorServiceRef, ScheduledExecutorService.class);
+ if (answer == null) {
+ // then create a thread pool assuming the ref is a thread pool profile id
+ answer = manager.newScheduledThreadPool(source, name, executorServiceRef);
+ }
+ return answer;
+ }
+
+ /**
* Will lookup and get the configured {@link java.util.concurrent.ScheduledExecutorService} from the given definition.
* <p/>
* This method will lookup for configured thread pool in the following order
@@ -289,14 +380,7 @@ public final class ProcessorDefinitionHe
}
throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance");
} else if (definition.getExecutorServiceRef() != null) {
- ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(definition.getExecutorServiceRef(), ScheduledExecutorService.class);
- if (answer == null) {
- // then create a thread pool assuming the ref is a thread pool profile id
- ThreadPoolProfile profile = manager.getThreadPoolProfile(definition.getExecutorServiceRef());
- if (profile != null) {
- answer = manager.newScheduledThreadPool(definition, name, profile);
- }
- }
+ ScheduledExecutorService answer = lookupScheduledExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef());
if (answer == null) {
throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile.");
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Tue Mar 13 16:20:43 2012
@@ -128,8 +128,10 @@ public class RecipientListDefinition<Typ
answer.setTimeout(getTimeout());
}
- executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing());
- answer.setExecutorService(executorService);
+ boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
+ ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing());
+ answer.setExecutorService(threadPool);
+ answer.setShutdownExecutorService(shutdownThreadPool);
long timeout = getTimeout() != null ? getTimeout() : 0;
if (timeout > 0 && !isParallelProcessing()) {
throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Tue Mar 13 16:20:43 2012
@@ -94,7 +94,9 @@ public class SplitDefinition extends Exp
public Processor createProcessor(RouteContext routeContext) throws Exception {
Processor childProcessor = this.createChildProcessor(routeContext, true);
aggregationStrategy = createAggregationStrategy(routeContext);
- executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing());
+
+ boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
+ ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing());
long timeout = getTimeout() != null ? getTimeout() : 0;
if (timeout > 0 && !isParallelProcessing()) {
@@ -107,7 +109,7 @@ public class SplitDefinition extends Exp
Expression exp = getExpression().createExpression(routeContext);
Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
- isParallelProcessing(), executorService, isStreaming(), isStopOnException(),
+ isParallelProcessing(), threadPool, shutdownThreadPool, isStreaming(), isStopOnException(),
timeout, onPrepare, isShareUnitOfWork());
if (isShareUnitOfWork()) {
// wrap answer in a sub unit of work, since we share the unit of work
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Tue Mar 13 16:20:43 2012
@@ -80,9 +80,10 @@ public class ThreadsDefinition extends O
// the threads name
String name = getThreadName() != null ? getThreadName() : "Threads";
// prefer any explicit configured executor service
- executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false);
+ boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
+ ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false);
// if no explicit then create from the options
- if (executorService == null) {
+ if (threadPool == null) {
ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
// create the thread pool using a builder
ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name)
@@ -92,10 +93,11 @@ public class ThreadsDefinition extends O
.maxQueueSize(getMaxQueueSize())
.rejectedPolicy(getRejectedPolicy())
.build();
- executorService = manager.newThreadPool(this, name, profile);
+ threadPool = manager.newThreadPool(this, name, profile);
+ shutdownThreadPool = true;
}
- ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), executorService);
+ ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool);
if (getCallerRunsWhenRejected() == null) {
// should be true by default
thread.setCallerRunsWhenRejected(true);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java Tue Mar 13 16:20:43 2012
@@ -83,13 +83,14 @@ public class ThrottleDefinition extends
public Processor createProcessor(RouteContext routeContext) throws Exception {
Processor childProcessor = this.createChildProcessor(routeContext, true);
- ScheduledExecutorService scheduled = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed());
+ boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isAsyncDelayed());
+ ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed());
// should be default 1000 millis
long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext);
- Throttler answer = new Throttler(childProcessor, maxRequestsExpression, period, scheduled);
+ Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool);
if (getAsyncDelayed() != null) {
answer.setAsyncDelayed(getAsyncDelayed());
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Tue Mar 13 16:20:43 2012
@@ -83,9 +83,9 @@ public class WireTapDefinition<Type exte
Endpoint endpoint = resolveEndpoint(routeContext);
// executor service is mandatory for wire tap
- executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", this, true);
-
- WireTapProcessor answer = new WireTapProcessor(endpoint, getPattern(), executorService);
+ boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
+ ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", this, true);
+ WireTapProcessor answer = new WireTapProcessor(endpoint, getPattern(), threadPool, shutdownThreadPool);
answer.setCopy(isCopy());
if (newExchangeProcessorRef != null) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Tue Mar 13 16:20:43 2012
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledExe
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.util.ObjectHelper;
@@ -37,7 +38,9 @@ import org.slf4j.LoggerFactory;
*/
public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
protected final transient Logger log = LoggerFactory.getLogger(getClass());
+ private final CamelContext camelContext;
private final ScheduledExecutorService executorService;
+ private final boolean shutdownExecutorService;
private boolean asyncDelayed;
private boolean callerRunsWhenRejected = true;
@@ -72,13 +75,15 @@ public abstract class DelayProcessorSupp
}
}
- public DelayProcessorSupport(Processor processor) {
- this(processor, null);
+ public DelayProcessorSupport(CamelContext camelContext, Processor processor) {
+ this(camelContext, processor, null, false);
}
- public DelayProcessorSupport(Processor processor, ScheduledExecutorService executorService) {
+ public DelayProcessorSupport(CamelContext camelContext, Processor processor, ScheduledExecutorService executorService, boolean shutdownExecutorService) {
super(processor);
+ this.camelContext = camelContext;
this.executorService = executorService;
+ this.shutdownExecutorService = shutdownExecutorService;
}
@Override
@@ -216,4 +221,12 @@ public abstract class DelayProcessorSupp
}
super.doStart();
}
+
+ @Override
+ protected void doShutdown() throws Exception {
+ if (shutdownExecutorService && executorService != null) {
+ camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ }
+ super.doShutdown();
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java Tue Mar 13 16:20:43 2012
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
@@ -36,8 +37,9 @@ public class Delayer extends DelayProces
private Expression delay;
private long delayValue;
- public Delayer(Processor processor, Expression delay, ScheduledExecutorService executorService) {
- super(processor, executorService);
+ public Delayer(CamelContext camelContext, Processor processor, Expression delay,
+ ScheduledExecutorService executorService, boolean shutdownExecutorService) {
+ super(camelContext, processor, executorService, shutdownExecutorService);
this.delay = delay;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Mar 13 16:20:43 2012
@@ -147,6 +147,7 @@ public class MulticastProcessor extends
private final boolean streaming;
private final boolean stopOnException;
private final ExecutorService executorService;
+ private final boolean shutdownExecutorService;
private ExecutorService aggregateExecutorService;
private final long timeout;
private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>();
@@ -157,18 +158,18 @@ public class MulticastProcessor extends
}
public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
- this(camelContext, processors, aggregationStrategy, false, null, false, false, 0, null, false);
+ this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false);
}
public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
- boolean parallelProcessing, ExecutorService executorService, boolean streaming,
- boolean stopOnException, long timeout, Processor onPrepare,
- boolean shareUnitOfWork) {
+ boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
+ boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
notNull(camelContext, "camelContext");
this.camelContext = camelContext;
this.processors = processors;
this.aggregationStrategy = aggregationStrategy;
this.executorService = executorService;
+ this.shutdownExecutorService = shutdownExecutorService;
this.streaming = streaming;
this.stopOnException = stopOnException;
// must enable parallel if executor service is provided
@@ -953,6 +954,10 @@ public class MulticastProcessor extends
ServiceHelper.stopAndShutdownServices(processors, errorHandlers);
// only clear error handlers when shutting down
errorHandlers.clear();
+
+ if (shutdownExecutorService && executorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+ }
}
protected static void setToEndpoint(Exchange exchange, Processor processor) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Tue Mar 13 16:20:43 2012
@@ -45,12 +45,13 @@ public class OnCompletionProcessor exten
private final CamelContext camelContext;
private final Processor processor;
private final ExecutorService executorService;
+ private final boolean shutdownExecutorService;
private final boolean onCompleteOnly;
private final boolean onFailureOnly;
private final Predicate onWhen;
private final boolean useOriginalBody;
- public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService,
+ public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService, boolean shutdownExecutorService,
boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody) {
notNull(camelContext, "camelContext");
notNull(processor, "processor");
@@ -58,6 +59,7 @@ public class OnCompletionProcessor exten
// wrap processor in UnitOfWork so what we send out runs in a UoW
this.processor = new UnitOfWorkProcessor(processor);
this.executorService = executorService;
+ this.shutdownExecutorService = shutdownExecutorService;
this.onCompleteOnly = onCompleteOnly;
this.onFailureOnly = onFailureOnly;
this.onWhen = onWhen;
@@ -77,6 +79,9 @@ public class OnCompletionProcessor exten
@Override
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownService(processor);
+ if (shutdownExecutorService) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+ }
}
public CamelContext getCamelContext() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Tue Mar 13 16:20:43 2012
@@ -58,6 +58,7 @@ public class RecipientList extends Servi
private Processor onPrepare;
private boolean shareUnitOfWork;
private ExecutorService executorService;
+ private boolean shutdownExecutorService;
private ExecutorService aggregateExecutorService;
private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
@@ -118,8 +119,8 @@ public class RecipientList extends Servi
Iterator<Object> iter = ObjectHelper.createIterator(recipientList, delimiter);
RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
- isParallelProcessing(), getExecutorService(), isStreaming(),
- isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork()) {
+ isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
+ isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork()) {
@Override
protected synchronized ExecutorService createAggregateExecutorService(String name) {
// use a shared executor service to avoid creating new thread pools
@@ -217,6 +218,14 @@ public class RecipientList extends Servi
this.executorService = executorService;
}
+ public boolean isShutdownExecutorService() {
+ return shutdownExecutorService;
+ }
+
+ public void setShutdownExecutorService(boolean shutdownExecutorService) {
+ this.shutdownExecutorService = shutdownExecutorService;
+ }
+
public AggregationStrategy getAggregationStrategy() {
return aggregationStrategy;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java Tue Mar 13 16:20:43 2012
@@ -128,9 +128,10 @@ public class RecipientListProcessor exte
}
public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
- boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException, long timeout,
- Processor onPrepare, boolean shareUnitOfWork) {
- super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, streaming, stopOnException, timeout, onPrepare, shareUnitOfWork);
+ boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
+ boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
+ super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
+ streaming, stopOnException, timeout, onPrepare, shareUnitOfWork);
this.producerCache = producerCache;
this.iter = iter;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Tue Mar 13 16:20:43 2012
@@ -58,15 +58,14 @@ public class Splitter extends MulticastP
private final Expression expression;
public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
- this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, 0, null, false);
+ this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, false, 0, null, false);
}
public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
- boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException, long timeout,
- Processor onPrepare, boolean useSubUnitOfWork) {
+ boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
+ boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork) {
super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService,
- streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork);
-
+ shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork);
this.expression = expression;
notNull(expression, "expression");
notNull(destination, "destination");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Tue Mar 13 16:20:43 2012
@@ -53,7 +53,9 @@ import org.slf4j.LoggerFactory;
public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ThreadsProcessor.class);
+ private final CamelContext camelContext;
private final ExecutorService executorService;
+ private volatile boolean shutdownExecutorService;
private final AtomicBoolean shutdown = new AtomicBoolean(true);
private boolean callerRunsWhenRejected = true;
private ThreadPoolRejectedPolicy rejectedPolicy;
@@ -101,10 +103,12 @@ public class ThreadsProcessor extends Se
}
}
- public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService) {
+ public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService) {
ObjectHelper.notNull(camelContext, "camelContext");
ObjectHelper.notNull(executorService, "executorService");
+ this.camelContext = camelContext;
this.executorService = executorService;
+ this.shutdownExecutorService = shutdownExecutorService;
}
public void process(final Exchange exchange) throws Exception {
@@ -164,4 +168,12 @@ public class ThreadsProcessor extends Se
protected void doStop() throws Exception {
shutdown.set(true);
}
+
+ protected void doShutdown() throws Exception {
+ if (shutdownExecutorService) {
+ camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ }
+ super.doShutdown();
+ }
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Tue Mar 13 16:20:43 2012
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
@@ -41,8 +42,9 @@ public class Throttler extends DelayProc
private long timePeriodMillis = 1000;
private volatile TimeSlot slot;
- public Throttler(Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis, ScheduledExecutorService executorService) {
- super(processor, executorService);
+ public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis,
+ ScheduledExecutorService executorService, boolean shutdownExecutorService) {
+ super(camelContext, processor, executorService, shutdownExecutorService);
ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression");
this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Tue Mar 13 16:20:43 2012
@@ -40,6 +40,7 @@ import org.apache.camel.util.ObjectHelpe
*/
public class WireTapProcessor extends SendProcessor {
private final ExecutorService executorService;
+ private volatile boolean shutdownExecutorService;
// expression or processor used for populating a new exchange to send
// as opposed to traditional wiretap that sends a copy of the original exchange
@@ -48,16 +49,18 @@ public class WireTapProcessor extends Se
private boolean copy;
private Processor onPrepare;
- public WireTapProcessor(Endpoint destination, ExecutorService executorService) {
+ public WireTapProcessor(Endpoint destination, ExecutorService executorService, boolean shutdownExecutorService) {
super(destination);
ObjectHelper.notNull(executorService, "executorService");
this.executorService = executorService;
+ this.shutdownExecutorService = shutdownExecutorService;
}
- public WireTapProcessor(Endpoint destination, ExchangePattern pattern, ExecutorService executorService) {
+ public WireTapProcessor(Endpoint destination, ExchangePattern pattern, ExecutorService executorService, boolean shutdownExecutorService) {
super(destination, pattern);
ObjectHelper.notNull(executorService, "executorService");
this.executorService = executorService;
+ this.shutdownExecutorService = shutdownExecutorService;
}
@Override
@@ -175,6 +178,13 @@ public class WireTapProcessor extends Se
return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
}
+ protected void doShutdown() throws Exception {
+ if (shutdownExecutorService) {
+ getDestination().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+ }
+ super.doShutdown();
+ }
+
public List<Processor> getNewExchangeProcessors() {
return newExchangeProcessors;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Tue Mar 13 16:20:43 2012
@@ -87,7 +87,9 @@ public class AggregateProcessor extends
private final AggregationStrategy aggregationStrategy;
private final Expression correlationExpression;
private final ExecutorService executorService;
- private ScheduledExecutorService timeoutCheckerExecutorService;
+ private final boolean shutdownExecutorService;
+ private ScheduledExecutorService timeoutCheckerExecutorService;
+ private boolean shutdownTimeoutCheckerExecutorService;
private ScheduledExecutorService recoverService;
// store correlation key -> exchange id in timeout map
private TimeoutMap<String, String> timeoutMap;
@@ -125,7 +127,7 @@ public class AggregateProcessor extends
public AggregateProcessor(CamelContext camelContext, Processor processor,
Expression correlationExpression, AggregationStrategy aggregationStrategy,
- ExecutorService executorService) {
+ ExecutorService executorService, boolean shutdownExecutorService) {
ObjectHelper.notNull(camelContext, "camelContext");
ObjectHelper.notNull(processor, "processor");
ObjectHelper.notNull(correlationExpression, "correlationExpression");
@@ -136,6 +138,7 @@ public class AggregateProcessor extends
this.correlationExpression = correlationExpression;
this.aggregationStrategy = aggregationStrategy;
this.executorService = executorService;
+ this.shutdownExecutorService = shutdownExecutorService;
}
@Override
@@ -583,7 +586,15 @@ public class AggregateProcessor extends
public ScheduledExecutorService getTimeoutCheckerExecutorService() {
return timeoutCheckerExecutorService;
}
-
+
+ public boolean isShutdownTimeoutCheckerExecutorService() {
+ return shutdownTimeoutCheckerExecutorService;
+ }
+
+ public void setShutdownTimeoutCheckerExecutorService(boolean shutdownTimeoutCheckerExecutorService) {
+ this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService;
+ }
+
/**
* On completion task which keeps the booking of the in progress up to date
*/
@@ -858,6 +869,7 @@ public class AggregateProcessor extends
LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis.");
if (getTimeoutCheckerExecutorService() == null) {
setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
+ shutdownTimeoutCheckerExecutorService = true;
}
// trigger completion based on interval
getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
@@ -868,6 +880,7 @@ public class AggregateProcessor extends
LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity.");
if (getTimeoutCheckerExecutorService() == null) {
setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
+ shutdownTimeoutCheckerExecutorService = true;
}
// check for timed out aggregated messages once every second
timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L);
@@ -937,6 +950,14 @@ public class AggregateProcessor extends
// cleanup when shutting down
inProgressCompleteExchanges.clear();
+ if (shutdownExecutorService) {
+ camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ }
+ if (shutdownTimeoutCheckerExecutorService) {
+ camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
+ timeoutCheckerExecutorService = null;
+ }
+
super.doShutdown();
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java Tue Mar 13 16:20:43 2012
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor.interceptor;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.model.ProcessorDefinition;
@@ -29,8 +30,8 @@ public class DelayInterceptor extends De
private final ProcessorDefinition<?> node;
private Delayer delayer;
- public DelayInterceptor(ProcessorDefinition<?> node, Processor target, Delayer delayer) {
- super(target);
+ public DelayInterceptor(CamelContext camelContext, ProcessorDefinition<?> node, Processor target, Delayer delayer) {
+ super(camelContext, target);
this.node = node;
this.delayer = delayer;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java Tue Mar 13 16:20:43 2012
@@ -28,7 +28,7 @@ import org.apache.camel.spi.InterceptStr
*/
public class Delayer implements InterceptStrategy {
- private boolean enabled = true;
+ private volatile boolean enabled = true;
private final long delay;
public Delayer(long delay) {
@@ -53,7 +53,7 @@ public class Delayer implements Intercep
public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition,
Processor target, Processor nextTarget) throws Exception {
- return new DelayInterceptor(definition, target, this);
+ return new DelayInterceptor(context, definition, target, this);
}
public boolean isEnabled() {
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java?rev=1300218&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java Tue Mar 13 16:20:43 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version
+ */
+public class ManagedRouteRemoveWireTapExplicitThreadPoolTest extends ManagementTestSupport {
+
+ private ExecutorService myThreadPool;
+
+ public void testRemove() throws Exception {
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"foo\"");
+
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+ getMockEndpoint("mock:tap").expectedMessageCount(1);
+
+ template.sendBody("seda:foo", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ // should be started
+ String state = (String) mbeanServer.getAttribute(on, "State");
+ assertEquals("Should be started", ServiceStatus.Started.name(), state);
+
+ // and no wire tap thread pool as we use an existing external pool
+ Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
+ boolean wireTap = false;
+ for (ObjectName name : set) {
+ if (name.toString().contains("wireTap")) {
+ wireTap = true;
+ break;
+ }
+ }
+ assertFalse("Should not have a wire tap thread pool", wireTap);
+
+ // stop
+ mbeanServer.invoke(on, "stop", null, null);
+
+ state = (String) mbeanServer.getAttribute(on, "State");
+ assertEquals("Should be stopped", ServiceStatus.Stopped.name(), state);
+
+ // remove
+ mbeanServer.invoke(on, "remove", null, null);
+
+ // should not be registered anymore
+ boolean registered = mbeanServer.isRegistered(on);
+ assertFalse("Route mbean should have been unregistered", registered);
+
+ // and no wire tap thread pool as we use an existing external pool
+ set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
+ wireTap = false;
+ for (ObjectName name : set) {
+ if (name.toString().contains("wireTap")) {
+ wireTap = true;
+ break;
+ }
+ }
+ assertFalse("Should not have a wire tap thread pool", wireTap);
+
+ // the externally supplied thread pool must not be shut down when the route is removed
+ assertFalse("Thread pool should not be shutdown", myThreadPool.isShutdown());
+
+ myThreadPool.shutdownNow();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // create a new thread pool to use for wire tap
+ myThreadPool = Executors.newFixedThreadPool(1);
+
+ from("seda:foo").routeId("foo").wireTap("direct:tap").executorService(myThreadPool).to("mock:result");
+ from("direct:tap").routeId("tap").to("mock:tap");
+ }
+ };
+ }
+
+}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java (from r1300065, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java&r1=1300065&r2=1300218&rev=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java Tue Mar 13 16:20:43 2012
@@ -22,19 +22,18 @@ import javax.management.ObjectName;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
/**
* @version
*/
-public class ManagedRouteRemoveContextScopedErrorHandlerTest extends ManagementTestSupport {
+public class ManagedRouteRemoveWireTapTest extends ManagementTestSupport {
public void testRemove() throws Exception {
MBeanServer mbeanServer = getMBeanServer();
- ObjectName on = getRouteObjectName(mbeanServer);
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"foo\"");
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+ getMockEndpoint("mock:tap").expectedMessageCount(1);
template.sendBody("seda:foo", "Hello World");
@@ -44,9 +43,16 @@ public class ManagedRouteRemoveContextSc
String state = (String) mbeanServer.getAttribute(on, "State");
assertEquals("Should be started", ServiceStatus.Started.name(), state);
- // and one context scoped error handler
- Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=errorhandlers,*"), null);
- assertEquals(1, set.size());
+ // and a number of thread pools
+ Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
+ boolean wireTap = false;
+ for (ObjectName name : set) {
+ if (name.toString().contains("wireTap")) {
+ wireTap = true;
+ break;
+ }
+ }
+ assertTrue("Should have a wire tap thread pool", wireTap);
// stop
mbeanServer.invoke(on, "stop", null, null);
@@ -61,20 +67,16 @@ public class ManagedRouteRemoveContextSc
boolean registered = mbeanServer.isRegistered(on);
assertFalse("Route mbean should have been unregistered", registered);
- // and no more routes
- set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
- assertEquals(0, set.size());
-
- // but still 1 context scoped error handler
- set = mbeanServer.queryNames(new ObjectName("*:type=errorhandlers,*"), null);
- assertEquals(1, set.size());
- }
-
- static ObjectName getRouteObjectName(MBeanServer mbeanServer) throws Exception {
- Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
- assertEquals(1, set.size());
-
- return set.iterator().next();
+ // and a thread pool less
+ set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
+ wireTap = false;
+ for (ObjectName name : set) {
+ if (name.toString().contains("wireTap")) {
+ wireTap = true;
+ break;
+ }
+ }
+ assertFalse("Should not have a wire tap thread pool", wireTap);
}
@Override
@@ -82,11 +84,9 @@ public class ManagedRouteRemoveContextSc
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // context scoped error handler
- errorHandler(deadLetterChannel("mock:dead"));
+ from("seda:foo").routeId("foo").wireTap("direct:tap").to("mock:result");
- // which this route will use
- from("seda:foo").to("mock:result");
+ from("direct:tap").routeId("tap").to("mock:tap");
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Tue Mar 13 16:20:43 2012
@@ -75,7 +75,7 @@ public class ThrottlerTest extends Conte
}
public void testTimeSlotCalculus() throws Exception {
- Throttler throttler = new Throttler(null, constant(3), 1000, null);
+ Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false);
// calculate will assign a new slot
throttler.calculateDelay(new DefaultExchange(context));
TimeSlot slot = throttler.nextSlot();
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Tue Mar 13 16:20:43 2012
@@ -62,7 +62,7 @@ public class AggregateProcessorTest exte
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setEagerCheckCompletion(false);
ap.start();
@@ -103,7 +103,7 @@ public class AggregateProcessorTest exte
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().isEqualTo("END");
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setEagerCheckCompletion(true);
ap.start();
@@ -151,7 +151,7 @@ public class AggregateProcessorTest exte
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionSize(3);
ap.setEagerCheckCompletion(eager);
ap.start();
@@ -199,7 +199,7 @@ public class AggregateProcessorTest exte
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionTimeout(3000);
ap.setEagerCheckCompletion(eager);
ap.start();
@@ -248,7 +248,7 @@ public class AggregateProcessorTest exte
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionInterval(3000);
ap.start();
@@ -289,7 +289,7 @@ public class AggregateProcessorTest exte
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setIgnoreInvalidCorrelationKeys(true);
@@ -329,7 +329,7 @@ public class AggregateProcessorTest exte
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.start();
@@ -376,7 +376,7 @@ public class AggregateProcessorTest exte
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setCloseCorrelationKeyOnCompletion(1000);
@@ -423,7 +423,7 @@ public class AggregateProcessorTest exte
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionSize(100);
ap.setCompletionFromBatchConsumer(true);
@@ -520,7 +520,7 @@ public class AggregateProcessorTest exte
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setEagerCheckCompletion(true);
ap.setCompletionPredicate(body().isEqualTo("END"));
if (handler != null) {
@@ -571,7 +571,7 @@ public class AggregateProcessorTest exte
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionSize(10);
ap.start();
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java Tue Mar 13 16:20:43 2012
@@ -59,7 +59,7 @@ public class AggregateProcessorTimeoutCo
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
// start with a high timeout so no completes before we stop
ap.setCompletionTimeout(2000);
ap.start();
@@ -101,7 +101,7 @@ public class AggregateProcessorTimeoutCo
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
// start with a high timeout so no completes before we stop
ap.setCompletionTimeoutExpression(header("myTimeout"));
ap.start();
@@ -145,7 +145,7 @@ public class AggregateProcessorTimeoutCo
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
// start with a high timeout so no completes before we stop
ap.setCompletionTimeoutExpression(header("myTimeout"));
ap.start();