You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/16 10:24:47 UTC

svn commit: r923642 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/builder/ main/java/org/apache/camel/component/bean/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ main/java/org/...

Author: davsclaus
Date: Tue Mar 16 09:24:46 2010
New Revision: 923642

URL: http://svn.apache.org/viewvc?rev=923642&view=rev
Log:
CAMEL-1588: Introduced ThreadPoolProfile with a sensible default used by EIPs. Introduced ThreadPoolBuilder to easily create pools from Java.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java?rev=923642&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java Tue Mar 16 09:24:46 2010
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.model.ThreadPoolProfileDefinition;
+
+/**
+ * A builder to create thread pools.
+ *
+ * @version $Revision$
+ */
+public final class ThreadPoolBuilder {
+
+    private final CamelContext camelContext;
+    private ThreadPoolProfileDefinition threadPoolDefinition;
+
+    public ThreadPoolBuilder(CamelContext camelContext) {
+        this.camelContext = camelContext;
+        // use the default thread pool profile as the base
+        this.threadPoolDefinition = new ThreadPoolProfileDefinition(camelContext.getExecutorServiceStrategy().getDefaultThreadPoolProfile());
+    }
+
+    public ThreadPoolBuilder poolSize(int poolSize) {
+        threadPoolDefinition.poolSize(poolSize);
+        return this;
+    }
+
+    public ThreadPoolBuilder maxPoolSize(int maxPoolSize) {
+        threadPoolDefinition.maxPoolSize(maxPoolSize);
+        return this;
+    }
+
+    public ThreadPoolBuilder keepAliveTime(long keepAliveTime) {
+        threadPoolDefinition.keepAliveTime(keepAliveTime);
+        return this;
+    }
+
+    public ThreadPoolBuilder timeUnit(TimeUnit timeUnit) {
+        threadPoolDefinition.timeUnit(timeUnit);
+        return this;
+    }
+
+    public ThreadPoolBuilder maxQueueSize(int maxQueueSize) {
+        threadPoolDefinition.maxQueueSize(maxQueueSize);
+        return this;
+    }
+
+    /**
+     * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}.
+     *
+     * @param source             the source object; usually <tt>this</tt> should be passed in as the parameter
+     * @param executorServiceRef reference to lookup
+     * @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt> if not found
+     */
+    public ExecutorService lookup(Object source, String executorServiceRef) {
+        return camelContext.getExecutorServiceStrategy().lookup(source, executorServiceRef);
+    }
+
+    /**
+     * Builds the new thread pool
+     *
+     * @param name name which is appended to the thread name
+     * @return the created thread pool
+     */
+    public ExecutorService build(String name) {
+        return build(null, name);
+    }
+
+    /**
+     * Builds the new thread pool
+     *
+     * @param source the source object; usually <tt>this</tt> should be passed in as the parameter
+     * @param name   name which is appended to the thread name
+     * @return the created thread pool
+     */
+    public ExecutorService build(Object source, String name) {
+        ExecutorService answer = camelContext.getExecutorServiceStrategy().newThreadPool(source, name,
+                threadPoolDefinition.getPoolSize(), threadPoolDefinition.getMaxPoolSize(),
+                threadPoolDefinition.getKeepAliveTime(), threadPoolDefinition.getTimeUnit(),
+                threadPoolDefinition.getMaxQueueSize(), false);
+
+        return answer;
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java Tue Mar 16 09:24:46 2010
@@ -91,8 +91,8 @@ public class MethodInfo {
             }
 
             if (annotation.parallelProcessoing() && recipientList.getExecutorService() == null) {
-                // we are running in parallel so create a cached thread pool which grows/shrinks automatic
-                ExecutorService executor = camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, "@RecipientList");
+                // we are running in parallel so we need a thread pool
+                ExecutorService executor = camelContext.getExecutorServiceStrategy().newDefaultThreadPool(this, "@RecipientList");
                 recipientList.setExecutorService(executor);
             }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Tue Mar 16 09:24:46 2010
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.spi.ExecutorServiceStrategy;
+import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,9 +38,26 @@ public class DefaultExecutorServiceStrat
     private final List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
     private final CamelContext camelContext;
     private String threadNamePattern = "Camel Thread ${counter} - ${name}";
+    private ThreadPoolProfile defaultThreadPoolProfile;
 
     public DefaultExecutorServiceStrategy(CamelContext camelContext) {
         this.camelContext = camelContext;
+        this.defaultThreadPoolProfile = new ThreadPoolProfileSupport();
+        this.defaultThreadPoolProfile.setDefaultProfile(true);
+    }
+
+    public ThreadPoolProfile getDefaultThreadPoolProfile() {
+        return defaultThreadPoolProfile;
+    }
+
+    public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) {
+        // the old profile is no longer the default
+        if (this.defaultThreadPoolProfile != null) {
+            this.defaultThreadPoolProfile.setDefaultProfile(false);
+        }
+        // and replace with the new default profile
+        this.defaultThreadPoolProfile = defaultThreadPoolProfile;
+        this.defaultThreadPoolProfile.setDefaultProfile(true);
     }
 
     public String getThreadName(String name) {
@@ -61,6 +79,15 @@ public class DefaultExecutorServiceStrat
         return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
     }
 
