You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/05 12:27:36 UTC

svn commit: r919382 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/aggregate/ main/java/org/apache/camel/util/concurrent/ t...

Author: davsclaus
Date: Fri Mar  5 11:27:36 2010
New Revision: 919382

URL: http://svn.apache.org/viewvc?rev=919382&view=rev
Log:
CAMEL-1588: Prefer to use a cached thread pool instead of a fixed size pool. A cached pool can grow/shrink and is recommended as the best general purpose pool.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Fri Mar  5 11:27:36 2010
@@ -46,7 +46,6 @@
  * @version $Revision$
  */
 public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
     private final CamelContext context;
     private final ProducerCache producerCache;
     private Endpoint defaultEndpoint;
@@ -55,7 +54,7 @@
     public DefaultProducerTemplate(CamelContext context) {
         this.context = context;
         this.producerCache = new ProducerCache(context);
-        this.executor = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "ProducerTemplate", true);
+        this.executor = ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true);
     }
 
     public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
@@ -684,7 +683,7 @@
         super.start();
         ServiceHelper.startService(producerCache);
         if (executor == null || executor.isShutdown()) {
-            executor = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "ProducerTemplate", true);
+            executor = ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true);
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Fri Mar  5 11:27:36 2010
@@ -123,7 +123,7 @@
         }
         if (executorService == null) {
             // fall back and use default
-            executorService = ExecutorServiceHelper.newScheduledThreadPool(10, "RecipientList", true);
+            executorService = ExecutorServiceHelper.newCachedThreadPool("RecipientList", true);
         }
         return executorService;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Fri Mar  5 11:27:36 2010
@@ -105,14 +105,18 @@
     }        
     
     private ExecutorService createExecutorService(RouteContext routeContext) {
-        if (executorServiceRef != null) {
+        if (executorService == null && executorServiceRef != null) {
             executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+            if (executorService == null) {
+                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
+            }
         }
         if (executorService == null) {
-            executorService = ExecutorServiceHelper.newScheduledThreadPool(10, "Split", true);
+            // fall back and use default
+            executorService = ExecutorServiceHelper.newCachedThreadPool("Split", true);
         }
         return executorService;
-    }         
+    }
     
     // Fluent API
     // -------------------------------------------------------------------------

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri Mar  5 11:27:36 2010
@@ -50,11 +50,14 @@
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
-        if (executorServiceRef != null) {
+        if (executorService == null && executorServiceRef != null) {
             executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+            if (executorService == null) {
+                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
+            }
         }
         if (executorService == null && poolSize != null) {
-            executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "Threads", true);
+            executorService = ExecutorServiceHelper.newThreadPool("Threads", poolSize, poolSize);
         }
         Processor childProcessor = routeContext.createProcessor(this);
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Fri Mar  5 11:27:36 2010
@@ -97,7 +97,7 @@
             }
         }
         if (executorService == null && poolSize != null) {
-            executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync[" + getLabel() + "]", true);
+            executorService = ExecutorServiceHelper.newThreadPool("ToAsync[" + getLabel() + "]", poolSize, poolSize);
         }
 
         // create the child processor which is the async route

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Fri Mar  5 11:27:36 2010
@@ -61,7 +61,6 @@
  */
 public class MulticastProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
 
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
     private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class);
 
     /**
@@ -125,11 +124,8 @@
         this.streaming = streaming;
         this.stopOnException = stopOnException;
 
-        if (isParallelProcessing()) {
-            if (this.executorService == null) {
-                // setup default executor as parallel processing requires an executor
-                this.executorService = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "Multicast", true);
-            }
+        if (isParallelProcessing() && getExecutorService() == null) {
+            this.executorService = ExecutorServiceHelper.newCachedThreadPool("Multicast", true);
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Fri Mar  5 11:27:36 2010
@@ -36,7 +36,6 @@
  */
 public class OnCompletionProcessor extends ServiceSupport implements Processor, Traceable {
 
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
     private static final transient Log LOG = LogFactory.getLog(OnCompletionProcessor.class);
     private ExecutorService executorService;
     private Processor processor;
@@ -176,8 +175,8 @@
         return executorService;
     }
 
-    private ExecutorService createExecutorService() {
-        return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, this.toString(), true);
+    protected ExecutorService createExecutorService() {
+        return ExecutorServiceHelper.newCachedThreadPool(this.toString(), true);
     }
 
     public void setExecutorService(ExecutorService executorService) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Fri Mar  5 11:27:36 2010
