You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/05 12:27:36 UTC
svn commit: r919382 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/model/
main/java/org/apache/camel/processor/
main/java/org/apache/camel/processor/aggregate/
main/java/org/apache/camel/util/concurrent/ t...
Author: davsclaus
Date: Fri Mar 5 11:27:36 2010
New Revision: 919382
URL: http://svn.apache.org/viewvc?rev=919382&view=rev
Log:
CAMEL-1588: Prefer to use CachedExecutorService instead of a fixed size pool. The cached can grow/shrink and is recommended as the best general purpose pool.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Fri Mar 5 11:27:36 2010
@@ -46,7 +46,6 @@
* @version $Revision$
*/
public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
- private static final int DEFAULT_THREADPOOL_SIZE = 10;
private final CamelContext context;
private final ProducerCache producerCache;
private Endpoint defaultEndpoint;
@@ -55,7 +54,7 @@
public DefaultProducerTemplate(CamelContext context) {
this.context = context;
this.producerCache = new ProducerCache(context);
- this.executor = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "ProducerTemplate", true);
+ this.executor = ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true);
}
public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
@@ -684,7 +683,7 @@
super.start();
ServiceHelper.startService(producerCache);
if (executor == null || executor.isShutdown()) {
- executor = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "ProducerTemplate", true);
+ executor = ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Fri Mar 5 11:27:36 2010
@@ -123,7 +123,7 @@
}
if (executorService == null) {
// fall back and use default
- executorService = ExecutorServiceHelper.newScheduledThreadPool(10, "RecipientList", true);
+ executorService = ExecutorServiceHelper.newCachedThreadPool("RecipientList", true);
}
return executorService;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Fri Mar 5 11:27:36 2010
@@ -105,14 +105,18 @@
}
private ExecutorService createExecutorService(RouteContext routeContext) {
- if (executorServiceRef != null) {
+ if (executorService == null && executorServiceRef != null) {
executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+ if (executorService == null) {
+ throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
+ }
}
if (executorService == null) {
- executorService = ExecutorServiceHelper.newScheduledThreadPool(10, "Split", true);
+ // fall back and use default
+ executorService = ExecutorServiceHelper.newCachedThreadPool("Split", true);
}
return executorService;
- }
+ }
// Fluent API
// -------------------------------------------------------------------------
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri Mar 5 11:27:36 2010
@@ -50,11 +50,14 @@
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
- if (executorServiceRef != null) {
+ if (executorService == null && executorServiceRef != null) {
executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+ if (executorService == null) {
+ throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
+ }
}
if (executorService == null && poolSize != null) {
- executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "Threads", true);
+ executorService = ExecutorServiceHelper.newThreadPool("Threads", poolSize, poolSize);
}
Processor childProcessor = routeContext.createProcessor(this);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Fri Mar 5 11:27:36 2010
@@ -97,7 +97,7 @@
}
}
if (executorService == null && poolSize != null) {
- executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync[" + getLabel() + "]", true);
+ executorService = ExecutorServiceHelper.newThreadPool("ToAsync[" + getLabel() + "]", poolSize, poolSize);
}
// create the child processor which is the async route
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Fri Mar 5 11:27:36 2010
@@ -61,7 +61,6 @@
*/
public class MulticastProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
- private static final int DEFAULT_THREADPOOL_SIZE = 10;
private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class);
/**
@@ -125,11 +124,8 @@
this.streaming = streaming;
this.stopOnException = stopOnException;
- if (isParallelProcessing()) {
- if (this.executorService == null) {
- // setup default executor as parallel processing requires an executor
- this.executorService = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "Multicast", true);
- }
+ if (isParallelProcessing() && getExecutorService() == null) {
+ this.executorService = ExecutorServiceHelper.newCachedThreadPool("Multicast", true);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Fri Mar 5 11:27:36 2010
@@ -36,7 +36,6 @@
*/
public class OnCompletionProcessor extends ServiceSupport implements Processor, Traceable {
- private static final int DEFAULT_THREADPOOL_SIZE = 10;
private static final transient Log LOG = LogFactory.getLog(OnCompletionProcessor.class);
private ExecutorService executorService;
private Processor processor;
@@ -176,8 +175,8 @@
return executorService;
}
- private ExecutorService createExecutorService() {
- return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, this.toString(), true);
+ protected ExecutorService createExecutorService() {
+ return ExecutorServiceHelper.newCachedThreadPool(this.toString(), true);
}
public void setExecutorService(ExecutorService executorService) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Fri Mar 5 11:27:36 2010
@@ -123,7 +123,7 @@
}
/**
- * The producer is <b>not</b> capable of async processing so lets simulate this by transfering the task
+ * The producer is <b>not</b> capable of async processing so lets simulate this by transferring the task
* to another {@link ExecutorService} for async processing.
*
* @param producer the producer
@@ -165,7 +165,7 @@
public ExecutorService getExecutorService() {
if (executorService == null) {
- executorService = createExecutorService("SendAsyncProcessor-Consumer");
+ executorService = ExecutorServiceHelper.newThreadPool("SendAsyncProcessor-Consumer", poolSize, poolSize);
}
return executorService;
}
@@ -263,10 +263,6 @@
}
}
- protected ExecutorService createExecutorService(String name) {
- return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, true);
- }
-
protected void doStart() throws Exception {
super.doStart();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Fri Mar 5 11:27:36 2010
@@ -39,7 +39,6 @@
*/
public class ThreadsProcessor extends DelegateProcessor implements Processor {
- protected static final int DEFAULT_THREADPOOL_SIZE = 10;
protected ExecutorService executorService;
protected WaitForTaskToComplete waitForTaskToComplete;
@@ -105,7 +104,7 @@
}
protected ExecutorService createExecutorService() {
- return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "Threads", true);
+ return ExecutorServiceHelper.newCachedThreadPool("Threads", true);
}
protected void doStop() throws Exception {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Fri Mar 5 11:27:36 2010
@@ -38,7 +38,6 @@
*/
public class WireTapProcessor extends SendProcessor {
- private static final int DEFAULT_THREADPOOL_SIZE = 10;
private ExecutorService executorService;
// expression or processor used for populating a new exchange to send
@@ -161,8 +160,8 @@
return executorService;
}
- private ExecutorService createExecutorService() {
- return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, this.toString(), true);
+ protected ExecutorService createExecutorService() {
+ return ExecutorServiceHelper.newCachedThreadPool(this.toString(), true);
}
public void setExecutorService(ExecutorService executorService) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Mar 5 11:27:36 2010
@@ -470,17 +470,18 @@
if (executorService == null) {
if (isParallelProcessing()) {
- // we are running in parallel so create a default thread pool
- executorService = ExecutorServiceHelper.newFixedThreadPool(10, "Aggregator", true);
+ // we are running in parallel so create a cached thread pool which grows/shrinks automatic
+ executorService = ExecutorServiceHelper.newCachedThreadPool("Aggregator", true);
} else {
// use a single threaded if we are not running in parallel
- executorService = ExecutorServiceHelper.newFixedThreadPool(1, "Aggregator", true);
+ executorService = ExecutorServiceHelper.newSingleThreadExecutor("Aggregator", true);
}
}
// start timeout service if its in use
if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateTimeoutChecker", true);
+ // check for timed out aggregated messages once every second
timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
ServiceHelper.startService(timeoutMap);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Mar 5 11:27:36 2010
@@ -18,8 +18,11 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -37,6 +40,9 @@
/**
* Creates a new thread name with the given prefix
+ *
+ * @param name the prefix
+ * @return the thread name, which is unique
*/
public static String getThreadName(String name) {
return "Camel thread " + nextThreadCounter() + ": " + name;
@@ -46,6 +52,14 @@
return threadCounter.getAndIncrement();
}
+ /**
+ * Creates a new scheduled thread pool which can schedule threads.
+ *
+ * @param poolSize the core pool size
+ * @param name part of the thread name
+ * @param daemon whether the threads is daemon or not
+ * @return the created pool
+ */
public static ScheduledExecutorService newScheduledThreadPool(final int poolSize, final String name, final boolean daemon) {
return Executors.newScheduledThreadPool(poolSize, new ThreadFactory() {
public Thread newThread(Runnable r) {
@@ -76,6 +90,13 @@
});
}
+ /**
+ * Creates a new cached thread pool which should be the most commonly used.
+ *
+ * @param name part of the thread name
+ * @param daemon whether the threads is daemon or not
+ * @return the created pool
+ */
public static ExecutorService newCachedThreadPool(final String name, final boolean daemon) {
return Executors.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(Runnable r) {
@@ -86,4 +107,41 @@
});
}
+ /**
+ * Creates a new custom thread pool using 60 seconds as keep alive
+ *
+ * @param name part of the thread name
+ * @param corePoolSize the core size
+ * @param maxPoolSize the maximum pool size
+ * @return the created pool
+ */
+ public static ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize) {
+ return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, true);
+ }
+
+ /**
+ * Creates a new custom thread pool
+ *
+ * @param name part of the thread name
+ * @param corePoolSize the core size
+ * @param maxPoolSize the maximum pool size
+ * @param keepAliveTime keep alive
+ * @param timeUnit keep alive time unit
+ * @param daemon whether the threads is daemon or not
+ * @return the created pool
+ */
+ public static ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize,
+ long keepAliveTime, TimeUnit timeUnit, final boolean daemon) {
+ ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
+ keepAliveTime, timeUnit, new LinkedBlockingQueue<Runnable>());
+ answer.setThreadFactory(new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread answer = new Thread(r, getThreadName(name));
+ answer.setDaemon(daemon);
+ return answer;
+ }
+ });
+ return answer;
+ }
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java Fri Mar 5 11:27:36 2010
@@ -104,7 +104,7 @@
}
public void testExecutor() throws Exception {
- ScheduledExecutorService e = ExecutorServiceHelper.newScheduledThreadPool(1, "foo", true);
+ ScheduledExecutorService e = ExecutorServiceHelper.newScheduledThreadPool(2, "foo", true);
DefaultTimeoutMap map = new DefaultTimeoutMap(e, 500);
assertEquals(500, map.getPurgePollTime());