+    public ExecutorService newDefaultThreadPool(Object source, String name) {
+        ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name,
+            defaultThreadPoolProfile.getPoolSize(), defaultThreadPoolProfile.getMaxPoolSize(),
+            defaultThreadPoolProfile.getKeepAliveTime(), defaultThreadPoolProfile.getTimeUnit(),
+            defaultThreadPoolProfile.getMaxQueueSize(), false);
+        onNewExecutorService(answer);
+        return answer;
+    }
+
     public ExecutorService newCachedThreadPool(Object source, String name) {
         ExecutorService answer = ExecutorServiceHelper.newCachedThreadPool(threadNamePattern, name, true);
         onNewExecutorService(answer);
@@ -91,8 +118,10 @@ public class DefaultExecutorServiceStrat
         return answer;
     }
 
-    public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
-        ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
+    public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime,
+                                         TimeUnit timeUnit, int maxQueueSize, boolean daemon) {
+        ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, 
+                                                                     keepAliveTime, timeUnit, maxQueueSize, daemon);
         onNewExecutorService(answer);
         return answer;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Tue Mar 16 09:24:46 2010
@@ -53,7 +53,7 @@ public class DefaultProducerTemplate ext
     public DefaultProducerTemplate(CamelContext context) {
         this.context = context;
         this.producerCache = new ProducerCache(context);
-        this.executor = context.getExecutorServiceStrategy().newCachedThreadPool(this, "ProducerTemplate");
+        this.executor = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "ProducerTemplate");
     }
 
     public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
@@ -683,7 +683,7 @@ public class DefaultProducerTemplate ext
         super.start();
         ServiceHelper.startService(producerCache);
         if (executor == null) {
-            executor = context.getExecutorServiceStrategy().newCachedThreadPool(this, "ProducerTemplate");
+            executor = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "ProducerTemplate");
         }
     }
 

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java?rev=923642&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java Tue Mar 16 09:24:46 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.spi.ThreadPoolProfile;
+
+/**
+ * @version $Revision$
+ */
+public class ThreadPoolProfileSupport implements ThreadPoolProfile {
+
+    private Boolean defaultProfile;
+    private Integer poolSize = 10;
+    private Integer maxPoolSize = 20;
+    private Long keepAliveTime = 60L;
+    private TimeUnit timeUnit = TimeUnit.SECONDS;
+    private Integer maxQueueSize = -1;
+
+    public Boolean isDefaultProfile() {
+        return defaultProfile;
+    }
+
+    public void setDefaultProfile(Boolean defaultProfile) {
+        this.defaultProfile = defaultProfile;
+    }
+
+    public Integer getPoolSize() {
+        return poolSize;
+    }
+
+    public void setPoolSize(Integer poolSize) {
+        this.poolSize = poolSize;
+    }
+
+    public Integer getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    public void setMaxPoolSize(Integer maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }
+
+    public Long getKeepAliveTime() {
+        return keepAliveTime;
+    }
+
+    public void setKeepAliveTime(Long keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+
+    public void setTimeUnit(TimeUnit timeUnit) {
+        this.timeUnit = timeUnit;
+    }
+
+    public Integer getMaxQueueSize() {
+        return maxQueueSize;
+    }
+
+    public void setMaxQueueSize(Integer maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Tue Mar 16 09:24:46 2010
@@ -154,7 +154,7 @@ public class AggregateDefinition extends
         if (executorService == null) {
             if (isParallelProcessing()) {
                 // we are running in parallel so create a cached thread pool which grows/shrinks automatic
-                executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "Aggregator");
+                executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Aggregator");
             } else {
                 // use a single threaded if we are not running in parallel
                 executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this, "Aggregator");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Tue Mar 16 09:24:46 2010
@@ -151,7 +151,7 @@ public class MulticastDefinition extends
         executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
         if (isParallelProcessing() && executorService == null) {
             // we are running in parallel so create a cached thread pool which grows/shrinks automatic
-            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "Multicast");
+            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Multicast");
         }
 
         return new MulticastProcessor(routeContext.getCamelContext(), list, aggregationStrategy, isParallelProcessing(),

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Tue Mar 16 09:24:46 2010
@@ -99,7 +99,7 @@ public class OnCompletionDefinition exte
 
         executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
         if (executorService == null) {
-            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "OnCompletion");
+            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "OnCompletion");
         }
         OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor, executorService,
                                                                  onCompleteOnly, onFailureOnly, when);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java Tue Mar 16 09:24:46 2010
