You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/20 15:54:12 UTC
svn commit: r925619 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/ja...
Author: davsclaus
Date: Sat Mar 20 14:54:11 2010
New Revision: 925619
URL: http://svn.apache.org/viewvc?rev=925619&view=rev
Log:
CAMEL-1588: Thread pool lookup now falls back to thread pool profiles: if the ref is not found in the Registry but matches a profile id, the thread pool is created from that profile.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java
- copied, changed from r925107, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.java
- copied, changed from r925107, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml
- copied, changed from r925107, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Sat Mar 20 14:54:11 2010
@@ -43,7 +43,6 @@ import org.apache.camel.spi.PackageScanC
import org.apache.camel.spi.Registry;
import org.apache.camel.spi.ServicePool;
import org.apache.camel.spi.ShutdownStrategy;
-import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.spi.TypeConverterRegistry;
/**
@@ -461,7 +460,9 @@ public interface CamelContext extends Se
List<String> getLanguageNames();
/**
- * Creates a new ProducerTemplate.
+ * Creates a new {@link ProducerTemplate} which is <b>not</b> started.
+ * <p/>
+ * You <b>must</b> start the template before it is used.
* <p/>
* See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
* Why does Camel use too many threads with ProducerTemplate?</a>
@@ -471,7 +472,9 @@ public interface CamelContext extends Se
ProducerTemplate createProducerTemplate();
/**
- * Creates a new ConsumerTemplate.
+ * Creates a new {@link ConsumerTemplate} which is <b>not</b> started.
+ * <p/>
+ * You <b>must</b> start the template before it is used.
* <p/>
* See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
* Why does Camel use too many threads with ProducerTemplate?</a> as it also applies for ConsumerTemplate.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java Sat Mar 20 14:54:11 2010
@@ -70,17 +70,6 @@ public final class ThreadPoolBuilder {
}
/**
- * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}.
- *
- * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
- * @param executorServiceRef reference to lookup
- * @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt> if not found
- */
- public ExecutorService lookup(Object source, String executorServiceRef) {
- return camelContext.getExecutorServiceStrategy().lookup(source, executorServiceRef);
- }
-
- /**
* Builds the new thread pool
*
* @param name name which is appended to the thread name
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Sat Mar 20 14:54:11 2010
@@ -134,11 +134,24 @@ public class DefaultExecutorServiceStrat
this.threadNamePattern = threadNamePattern;
}
- public ExecutorService lookup(Object source, String executorServiceRef) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Looking up ExecutorService with ref: " + executorServiceRef);
+ public ExecutorService lookup(Object source, String name, String executorServiceRef) {
+ ExecutorService answer = camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
+ if (answer != null && LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Looking up ExecutorService with ref: " + executorServiceRef + " and found it from Registry: " + answer);
+ }
}
- return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
+
+ if (answer == null) {
+ // try to see if we got a thread pool profile with that id
+ answer = newThreadPool(source, name, executorServiceRef);
+ if (answer != null && LOG.isDebugEnabled()) {
+ LOG.debug("Looking up ExecutorService with ref: " + executorServiceRef
+ + " and found a matching ThreadPoolProfile to create the ExecutorService: " + answer);
+ }
+ }
+
+ return answer;
}
public ExecutorService newDefaultThreadPool(Object source, String name) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Sat Mar 20 14:54:11 2010
@@ -505,6 +505,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -515,6 +518,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -526,6 +532,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -537,6 +546,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -549,6 +561,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -560,6 +575,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -570,6 +588,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -580,6 +601,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -592,6 +616,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -620,6 +647,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -641,6 +671,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
@@ -662,6 +695,9 @@ public class DefaultProducerTemplate ext
}
};
+ if (executor == null) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
return executor.submit(task);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Sat Mar 20 14:54:11 2010
@@ -150,7 +150,7 @@ public class AggregateDefinition extends
AggregationStrategy strategy = createAggregationStrategy(routeContext);
// executor service is mandatory for the Aggregator
- executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "Aggregator", this);
if (executorService == null) {
if (isParallelProcessing()) {
// we are running in parallel so create a cached thread pool which grows/shrinks automatic
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java Sat Mar 20 14:54:11 2010
@@ -24,7 +24,7 @@ import org.apache.camel.ExecutorServiceA
* Enables definitions to support concurrency using {@link java.util.concurrent.ExecutorService}
*
* @version $Revision$
- * @see org.apache.camel.util.concurrent.ExecutorServiceHelper#getConfiguredExecutorService(org.apache.camel.spi.RouteContext,
+ * @see org.apache.camel.util.concurrent.ExecutorServiceHelper#getConfiguredExecutorService(org.apache.camel.spi.RouteContext, String name,
* ExecutorServiceAwareDefinition)
*/
public interface ExecutorServiceAwareDefinition<Type extends ProcessorDefinition> extends ExecutorServiceAware {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Sat Mar 20 14:54:11 2010
@@ -148,7 +148,7 @@ public class MulticastDefinition extends
aggregationStrategy = new UseLatestAggregationStrategy();
}
- executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "Multicast", this);
if (isParallelProcessing() && executorService == null) {
// we are running in parallel so create a cached thread pool which grows/shrinks automatic
executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Multicast");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Sat Mar 20 14:54:11 2010
@@ -97,7 +97,7 @@ public class OnCompletionDefinition exte
when = onWhen.getExpression().createPredicate(routeContext);
}
- executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this);
if (executorService == null) {
executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "OnCompletion");
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Sat Mar 20 14:54:11 2010
@@ -95,7 +95,7 @@ public class RecipientListDefinition ext
answer.setStopOnException(isStopOnException());
}
- executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "RecipientList", this);
if (isParallelProcessing() && executorService == null) {
// we are running in parallel so create a cached thread pool which grows/shrinks automatic
executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "RecipientList");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Sat Mar 20 14:54:11 2010
@@ -88,7 +88,7 @@ public class SplitDefinition extends Exp
aggregationStrategy = createAggregationStrategy(routeContext);
- executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "Split", this);
if (isParallelProcessing() && executorService == null) {
// we are running in parallel so create a cached thread pool which grows/shrinks automatic
executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Split");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Sat Mar 20 14:54:11 2010
@@ -70,11 +70,13 @@ public class ThreadsDefinition extends O
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
+ // The threads name
+ String name = getThreadName() != null ? getThreadName() : "Threads";
+
// prefer any explicit configured executor service
- executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, name, this);
if (executorService == null) {
// none was configured so create an executor based on the other parameters
- String name = getThreadName() != null ? getThreadName() : "Threads";
if (poolSize == null || poolSize <= 0) {
// use the cached thread pool
executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, name);
@@ -92,8 +94,7 @@ public class ThreadsDefinition extends O
}
executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
- .newThreadPool(this, name, poolSize, max, keepAlive, tu,
- maxQueue, rejected, true);
+ .newThreadPool(this, name, poolSize, max, keepAlive, tu, maxQueue, rejected, true);
}
}
Processor childProcessor = routeContext.createProcessor(this);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Sat Mar 20 14:54:11 2010
@@ -103,7 +103,7 @@ public class ToDefinition extends SendDe
SendAsyncProcessor async = new SendAsyncProcessor(endpoint, getPattern(), uow);
- executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "ToAsync", this);
if (executorService != null) {
async.setExecutorService(executorService);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Sat Mar 20 14:54:11 2010
@@ -67,7 +67,7 @@ public class WireTapDefinition extends S
public Processor createProcessor(RouteContext routeContext) throws Exception {
Endpoint endpoint = resolveEndpoint(routeContext);
- executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "WireTap", this);
if (executorService == null) {
executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "WireTap");
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Sat Mar 20 14:54:11 2010
@@ -100,13 +100,15 @@ public interface ExecutorServiceStrategy
void setThreadNamePattern(String pattern) throws IllegalArgumentException;
/**
- * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}.
+ * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}
+ * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.
*
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
* @param executorServiceRef reference to lookup
* @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt> if not found
*/
- ExecutorService lookup(Object source, String executorServiceRef);
+ ExecutorService lookup(Object source, String name, String executorServiceRef);
/**
* Creates a new thread pool using the default thread pool profile.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Sat Mar 20 14:54:11 2010
@@ -222,18 +222,22 @@ public final class ExecutorServiceHelper
* <p/>
* This method will lookup for configured thread pool in the following order
* <ul>
- * <li>from the definition if any explicit configured executor service.</li>
- * <li>if none found, then <tt>null</tt> is returned.</li>
+ * <li>from the definition if any explicit configured executor service.</li>
+ * <li>from the {@link org.apache.camel.spi.Registry} if found</li>
+ * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
+ * <li>if none found, then <tt>null</tt> is returned.</li>
* </ul>
* The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support
* configured executor services in the same coherent way.
*
- * @param routeContext the rout context
- * @param definition the node definition which may leverage executor service.
+ * @param routeContext the route context
+ * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
+ * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
+ * @param definition the node definition which may leverage executor service.
* @return the configured executor service, or <tt>null</tt> if none was configured.
* @throws IllegalArgumentException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found
*/
- public static ExecutorService getConfiguredExecutorService(RouteContext routeContext,
+ public static ExecutorService getConfiguredExecutorService(RouteContext routeContext, String name,
ExecutorServiceAwareDefinition definition) throws IllegalArgumentException {
ExecutorServiceStrategy strategy = routeContext.getCamelContext().getExecutorServiceStrategy();
ObjectHelper.notNull(strategy, "ExecutorServiceStrategy", routeContext.getCamelContext());
@@ -242,7 +246,7 @@ public final class ExecutorServiceHelper
if (definition.getExecutorService() != null) {
return definition.getExecutorService();
} else if (definition.getExecutorServiceRef() != null) {
- ExecutorService answer = strategy.lookup(definition, definition.getExecutorServiceRef());
+ ExecutorService answer = strategy.lookup(definition, name, definition.getExecutorServiceRef());
if (answer == null) {
throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry.");
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java Sat Mar 20 14:54:11 2010
@@ -37,17 +37,6 @@ public class ThreadPoolBuilderTest exten
return jndi;
}
- public void testThreadPoolBuilderLookup() throws Exception {
- ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
- ExecutorService executor = builder.lookup(this, "someonesPool");
- assertNotNull(executor);
-
- assertEquals(false, executor.isShutdown());
- context.stop();
- // you need to manage this pool yourself
- assertEquals(false, executor.isShutdown());
- }
-
public void testThreadPoolBuilderDefault() throws Exception {
ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
ExecutorService executor = builder.build(this, "myPool");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java Sat Mar 20 14:54:11 2010
@@ -267,4 +267,33 @@ public class DefaultExecutorServiceStrat
assertTrue(tp.isShutdown());
}
+ public void testLookupThreadPoolProfile() throws Exception {
+ ExecutorService pool = context.getExecutorServiceStrategy().lookup(this, "Cool", "fooProfile");
+ // does not exist yet
+ assertNull(pool);
+
+ assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("fooProfile"));
+
+ ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("fooProfile");
+ foo.setKeepAliveTime(20L);
+ foo.setMaxPoolSize(40);
+ foo.setPoolSize(5);
+ foo.setMaxQueueSize(2000);
+
+ context.getExecutorServiceStrategy().registerThreadPoolProfile(foo);
+
+ pool = context.getExecutorServiceStrategy().lookup(this, "Cool", "fooProfile");
+ assertNotNull(pool);
+
+ ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool);
+ assertEquals(20, tp.getKeepAliveTime(TimeUnit.SECONDS));
+ assertEquals(40, tp.getMaximumPoolSize());
+ assertEquals(5, tp.getCorePoolSize());
+ assertFalse(tp.isShutdown());
+
+ context.stop();
+
+ assertTrue(tp.isShutdown());
+ }
+
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java (from r925107, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java&r1=925107&r2=925619&rev=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java Sat Mar 20 14:54:11 2010
@@ -16,28 +16,24 @@
*/
package org.apache.camel.processor.aggregator;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.ThreadPoolProfileSupport;
import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.spi.ThreadPoolProfile;
/**
* @version $Revision$
*/
-public class AggregateExpressionTimeoutTest extends ContextTestSupport {
+public class AggregateThreadPoolProfileTest extends ContextTestSupport {
- public void testAggregateExpressionTimeout() throws Exception {
+ public void testAggregateThreadPoolProfile() throws Exception {
getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
- Map headers = new HashMap();
- headers.put("id", 123);
- headers.put("timeout", 2000);
-
- template.sendBodyAndHeaders("direct:start", "A", headers);
- template.sendBodyAndHeaders("direct:start", "B", headers);
- template.sendBodyAndHeaders("direct:start", "C", headers);
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
assertMockEndpointsSatisfied();
}
@@ -47,15 +43,19 @@ public class AggregateExpressionTimeoutT
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // START SNIPPET: e1
+ // create and register thread pool profile
+ ThreadPoolProfile profile = new ThreadPoolProfileSupport("myProfile");
+ profile.setPoolSize(2);
+ profile.setMaxPoolSize(8);
+ profile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
+ context.getExecutorServiceStrategy().registerThreadPoolProfile(profile);
+
from("direct:start")
- // aggregate all exchanges correlated by the id header.
- // Aggregate them using the BodyInAggregatingStrategy strategy which
- // and the timeout header contains the timeout in millis of inactivity them timeout and complete the aggregation
- // and send it to mock:aggregated
- .aggregate(header("id"), new BodyInAggregatingStrategy()).completionTimeout(header("timeout"))
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ // use our custom thread pool profile
+ .completionSize(3).executorServiceRef("myProfile")
+ .to("log:foo")
.to("mock:aggregated");
- // END SNIPPET: e1
}
};
}
Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.java (from r925107, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java&r1=925107&r2=925619&rev=925619&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.java Sat Mar 20 14:54:11 2010
@@ -17,17 +17,17 @@
package org.apache.camel.spring.processor.aggregator;
import org.apache.camel.CamelContext;
-import org.apache.camel.processor.aggregator.AggregateExpressionSizeFallbackTest;
+import org.apache.camel.processor.aggregator.AggregateThreadPoolProfileTest;
import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
/**
* @version $Revision$
*/
-public class SpringAggregateExpressionSizeFallbackTest extends AggregateExpressionSizeFallbackTest {
+public class SpringAggregateThreadPoolProfileTest extends AggregateThreadPoolProfileTest {
protected CamelContext createCamelContext() throws Exception {
- return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml");
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml");
}
}
\ No newline at end of file
Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml (from r925107, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml&r1=925107&r2=925619&rev=925619&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml Sat Mar 20 14:54:11 2010
@@ -24,21 +24,24 @@
<!-- START SNIPPET: e1 -->
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+
+ <!-- define a thread pool profile -->
+ <threadPoolProfile id="myProfile" poolSize="2" maxPoolSize="8" rejectedPolicy="Abort"/>
+
<route>
<from uri="direct:start"/>
- <aggregate strategyRef="aggregatorStrategy" completionSize="3">
+ <!-- use the custom thread pool profile in this aggregate EIP, by referring to it -->
+ <aggregate strategyRef="aggregatorStrategy" completionSize="3" executorServiceRef="myProfile">
<correlationExpression>
<simple>header.id</simple>
- </correlationExpression>
- <completionSize>
- <header>mySize</header>
- </completionSize>
+ </correlationExpression>
+ <to uri="log:foo"/>
<to uri="mock:aggregated"/>
</aggregate>
</route>
</camelContext>
+ <!-- END SNIPPET: e1 -->
<bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
- <!-- END SNIPPET: e1 -->
</beans>