You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cs...@apache.org on 2011/08/18 20:19:35 UTC

svn commit: r1159342 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/te...

Author: cschneider
Date: Thu Aug 18 18:19:35 2011
New Revision: 1159342

URL: http://svn.apache.org/viewvc?rev=1159342&view=rev
Log:
CAMEL-4244 Add ThreadPoolProfileBuilder and change ThreadPoolFactory to use the ThreadPoolProfile

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolProfileBuilder.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
    camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomThreadPoolFactoryTest.java

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolProfileBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolProfileBuilder.java?rev=1159342&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolProfileBuilder.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolProfileBuilder.java Thu Aug 18 18:19:35 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.spi.ThreadPoolProfile;
+/** Fluent builder that configures a {@link ThreadPoolProfile}; each setter returns this builder. */
+public class ThreadPoolProfileBuilder {
+    private final ThreadPoolProfile profile;
+
+    public ThreadPoolProfileBuilder(String id) {
+        this.profile = new ThreadPoolProfile(id);
+    }
+
+    public ThreadPoolProfileBuilder(String id, ThreadPoolProfile origProfile) {
+        this.profile = origProfile.clone();
+        this.profile.setId(id);
+    }
+    
+    public ThreadPoolProfileBuilder defaultProfile(Boolean defaultProfile) {
+        this.profile.setDefaultProfile(defaultProfile);
+        return this;
+    }
+
+    // The setters below hand their argument straight to the profile; only maxQueueSize skips null.
+    public ThreadPoolProfileBuilder poolSize(Integer poolSize) {
+        profile.setPoolSize(poolSize);
+        return this;
+    }
+
+    public ThreadPoolProfileBuilder maxPoolSize(Integer maxPoolSize) {
+        profile.setMaxPoolSize(maxPoolSize);
+        return this;
+    }
+
+    public ThreadPoolProfileBuilder keepAliveTime(Long keepAliveTime, TimeUnit timeUnit) {
+        profile.setKeepAliveTime(keepAliveTime);
+        profile.setTimeUnit(timeUnit);
+        return this;
+    }
+
+    public ThreadPoolProfileBuilder keepAliveTime(Long keepAliveTime) {
+        profile.setKeepAliveTime(keepAliveTime);
+        return this;
+    }
+    
+    public ThreadPoolProfileBuilder maxQueueSize(Integer maxQueueSize) {
+        if (maxQueueSize != null) {
+            profile.setMaxQueueSize(maxQueueSize);
+        }
+        return this;
+    }
+
+    public ThreadPoolProfileBuilder rejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
+        profile.setRejectedPolicy(rejectedPolicy);
+        return this;
+    }
+
+    /**
+     * Builds the thread pool profile
+     * 
+     * @return the configured thread pool profile; this is the instance held by
+     *         the builder (not a copy), so later builder calls also change it
+     */
+    public ThreadPoolProfile build() {
+        return profile;
+    }
+
+
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java Thu Aug 18 18:19:35 2011
@@ -22,7 +22,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
 import org.apache.camel.model.OptionalIdentifiedDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.ProcessorDefinitionHelper;
@@ -56,21 +56,20 @@ public class DefaultExecutorServiceManag
     private String threadNamePattern;
     private String defaultThreadPoolProfileId = "defaultThreadPoolProfile";
     private final Map<String, ThreadPoolProfile> threadPoolProfiles = new HashMap<String, ThreadPoolProfile>();
+    private ThreadPoolProfile builtIndefaultProfile;
 
     public DefaultExecutorServiceManager(CamelContext camelContext) {
         this.camelContext = camelContext;
 
-        // create and register the default profile
-        ThreadPoolProfile defaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId);
-        // the default profile has the following values
-        defaultProfile.setDefaultProfile(true);
-        defaultProfile.setPoolSize(10);
-        defaultProfile.setMaxPoolSize(20);
-        defaultProfile.setKeepAliveTime(60L);
-        defaultProfile.setTimeUnit(TimeUnit.SECONDS);
-        defaultProfile.setMaxQueueSize(1000);
-        defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns);
-        registerThreadPoolProfile(defaultProfile);
+        builtIndefaultProfile = new ThreadPoolProfileBuilder(defaultThreadPoolProfileId)
+            .defaultProfile(true)
+            .poolSize(10)
+            .maxPoolSize(20)
+            .keepAliveTime(60L, TimeUnit.SECONDS)
+            .maxQueueSize(1000)
+            .rejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns)
+            .build();
+        registerThreadPoolProfile(builtIndefaultProfile);
     }
 
     @Override
@@ -102,46 +101,12 @@ public class DefaultExecutorServiceManag
 
     @Override
     public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) {
-        ThreadPoolProfile oldProfile = threadPoolProfiles.remove(defaultThreadPoolProfileId);
-        if (oldProfile != null) {
-            // the old is no longer default
-            oldProfile.setDefaultProfile(false);
-
-            // fallback and use old default values for new default profile if absent (convention over configuration)
-            if (defaultThreadPoolProfile.getKeepAliveTime() == null) {
-                defaultThreadPoolProfile.setKeepAliveTime(oldProfile.getKeepAliveTime());
-            }
-            if (defaultThreadPoolProfile.getMaxPoolSize() == null) {
-                defaultThreadPoolProfile.setMaxPoolSize(oldProfile.getMaxPoolSize());
-            }
-            if (defaultThreadPoolProfile.getRejectedPolicy() == null) {
-                defaultThreadPoolProfile.setRejectedPolicy(oldProfile.getRejectedPolicy());
-            }
-            if (defaultThreadPoolProfile.getMaxQueueSize() == null) {
-                defaultThreadPoolProfile.setMaxQueueSize(oldProfile.getMaxQueueSize());
-            }
-            if (defaultThreadPoolProfile.getPoolSize() == null) {
-                defaultThreadPoolProfile.setPoolSize(oldProfile.getPoolSize());
-            }
-            if (defaultThreadPoolProfile.getTimeUnit() == null) {
-                defaultThreadPoolProfile.setTimeUnit(oldProfile.getTimeUnit());
-            }
-        }
-
-        // validate that all options has been given as its mandatory for a default thread pool profile
-        // as it is used as fallback for other profiles if they do not have that particular value
-        ObjectHelper.notEmpty(defaultThreadPoolProfile.getId(), "id", defaultThreadPoolProfile);
-        ObjectHelper.notNull(defaultThreadPoolProfile.getKeepAliveTime(), "keepAliveTime", defaultThreadPoolProfile);
-        ObjectHelper.notNull(defaultThreadPoolProfile.getMaxPoolSize(), "maxPoolSize", defaultThreadPoolProfile);
-        ObjectHelper.notNull(defaultThreadPoolProfile.getMaxQueueSize(), "maxQueueSize", defaultThreadPoolProfile);
-        ObjectHelper.notNull(defaultThreadPoolProfile.getPoolSize(), "poolSize", defaultThreadPoolProfile);
-        ObjectHelper.notNull(defaultThreadPoolProfile.getTimeUnit(), "timeUnit", defaultThreadPoolProfile);
+        threadPoolProfiles.remove(defaultThreadPoolProfileId);
+        defaultThreadPoolProfile.addDefaults(builtIndefaultProfile);
 
         LOG.info("Using custom DefaultThreadPoolProfile: " + defaultThreadPoolProfile);
 
-        // and replace with the new default profile
         this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId();
-        // and mark the new profile as default
         defaultThreadPoolProfile.setDefaultProfile(true);
         registerThreadPoolProfile(defaultThreadPoolProfile);
     }
@@ -170,12 +135,7 @@ public class DefaultExecutorServiceManag
 
     @Override
     public ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name) {
-        ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
-
-        ThreadFactory threadFactory = createThreadFactory(name, true);
-        ScheduledExecutorService executorService = threadPoolFactory.newScheduledThreadPool(defaultProfile.getPoolSize(), threadFactory);
-        onThreadPoolCreated(executorService, source, null);
-        return executorService;
+        return newScheduledThreadPool(source, name, getDefaultThreadPoolProfile());
     }
 
     @Override
@@ -194,35 +154,22 @@ public class DefaultExecutorServiceManag
         ObjectHelper.notNull(profile, "ThreadPoolProfile");
 
         ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
-        // fallback to use values from default profile if not specified
-        Integer poolSize = profile.getPoolSize() != null ? profile.getPoolSize() : defaultProfile.getPoolSize();
-        Integer maxPoolSize = profile.getMaxPoolSize() != null ? profile.getMaxPoolSize() : defaultProfile.getMaxPoolSize();
-        Long keepAliveTime = profile.getKeepAliveTime() != null ? profile.getKeepAliveTime() : defaultProfile.getKeepAliveTime();
-        TimeUnit timeUnit = profile.getTimeUnit() != null ? profile.getTimeUnit() : defaultProfile.getTimeUnit();
-        Integer maxQueueSize = profile.getMaxQueueSize() != null ? profile.getMaxQueueSize() : defaultProfile.getMaxQueueSize();
-        RejectedExecutionHandler handler = profile.getRejectedExecutionHandler() != null ? profile.getRejectedExecutionHandler() : defaultProfile.getRejectedExecutionHandler();
+        profile.addDefaults(defaultProfile);
 
         ThreadFactory threadFactory = createThreadFactory(name, true);
-        ExecutorService executorService = threadPoolFactory.newThreadPool(poolSize, maxPoolSize,
-                keepAliveTime, timeUnit, maxQueueSize, handler, threadFactory);
+        ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory);
         onThreadPoolCreated(executorService, source, profile.getId());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, name, executorService});
+        }
+
         return executorService;
     }
 
     @Override
     public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) {
-        ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
-
-        // fallback to use values from default profile
-        ExecutorService answer = threadPoolFactory.newThreadPool(poolSize, maxPoolSize,
-                defaultProfile.getKeepAliveTime(), defaultProfile.getTimeUnit(), defaultProfile.getMaxQueueSize(),
-                defaultProfile.getRejectedExecutionHandler(), new CamelThreadFactory(threadNamePattern, name, true));
-        onThreadPoolCreated(answer, source, null);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, name, answer});
-        }
-        return answer;
+        ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name).poolSize(poolSize).maxPoolSize(maxPoolSize).build();
+        return  newThreadPool(source, name, profile);
     }
 
     @Override
@@ -232,7 +179,7 @@ public class DefaultExecutorServiceManag
 
     @Override
     public ExecutorService newCachedThreadPool(Object source, String name) {
-        ExecutorService answer = threadPoolFactory.newCachedThreadPool(new CamelThreadFactory(threadNamePattern, name, true));
+        ExecutorService answer = threadPoolFactory.newCachedThreadPool(createThreadFactory(name, true));
         onThreadPoolCreated(answer, source, null);
 
         if (LOG.isDebugEnabled()) {
@@ -243,29 +190,32 @@ public class DefaultExecutorServiceManag
 
     @Override
     public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
-        ExecutorService answer = threadPoolFactory.newFixedThreadPool(poolSize, new CamelThreadFactory(threadNamePattern, name, true));
-        onThreadPoolCreated(answer, source, null);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Created new FixedThreadPool for source: {} with name: {}. -> {}", new Object[]{source, name, answer});
-        }
-        return answer;
+        ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name).poolSize(poolSize).maxPoolSize(poolSize).keepAliveTime(0L).build();
+        return newThreadPool(source, name, profile);
     }
 
     @Override
     public ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name) {
         return newScheduledThreadPool(source, name, 1);
     }
-
+    
     @Override
-    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
-        ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(poolSize, new CamelThreadFactory(threadNamePattern, name, true));
+    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) {
+        profile.addDefaults(getDefaultThreadPoolProfile());
+        ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(name, true));
         onThreadPoolCreated(answer, source, null);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Created new ScheduledThreadPool for source: {} with name: {}. -> {}", new Object[]{source, name, answer});
         }
         return answer;
+
+    }
+
+    @Override
+    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
+        ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name).poolSize(poolSize).build();
+        return newScheduledThreadPool(source, name, profile);
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java Thu Aug 18 18:19:35 2011
@@ -28,6 +28,7 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
 
 /**
  * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
@@ -37,13 +38,16 @@ public class DefaultThreadPoolFactory im
     public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
         return Executors.newCachedThreadPool(threadFactory);
     }
-
-    public ExecutorService newFixedThreadPool(int poolSize, ThreadFactory threadFactory) {
-        return Executors.newFixedThreadPool(poolSize, threadFactory);
-    }
-
-    public ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) throws IllegalArgumentException {
-        return Executors.newScheduledThreadPool(corePoolSize, threadFactory);
+    
+    @Override
+    public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+        return newThreadPool(profile.getPoolSize(), 
+                             profile.getMaxPoolSize(), 
+                             profile.getKeepAliveTime(),
+                             profile.getTimeUnit(),
+                             profile.getMaxQueueSize(), 
+                             profile.getRejectedExecutionHandler(),
+                             factory);
     }
 
     public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
@@ -84,5 +88,13 @@ public class DefaultThreadPoolFactory im
         answer.setRejectedExecutionHandler(rejectedExecutionHandler);
         return answer;
     }
+    
+    /* (non-Javadoc) Only profile.getPoolSize() is used; other profile settings are ignored here.
+     * @see org.apache.camel.spi.ThreadPoolFactory#newScheduledThreadPool(org.apache.camel.spi.ThreadPoolProfile, java.util.concurrent.ThreadFactory)
+     */
+    @Override
+    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+        return Executors.newScheduledThreadPool(profile.getPoolSize(), threadFactory);
+    }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Thu Aug 18 18:19:35 2011
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -29,10 +30,11 @@ import javax.xml.bind.annotation.adapter
 
 import org.apache.camel.Processor;
 import org.apache.camel.ThreadPoolRejectedPolicy;
-import org.apache.camel.builder.ThreadPoolBuilder;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
 import org.apache.camel.builder.xml.TimeUnitAdapter;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.ThreadsProcessor;
+import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.ThreadPoolProfile;
 
@@ -81,24 +83,16 @@ public class ThreadsDefinition extends O
         executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this);
         // if no explicit then create from the options
         if (executorService == null) {
-            ThreadPoolProfile profile = routeContext.getCamelContext().getExecutorServiceManager().getDefaultThreadPoolProfile();
-            // use the default thread pool profile as base and then override with values
-            // use a custom pool based on the settings
-            int core = getPoolSize() != null ? getPoolSize() : profile.getPoolSize();
-            int max = getMaxPoolSize() != null ? getMaxPoolSize() : profile.getMaxPoolSize();
-            long keepAlive = getKeepAliveTime() != null ? getKeepAliveTime() : profile.getKeepAliveTime();
-            int maxQueue = getMaxQueueSize() != null ? getMaxQueueSize() : profile.getMaxQueueSize();
-            TimeUnit tu = getTimeUnit() != null ? getTimeUnit() : profile.getTimeUnit();
-            ThreadPoolRejectedPolicy rejected = getRejectedPolicy() != null ? getRejectedPolicy() : profile.getRejectedPolicy();
-
+            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
             // create the thread pool using a builder
-            executorService = new ThreadPoolBuilder(routeContext.getCamelContext())
-                    .poolSize(core)
-                    .maxPoolSize(max)
-                    .keepAliveTime(keepAlive, tu)
-                    .maxQueueSize(maxQueue)
-                    .rejectedPolicy(rejected)
-                    .build(this, name);
+            ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name)
+                    .poolSize(getPoolSize())
+                    .maxPoolSize(getMaxPoolSize())
+                    .keepAliveTime(getKeepAliveTime(), getTimeUnit())
+                    .maxQueueSize(getMaxQueueSize())
+                    .rejectedPolicy(getRejectedPolicy())
+                    .build();
+            executorService = manager.newThreadPool(this, name, profile);
         }
 
         ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), executorService);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java Thu Aug 18 18:19:35 2011
@@ -220,6 +220,8 @@ public interface ExecutorServiceManager 
      * @return the created thread pool
      */
     ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name);
+    
+    ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile);
 
     /**
      * Shutdown the given executor service.
@@ -237,4 +239,5 @@ public interface ExecutorServiceManager 
      * @see java.util.concurrent.ExecutorService#shutdownNow()
      */
     List<Runnable> shutdownNow(ExecutorService executorService);
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java Thu Aug 18 18:19:35 2011
@@ -17,73 +17,41 @@
 package org.apache.camel.spi;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 
 /**
- * Factory to crate {@link ExecutorService} and {@link ScheduledExecutorService} instances
- * <p/>
- * This interface allows to customize the creation of these objects to adapt Camel
- * for application servers and other environments where thread pools should
- * not be created with the JDK methods, as provided by the {@link org.apache.camel.impl.DefaultThreadPoolFactory}.
- *
- * @see ExecutorServiceManager
+ * Creates ExecutorService and ScheduledExecutorService objects that work with a thread pool for a given ThreadPoolProfile and ThreadFactory.
+ * 
+ * This interface allows customizing the creation of these objects to adapt Camel for application servers and other environments where thread pools
+ * should not be created with the JDK methods.
  */
 public interface ThreadPoolFactory {
-
     /**
      * Creates a new cached thread pool
      * <p/>
      * The cached thread pool is a term from the JDK from the method {@link java.util.concurrent.Executors#newCachedThreadPool()}.
-     * Implementators of this interface, may create a different kind of pool than the cached, or check the source code
-     * of the JDK to create a pool using the same settings.
+     * Typically it will have no size limit (this is why it is handled separately).
      *
      * @param threadFactory factory for creating threads
      * @return the created thread pool
      */
     ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
-
-    /**
-     * Creates a new fixed thread pool
-     * <p/>
-     * The fixed thread pool is a term from the JDK from the method {@link java.util.concurrent.Executors#newFixedThreadPool(int)}.
-     * Implementators of this interface, may create a different kind of pool than the fixed, or check the source code
-     * of the JDK to create a pool using the same settings.
-     *
-     * @param poolSize  the number of threads in the pool
-     * @param threadFactory factory for creating threads
-     * @return the created thread pool
-     */
-    ExecutorService newFixedThreadPool(int poolSize, ThreadFactory threadFactory);
-
+    
     /**
-     * Creates a new scheduled thread pool
-     *
-     * @param corePoolSize  the core pool size
-     * @param threadFactory factory for creating threads
-     * @return the created thread pool
-     * @throws IllegalArgumentException if parameters is not valid
+     * Create a thread pool using the given thread pool profile
+     * 
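+     * @param profile the thread pool profile holding the pool settings
+     * @param threadFactory factory for creating threads
+     * @return the created thread pool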
      */
-    ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) throws IllegalArgumentException;
-
+    ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+    
     /**
-     * Creates a new thread pool
-     *
-     * @param corePoolSize             the core pool size
-     * @param maxPoolSize              the maximum pool size
-     * @param keepAliveTime            keep alive time
-     * @param timeUnit                 keep alive time unit
-     * @param maxQueueSize             the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
-     * @param rejectedExecutionHandler the handler for tasks which cannot be executed by the thread pool.
-     *                                 If <tt>null</tt> is provided then {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} is used.
-     * @param threadFactory            factory for creating threads
-     * @return the created thread pool
-     * @throws IllegalArgumentException if parameters is not valid
+     * Create a scheduled thread pool using the given thread pool profile
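+     * @param profile the thread pool profile holding the pool settings
+     * @param threadFactory factory for creating threads
+     * @return the created scheduled thread pool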
      */
-    ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
-                                  int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler,
-                                  ThreadFactory threadFactory) throws IllegalArgumentException;
-
-}
+    ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+}
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java Thu Aug 18 18:19:35 2011
@@ -219,6 +219,49 @@ public class ThreadPoolProfile implement
         this.rejectedPolicy = rejectedPolicy;
     }
 
+    /**
+     * Overwrites each attribute that is null with the attribute from defaultProfile2.
+     * Does nothing if defaultProfile2 is null; id and the default flag are never changed.
+     * @param defaultProfile2 profile supplying the fallback values, may be null
+     */
+    public void addDefaults(ThreadPoolProfile defaultProfile2) {
+        if (defaultProfile2 == null) {
+            return;
+        }
+        if (poolSize == null) {
+            poolSize = defaultProfile2.getPoolSize();
+        }
+        if (maxPoolSize == null) {
+            maxPoolSize = defaultProfile2.getMaxPoolSize();
+        }
+        if (keepAliveTime == null) {
+            keepAliveTime = defaultProfile2.getKeepAliveTime();
+        }
+        if (timeUnit == null) {
+            timeUnit = defaultProfile2.getTimeUnit();
+        }
+        if (maxQueueSize == null) {
+            maxQueueSize = defaultProfile2.getMaxQueueSize();
+        }
+        if (rejectedPolicy == null) {
+            rejectedPolicy = defaultProfile2.getRejectedPolicy();
+        }
+    }
+
+    @Override
+    public ThreadPoolProfile clone() {
+        ThreadPoolProfile cloned = new ThreadPoolProfile(); // field-by-field copy, super.clone() is not used
+        cloned.setDefaultProfile(defaultProfile);
+        cloned.setId(id);
+        cloned.setKeepAliveTime(keepAliveTime);
+        cloned.setMaxPoolSize(maxPoolSize);
+        cloned.setMaxQueueSize(maxQueueSize);
+        cloned.setPoolSize(poolSize); // core size must come from poolSize, not maxPoolSize
+        cloned.setRejectedPolicy(rejectedPolicy);
+        cloned.setTimeUnit(timeUnit);
+        return cloned;
+    }
+
     @Override
     public String toString() {
         return "ThreadPoolProfile[" + id + " (" + defaultProfile + ") size:" + poolSize + "-" + maxPoolSize

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java Thu Aug 18 18:19:35 2011
@@ -18,7 +18,6 @@ package org.apache.camel.impl;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
@@ -55,18 +54,6 @@ public class CustomThreadPoolFactoryTest
         }
 
         @Override
-        public ExecutorService newFixedThreadPool(int poolSize, ThreadFactory threadFactory) {
-            invoked = true;
-            return super.newFixedThreadPool(poolSize, threadFactory);
-        }
-
-        @Override
-        public ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) throws IllegalArgumentException {
-            invoked = true;
-            return super.newScheduledThreadPool(corePoolSize, threadFactory);
-        }
-
-        @Override
         public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
                                              RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {
             invoked = true;

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java Thu Aug 18 18:19:35 2011
@@ -357,8 +357,8 @@ public class DefaultExecutorServiceManag
 
         ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool);
         // a fixed dont use keep alive
-        assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS));
-        assertEquals(5, tp.getMaximumPoolSize());
+        assertEquals("keepAliveTime", 0, tp.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals("maximumPoolSize", 5, tp.getMaximumPoolSize());
         assertEquals(5, tp.getCorePoolSize());
         assertFalse(tp.isShutdown());
 
@@ -373,8 +373,8 @@ public class DefaultExecutorServiceManag
 
         ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool);
         // a single dont use keep alive
-        assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS));
-        assertEquals(1, tp.getMaximumPoolSize());
+        assertEquals("keepAliveTime", 0, tp.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals("maximumPoolSize", 1, tp.getMaximumPoolSize());
         assertEquals(1, tp.getCorePoolSize());
         assertFalse(tp.isShutdown());
 

Modified: camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java (original)
+++ camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java Thu Aug 18 18:19:35 2011
@@ -25,8 +25,9 @@ import javax.xml.bind.annotation.adapter
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ThreadPoolRejectedPolicy;
-import org.apache.camel.builder.ThreadPoolBuilder;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
 import org.apache.camel.builder.xml.TimeUnitAdapter;
+import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.CamelContextHelper;
 
 /**
@@ -74,10 +75,14 @@ public abstract class AbstractCamelThrea
             queueSize = CamelContextHelper.parseInteger(getCamelContext(), maxQueueSize);
         }
 
-        ExecutorService answer = new ThreadPoolBuilder(getCamelContext())
-                .poolSize(size).maxPoolSize(max).keepAliveTime(keepAlive, getTimeUnit())
-                .maxQueueSize(queueSize).rejectedPolicy(getRejectedPolicy())
-                .build(getId(), getThreadName());
+        ThreadPoolProfile profile = new ThreadPoolProfileBuilder(getId())
+                .poolSize(size)
+                .maxPoolSize(max)
+                .keepAliveTime(keepAlive, timeUnit)
+                .maxQueueSize(queueSize)
+                .rejectedPolicy(rejectedPolicy)
+                .build();
+        ExecutorService answer = getCamelContext().getExecutorServiceManager().newThreadPool(getId(), getThreadName(), profile);
         return answer;
     }
 

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomThreadPoolFactoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomThreadPoolFactoryTest.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomThreadPoolFactoryTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomThreadPoolFactoryTest.java Thu Aug 18 18:19:35 2011
@@ -18,7 +18,6 @@ package org.apache.camel.spring.config;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
@@ -57,18 +56,6 @@ public class CustomThreadPoolFactoryTest
         }
 
         @Override
-        public ExecutorService newFixedThreadPool(int poolSize, ThreadFactory threadFactory) {
-            invoked = true;
-            return super.newFixedThreadPool(poolSize, threadFactory);
-        }
-
-        @Override
-        public ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) throws IllegalArgumentException {
-            invoked = true;
-            return super.newScheduledThreadPool(corePoolSize, threadFactory);
-        }
-
-        @Override
         public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, 
                                              RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {
             invoked = true;