@@ -366,7 +366,7 @@ public class OnExceptionDefinition exten
      * Sets whether to log exhausted exceptions
      */
     public OnExceptionDefinition logExhausted(boolean logExhausted) {
-        getOrCreateRedeliveryPolicy().setLogExhasted(logExhausted);
+        getOrCreateRedeliveryPolicy().setLogExhausted(logExhausted);
         return this;
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Tue Mar 16 09:24:46 2010
@@ -98,7 +98,7 @@ public class RecipientListDefinition ext
         executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
         if (isParallelProcessing() && executorService == null) {
             // we are running in parallel so create a cached thread pool which grows/shrinks automatic
-            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "RecipientList");
+            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "RecipientList");
         }
         answer.setExecutorService(executorService);
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java Tue Mar 16 09:24:46 2010
@@ -63,7 +63,7 @@ public class RedeliveryPolicyDefinition 
     @XmlAttribute
     private Boolean logHandled;
     @XmlAttribute
-    private Boolean logExhasted;
+    private Boolean logExhausted;
     @XmlAttribute
     private Boolean disableRedelivery;
 
@@ -115,8 +115,8 @@ public class RedeliveryPolicyDefinition 
         if (logRetryAttempted != null) {
             answer.setLogRetryAttempted(logRetryAttempted);
         }