@@ -123,7 +123,7 @@
     }
 
     /**
-     * The producer is <b>not</b> capable of async processing so lets simulate this by transfering the task
+     * The producer is <b>not</b> capable of async processing so lets simulate this by transferring the task
      * to another {@link ExecutorService} for async processing.
      *
      * @param producer the producer
@@ -165,7 +165,7 @@
 
     public ExecutorService getExecutorService() {
         if (executorService == null) {
-            executorService = createExecutorService("SendAsyncProcessor-Consumer");
+            executorService = ExecutorServiceHelper.newThreadPool("SendAsyncProcessor-Consumer", poolSize, poolSize);
         }
         return executorService;
     }
@@ -263,10 +263,6 @@
         }
     }
 
-    protected ExecutorService createExecutorService(String name) {
-        return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, true);
-    }
-
     protected void doStart() throws Exception {
         super.doStart();
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Fri Mar  5 11:27:36 2010
@@ -39,7 +39,6 @@
  */
 public class ThreadsProcessor extends DelegateProcessor implements Processor {
 
-    protected static final int DEFAULT_THREADPOOL_SIZE = 10;
     protected ExecutorService executorService;
     protected WaitForTaskToComplete waitForTaskToComplete;
 
@@ -105,7 +104,7 @@
     }
 
     protected ExecutorService createExecutorService() {
-        return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "Threads", true);
+        return ExecutorServiceHelper.newCachedThreadPool("Threads", true);
     }
 
     protected void doStop() throws Exception {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Fri Mar  5 11:27:36 2010
@@ -38,7 +38,6 @@
  */
 public class WireTapProcessor extends SendProcessor {
 
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
     private ExecutorService executorService;
 
     // expression or processor used for populating a new exchange to send
@@ -161,8 +160,8 @@
         return executorService;
     }
 
-    private ExecutorService createExecutorService() {
-        return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, this.toString(), true);
+    protected ExecutorService createExecutorService() {
+        return ExecutorServiceHelper.newCachedThreadPool(this.toString(), true);
     }
 
     public void setExecutorService(ExecutorService executorService) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Mar  5 11:27:36 2010
@@ -470,17 +470,18 @@
 
         if (executorService == null) {
             if (isParallelProcessing()) {
-                // we are running in parallel so create a default thread pool
-                executorService = ExecutorServiceHelper.newFixedThreadPool(10, "Aggregator", true);
+                // we are running in parallel so create a cached thread pool which grows/shrinks automatically
+                executorService = ExecutorServiceHelper.newCachedThreadPool("Aggregator", true);
             } else {
                 // use a single threaded if we are not running in parallel
-                executorService = ExecutorServiceHelper.newFixedThreadPool(1, "Aggregator", true);
+                executorService = ExecutorServiceHelper.newSingleThreadExecutor("Aggregator", true);
             }
         }
 
         // start timeout service if its in use
         if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
             ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateTimeoutChecker", true);
+            // check for timed out aggregated messages once every second
             timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
             ServiceHelper.startService(timeoutMap);
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Mar  5 11:27:36 2010
@@ -18,8 +18,11 @@
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -37,6 +40,9 @@
 
     /**
      * Creates a new thread name with the given prefix
+     *
+     * @param name the prefix
+     * @return the thread name, which is unique
      */
     public static String getThreadName(String name) {
         return "Camel thread " + nextThreadCounter() + ": " + name;
@@ -46,6 +52,14 @@
         return threadCounter.getAndIncrement();
     }
 
+    /**
+     * Creates a new scheduled thread pool which can schedule tasks.
+     *
+     * @param poolSize the core pool size
+     * @param name     part of the thread name
+     * @param daemon   whether the threads are daemon threads or not
+     * @return the created pool
+     */
     public static ScheduledExecutorService newScheduledThreadPool(final int poolSize, final String name, final boolean daemon) {
         return Executors.newScheduledThreadPool(poolSize, new ThreadFactory() {
             public Thread newThread(Runnable r) {
@@ -76,6 +90,13 @@
         });
     }
 
+    /**
+     * Creates a new cached thread pool which should be the most commonly used.
+     *
+     * @param name    part of the thread name
+     * @param daemon  whether the threads are daemon threads or not
+     * @return the created pool
+     */
     public static ExecutorService newCachedThreadPool(final String name, final boolean daemon) {
         return Executors.newCachedThreadPool(new ThreadFactory() {
             public Thread newThread(Runnable r) {
@@ -86,4 +107,41 @@
         });
     }
 
+    /**
+     * Creates a new custom thread pool using 60 seconds as keep alive
+     *
+     * @param name          part of the thread name
+     * @param corePoolSize  the core size
+     * @param maxPoolSize   the maximum pool size
+     * @return the created pool
+     */
+    public static ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize) {
+        return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, true);
+    }
+
+    /**
+     * Creates a new custom thread pool
+     *
+     * @param name          part of the thread name
+     * @param corePoolSize  the core size
+     * @param maxPoolSize   the maximum pool size
+     * @param keepAliveTime keep alive
+     * @param timeUnit      keep alive time unit
+     * @param daemon        whether the threads are daemon threads or not
+     * @return the created pool
+     */
+    public static ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize,
+                                                long keepAliveTime, TimeUnit timeUnit, final boolean daemon) {
+        ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
+                                                           keepAliveTime, timeUnit, new LinkedBlockingQueue<Runnable>());
+        answer.setThreadFactory(new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                Thread answer = new Thread(r, getThreadName(name));
+                answer.setDaemon(daemon);
+                return answer;
+            }
+        });
+        return answer;
+    }
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java Fri Mar  5 11:27:36 2010
@@ -104,7 +104,7 @@
     }
 
     public void testExecutor() throws Exception {
-        ScheduledExecutorService e = ExecutorServiceHelper.newScheduledThreadPool(1, "foo", true);
+        ScheduledExecutorService e = ExecutorServiceHelper.newScheduledThreadPool(2, "foo", true);
 
         DefaultTimeoutMap map = new DefaultTimeoutMap(e, 500);
         assertEquals(500, map.getPurgePollTime());