-        if (logExhasted != null) {
-            answer.setLogExhausted(logExhasted);
+        if (logExhausted != null) {
+            answer.setLogExhausted(logExhausted);
         }
         if (disableRedelivery != null && disableRedelivery) {
             answer.setMaximumRedeliveries(0);
@@ -253,7 +253,7 @@ public class RedeliveryPolicyDefinition 
      * @return the builder
      */
     public RedeliveryPolicyDefinition logExhausted(boolean logExhausted) {
-        setLogExhasted(logExhausted);
+        setLogExhausted(logExhausted);
         return this;
     }
 
@@ -438,11 +438,11 @@ public class RedeliveryPolicyDefinition 
         this.logRetryAttempted = logRetryAttempted;
     }
 
-    public Boolean isLogExhasted() {
-        return logExhasted;
+    public Boolean isLogExhausted() {
+        return logExhausted;
     }
 
-    public void setLogExhasted(Boolean logExhasted) {
-        this.logExhasted = logExhasted;
+    public void setLogExhausted(Boolean logExhausted) {
+        this.logExhausted = logExhausted;
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Tue Mar 16 09:24:46 2010
@@ -91,7 +91,7 @@ public class SplitDefinition extends Exp
         executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
         if (isParallelProcessing() && executorService == null) {
             // we are running in parallel so create a cached thread pool which grows/shrinks automatic
-            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "Split");
+            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Split");
         }
 
         Expression exp = getExpression().createExpression(routeContext);

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java?rev=923642&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java Tue Mar 16 09:24:46 2010
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.camel.builder.xml.TimeUnitAdapter;
+import org.apache.camel.spi.ThreadPoolProfile;
+
+/**
+ * Represents an XML &lt;threadPoolProfile/&gt; element
+ *
+ * @version $Revision$
+ */
+@XmlRootElement(name = "threadPoolProfile")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ThreadPoolProfileDefinition extends OptionalIdentifiedDefinition implements ThreadPoolProfile {
+
+    @XmlAttribute()
+    private Boolean defaultProfile = false;
+    @XmlAttribute()
+    private Integer poolSize = 10;
+    @XmlAttribute()
+    private Integer maxPoolSize = 20;
+    @XmlAttribute()
+    private Long keepAliveTime = 60L;
+    @XmlJavaTypeAdapter(TimeUnitAdapter.class)
+    private TimeUnit timeUnit = TimeUnit.SECONDS;
+    @XmlAttribute()
+    private Integer maxQueueSize = -1;
+
+    public ThreadPoolProfileDefinition() {
+    }
+
+    public ThreadPoolProfileDefinition(ThreadPoolProfile threadPoolProfile) {
+        setDefaultProfile(threadPoolProfile.isDefaultProfile());
+        setPoolSize(threadPoolProfile.getPoolSize());
+        setMaxPoolSize(threadPoolProfile.getMaxPoolSize());
+        setKeepAliveTime(threadPoolProfile.getKeepAliveTime());
+        setTimeUnit(threadPoolProfile.getTimeUnit());
+        setMaxQueueSize(threadPoolProfile.getMaxQueueSize());
+    }
+
+    public ThreadPoolProfileDefinition poolSize(int poolSize) {
+        setPoolSize(poolSize);
+        return this;
+    }
+
+    public ThreadPoolProfileDefinition maxPoolSize(int maxPoolSize) {
+        setMaxPoolSize(maxPoolSize);
+        return this;
+    }
+
+    public ThreadPoolProfileDefinition keepAliveTime(long keepAliveTime) {
+        setKeepAliveTime(keepAliveTime);
+        return this;
+    }
+
+    public ThreadPoolProfileDefinition timeUnit(TimeUnit timeUnit) {
+        setTimeUnit(timeUnit);
+        return this;
+    }
+
+    public ThreadPoolProfileDefinition maxQueueSize(int maxQueueSize) {
+        setMaxQueueSize(maxQueueSize);
+        return this;
+    }
+
+    public Boolean isDefaultProfile() {
+        return defaultProfile;
+    }
+
+    public void setDefaultProfile(Boolean defaultProfile) {
+        this.defaultProfile = defaultProfile;
+    }
+
+    public Integer getPoolSize() {
+        return poolSize;
+    }
+
+    public void setPoolSize(Integer poolSize) {
+        this.poolSize = poolSize;
+    }
+
+    public Integer getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    public void setMaxPoolSize(Integer maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }
+
+    public Long getKeepAliveTime() {
+        return keepAliveTime;
+    }
+
+    public void setKeepAliveTime(Long keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+
+    public void setTimeUnit(TimeUnit timeUnit) {
+        this.timeUnit = timeUnit;
+    }
+
+    public Integer getMaxQueueSize() {
+        return maxQueueSize;
+    }
+
+    public void setMaxQueueSize(Integer maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Tue Mar 16 09:24:46 2010
@@ -44,20 +44,20 @@ public class ThreadsDefinition extends O
 
     @XmlTransient
     private ExecutorService executorService;
-    @XmlAttribute(required = false)
+    @XmlAttribute
     private String executorServiceRef;
-    @XmlAttribute(required = false)
+    @XmlAttribute
     private Integer poolSize;
-    @XmlAttribute(required = false)
+    @XmlAttribute
     private Integer maxPoolSize;
-    @XmlAttribute(required = false)
+    @XmlAttribute
     private Integer keepAliveTime = 60;
-    @XmlAttribute(required = false)
+    @XmlAttribute
     @XmlJavaTypeAdapter(TimeUnitAdapter.class)
     private TimeUnit units = TimeUnit.SECONDS;
-    @XmlAttribute(required = false)
+    @XmlAttribute
     private String threadName;
-    @XmlAttribute(required = false)
+    @XmlAttribute
     private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
 
     @Override
@@ -69,12 +69,12 @@ public class ThreadsDefinition extends O
             String name = getThreadName() != null ? getThreadName() : "Threads";
             if (poolSize == null || poolSize <= 0) {
                 // use the cached thread pool
-                executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, name);
+                executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, name);
             } else {
                 // use a custom pool based on the settings
                 int max = getMaxPoolSize() != null ? getMaxPoolSize() : poolSize;
                 executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
-                                        .newThreadPool(this, name, poolSize, max, getKeepAliveTime(), getUnits(), true);
+                                        .newThreadPool(this, name, poolSize, max, getKeepAliveTime(), getUnits(), -1, true);
             }
         }
         Processor childProcessor = routeContext.createProcessor(this);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Tue Mar 16 09:24:46 2010
@@ -69,7 +69,7 @@ public class WireTapDefinition extends S
 
         executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
         if (executorService == null) {
-            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "WireTap");
+            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "WireTap");
         }
         WireTapProcessor answer = new WireTapProcessor(endpoint, getPattern(), executorService);
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Tue Mar 16 09:24:46 2010
@@ -188,7 +188,7 @@ public class SendAsyncProcessor extends 
         if (producerExecutorService == null) {
             // use a cached pool for the producers which can grow/schrink itself
             producerExecutorService = destination.getCamelContext().getExecutorServiceStrategy()
-                                        .newCachedThreadPool(this, "SendAsyncProcessor-Producer");
+                                        .newDefaultThreadPool(this, "SendAsyncProcessor-Producer");
         }
         return producerExecutorService;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Tue Mar 16 09:24:46 2010
@@ -33,12 +33,29 @@ import org.apache.camel.ShutdownableServ
  * do not have to exactly create those kind of pools. Feel free to return a shared or different kind of pool.
  * <p/>
  * However there are two types of pools: regular and scheduled.
+ * <p/>
+ * If you use the <tt>newXXX</tt> methods to create thread pools, then Camel will by default take care of
+ * shutting down those created pools when {@link org.apache.camel.CamelContext} is shutting down.
  *
  * @version $Revision$
  */
 public interface ExecutorServiceStrategy extends ShutdownableService {
 
     /**
+     * Gets the default thread pool profile
+     *
+     * @return the default profile
+     */
+    ThreadPoolProfile getDefaultThreadPoolProfile();
+
+    /**
+     * Sets the default thread pool profile
+     *
+     * @param defaultThreadPoolProfile the new default thread pool profile
+     */
+    void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile);
+
+    /**
      * Creates a full thread name
      *
      * @param name  name which is appended to the full thread name
@@ -76,11 +93,20 @@ public interface ExecutorServiceStrategy
     ExecutorService lookup(Object source, String executorServiceRef);
 
     /**
+     * Creates a new thread pool using the default thread pool profile.
+     *
+     * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @return the created thread pool
+     */
+    ExecutorService newDefaultThreadPool(Object source, String name);
+
+    /**
      * Creates a new cached thread pool.
      *
      * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
      * @param name        name which is appended to the thread name
-     * @return the thread pool
+     * @return the created thread pool
      */
     ExecutorService newCachedThreadPool(Object source, String name);
 
@@ -90,7 +116,7 @@ public interface ExecutorServiceStrategy
      * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
      * @param name        name which is appended to the thread name
      * @param poolSize    the core pool size
-     * @return the thread pool
+     * @return the created thread pool
      */
     ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize);
 
@@ -100,7 +126,7 @@ public interface ExecutorServiceStrategy
      * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
      * @param name        name which is appended to the thread name
      * @param poolSize    the core pool size
-     * @return the thread pool
+     * @return the created thread pool
      */
     ExecutorService newFixedThreadPool(Object source, String name, int poolSize);
 
@@ -109,7 +135,7 @@ public interface ExecutorServiceStrategy
      *
      * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
      * @param name        name which is appended to the thread name
-     * @return the thread pool
+     * @return the created thread pool
      */
     ExecutorService newSingleThreadExecutor(Object source, String name);
 
@@ -122,7 +148,7 @@ public interface ExecutorServiceStrategy
      * @param name          name which is appended to the thread name
      * @param corePoolSize  the core pool size
      * @param maxPoolSize   the maximum pool size
-     * @return the thread pool
+     * @return the created thread pool
      */
     ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize);
 
@@ -135,11 +161,12 @@ public interface ExecutorServiceStrategy
      * @param maxPoolSize   the maximum pool size
      * @param keepAliveTime keep alive time for idle threads
      * @param timeUnit      time unit for keep alive time
+     * @param maxQueueSize  the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
      * @param daemon        whether or not the created threads is daemon or not
-     * @return the thread pool
+     * @return the created thread pool
      */
     ExecutorService newThreadPool(Object source, final String name, int corePoolSize, int maxPoolSize,
-                                  long keepAliveTime, TimeUnit timeUnit, boolean daemon);
+                                  long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean daemon);
 
     /**
      * Shutdown the given executor service.

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java?rev=923642&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java Tue Mar 16 09:24:46 2010
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A profile which defines thread pool settings.
+ *
+ * @version $Revision$
+ */
+public interface ThreadPoolProfile {
+
+    /**
+     * Whether this profile is the default profile (there can only be one).
+     *
+     * @return <tt>true</tt> if it is the default profile, <tt>false</tt> otherwise
+     */
+    Boolean isDefaultProfile();
+
+    /**
+     * Sets whether this profile is the default profile (there can only be one).
+     *
+     * @param defaultProfile the option
+     */
+    void setDefaultProfile(Boolean defaultProfile);
+
+    /**
+     * Gets the core pool size (threads to keep minimum in pool)
+     *
+     * @return the pool size
+     */
+    Integer getPoolSize();
+
+    /**
+     * Sets the core pool size (threads to keep minimum in pool)
+     *
+     * @param poolSize the pool size
+     */
+    void setPoolSize(Integer poolSize);
+
+    /**
+     * Gets the maximum pool size
+     *
+     * @return the maximum pool size
+     */
+    Integer getMaxPoolSize();
+
+    /**
+     * Sets the maximum pool size
+     *
+     * @param maxPoolSize the maximum pool size
+     */
+    void setMaxPoolSize(Integer maxPoolSize);
+
+    /**
+     * Gets the keep alive time for inactive threads
+     *
+     * @return the keep alive time
+     */
+    Long getKeepAliveTime();
+
+    /**
+     * Sets the keep alive time for inactive threads
+     *
+     * @param keepAliveTime the keep alive time
+     */
+    void setKeepAliveTime(Long keepAliveTime);
+
+    /**
+     * Gets the time unit used for keep alive time
+     *
+     * @return the time unit
+     */
+    TimeUnit getTimeUnit();
+
+    /**
+     * Sets the time unit used for keep alive time
+     *
+     * @param timeUnit the time unit
+     */
+    void setTimeUnit(TimeUnit timeUnit);
+
+    /**
+     * Gets the maximum number of tasks in the work queue.
+     * <p/>
+     * Use <tt>-1</tt> or <tt>Integer.MAX_VALUE</tt> for an unbounded queue
+     *
+     * @return the max queue size
+     */
+    Integer getMaxQueueSize();
+
+    /**
+     * Sets the maximum number of tasks in the work queue.
+     * <p/>
+     * Use <tt>-1</tt> or <tt>Integer.MAX_VALUE</tt> for an unbounded queue
+     *
+     * @param maxQueueSize the max queue size
+     */
+    void setMaxQueueSize(Integer maxQueueSize);
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Tue Mar 16 09:24:46 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.util.concurrent;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -132,7 +133,7 @@ public final class ExecutorServiceHelper
     }
 
     /**
-     * Creates a new cached thread pool which should be the most commonly used.
+     * Creates a new cached thread pool
      *
      * @param pattern  pattern of the thread name
      * @param name     ${name} in the pattern name
@@ -150,7 +151,7 @@ public final class ExecutorServiceHelper
     }
 
     /**
-     * Creates a new custom thread pool using 60 seconds as keep alive
+     * Creates a new custom thread pool using 60 seconds as keep alive and with an unbounded queue.
      *
      * @param pattern       pattern of the thread name
      * @param name          ${name} in the pattern name
@@ -159,7 +160,7 @@ public final class ExecutorServiceHelper
      * @return the created pool
      */
     public static ExecutorService newThreadPool(final String pattern, final String name, int corePoolSize, int maxPoolSize) {
-        return ExecutorServiceHelper.newThreadPool(pattern, name, corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, true);
+        return ExecutorServiceHelper.newThreadPool(pattern, name, corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, -1, true);
     }
 
     /**
@@ -171,20 +172,30 @@ public final class ExecutorServiceHelper
      * @param maxPoolSize   the maximum pool size
      * @param keepAliveTime keep alive
      * @param timeUnit      keep alive time unit
+     * @param maxQueueSize  the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
      * @param daemon        whether the threads is daemon or not
      * @return the created pool
      * @throws IllegalArgumentException if parameters is not valid
      */
     public static ExecutorService newThreadPool(final String pattern, final String name, int corePoolSize, int maxPoolSize,
-                                                long keepAliveTime, TimeUnit timeUnit, final boolean daemon) {
+                                                long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, final boolean daemon) {
 
         // validate max >= core
         if (maxPoolSize < corePoolSize) {
             throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
         }
 
-        ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
-                                                           keepAliveTime, timeUnit, new LinkedBlockingQueue<Runnable>());
+        if (maxQueueSize == 0) {
+            throw new IllegalArgumentException("MaxQueueSize cannot be 0.");
+        }
+
+        BlockingQueue<Runnable> queue;
+        if (maxQueueSize < 0) {
+            queue = new LinkedBlockingQueue<Runnable>();
+        } else {
+            queue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
+        }
+        ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, queue);
         answer.setThreadFactory(new ThreadFactory() {
             public Thread newThread(Runnable r) {
                 Thread answer = new Thread(r, getThreadName(pattern, name));

Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Tue Mar 16 09:24:46 2010
@@ -69,6 +69,7 @@ SetPropertyDefinition
 SortDefinition
 SplitDefinition
 StopDefinition
+ThreadPoolProfileDefinition
 ThreadsDefinition
 ThrottleDefinition
 ThrowExceptionDefinition

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java?rev=923642&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java Tue Mar 16 09:24:46 2010
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.impl.JndiRegistry;
+
+/**
+ * @version $Revision$
+ */
+public class ThreadPoolBuilderTest extends ContextTestSupport {
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        ExecutorService someone = Executors.newCachedThreadPool();
+        jndi.bind("someonesPool", someone);
+        return jndi;
+    }
+
+    public void testThreadPoolBuilderLookup() throws Exception {
+        ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+        ExecutorService executor = builder.lookup(this, "someonesPool");
+        assertNotNull(executor);
+
+        assertEquals(false, executor.isShutdown());
+        context.stop();
+        // you need to manage this pool yourself
+        assertEquals(false, executor.isShutdown());
+    }
+
+    public void testThreadPoolBuilderDefault() throws Exception {
+        ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+        ExecutorService executor = builder.build(this, "myPool");
+        assertNotNull(executor);
+
+        assertEquals(false, executor.isShutdown());
+        context.stop();
+        assertEquals(true, executor.isShutdown());
+    }
+
+    public void testThreadPoolBuilderMaxQueueSize() throws Exception {
+        ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+        ExecutorService executor = builder.maxQueueSize(2000).build(this, "myPool");
+        assertNotNull(executor);
+
+        assertEquals(false, executor.isShutdown());
+        context.stop();
+        assertEquals(true, executor.isShutdown());
+    }
+
+    public void testThreadPoolBuilderMax() throws Exception {
+        ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+        ExecutorService executor = builder.maxPoolSize(100).build(this, "myPool");
+        assertNotNull(executor);
+
+        assertEquals(false, executor.isShutdown());
+        context.stop();
+        assertEquals(true, executor.isShutdown());
+    }
+
+    public void testThreadPoolBuilderCoreAndMax() throws Exception {
+        ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+        ExecutorService executor = builder.poolSize(50).maxPoolSize(100).build(this, "myPool");
+        assertNotNull(executor);
+
+        assertEquals(false, executor.isShutdown());
+        context.stop();
+        assertEquals(true, executor.isShutdown());
+    }
+
+    public void testThreadPoolBuilderKeepAlive() throws Exception {
+        ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+        ExecutorService executor = builder.keepAliveTime(30).build(this, "myPool");
+        assertNotNull(executor);
+
+        assertEquals(false, executor.isShutdown());
+        context.stop();
+        assertEquals(true, executor.isShutdown());
+    }
+
+    public void testThreadPoolBuilderKeepAliveTimeUnit() throws Exception {
+        ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+        ExecutorService executor = builder.keepAliveTime(20000).timeUnit(TimeUnit.MILLISECONDS).build(this, "myPool");
+        assertNotNull(executor);
+
+        assertEquals(false, executor.isShutdown());
+        context.stop();
+        assertEquals(true, executor.isShutdown());
+    }
+
+    public void testThreadPoolBuilderAll() throws Exception {
+        ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+        ExecutorService executor = builder.poolSize(50).maxPoolSize(100).maxQueueSize(2000)
+                .keepAliveTime(20000).timeUnit(TimeUnit.MILLISECONDS).build(this, "myPool");
+        assertNotNull(executor);
+
+        assertEquals(false, executor.isShutdown());
+        context.stop();
+        assertEquals(true, executor.isShutdown());
+    }
+
+    public void testThreadPoolBuilderTwoPoolsDefault() throws Exception {
+        ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+        ExecutorService executor = builder.build(this, "myPool");
+        ExecutorService executor2 = builder.build(this, "myOtherPool");
+
+        assertNotNull(executor);
+        assertNotNull(executor2);
+
+        assertEquals(false, executor.isShutdown());
+        assertEquals(false, executor2.isShutdown());
+        context.stop();
+        assertEquals(true, executor.isShutdown());
+        assertEquals(true, executor2.isShutdown());
+    }
+
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java Tue Mar 16 09:24:46 2010
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.impl;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.ContextTestSupport;
 
 /**
@@ -68,4 +72,44 @@ public class DefaultExecutorServiceStrat
         // reset it so we can shutdown properly
         context.getExecutorServiceStrategy().setThreadNamePattern("Camel Thread ${counter} - ${name}");
     }
+
+    public void testDefaultThreadPool() throws Exception {
+        ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool");
+        assertEquals(false, myPool.isShutdown());
+
+        // should use default settings
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool;
+        assertEquals(10, executor.getCorePoolSize());
+        assertEquals(20, executor.getMaximumPoolSize());
+        assertEquals(60, executor.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals(Integer.MAX_VALUE, executor.getQueue().remainingCapacity());
+
+        context.stop();
+        assertEquals(true, myPool.isShutdown());
+    }
+
+    public void testCustomDefaultThreadPool() throws Exception {
+        ThreadPoolProfileSupport custom = new ThreadPoolProfileSupport();
+        custom.setKeepAliveTime(20L);
+        custom.setMaxPoolSize(40);
+        custom.setPoolSize(5);
+        custom.setMaxQueueSize(2000);
+
+        context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(custom);
+        assertEquals(true, custom.isDefaultProfile().booleanValue());
+
+        ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool");
+        assertEquals(false, myPool.isShutdown());
+
+        // should use the settings from the custom default profile
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool;
+        assertEquals(5, executor.getCorePoolSize());
+        assertEquals(40, executor.getMaximumPoolSize());
+        assertEquals(20, executor.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals(2000, executor.getQueue().remainingCapacity());
+
+        context.stop();
+        assertEquals(true, myPool.isShutdown());
+    }
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java Tue Mar 16 09:24:46 2010
@@ -94,7 +94,7 @@ public class AggregateShutdownThreadPool
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                myPool = context.getExecutorServiceStrategy().newCachedThreadPool(this, "myPool");
+                myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool");
 
                 from("direct:foo").routeId("foo")
                     .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(3)