You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cs...@apache.org on 2011/08/18 20:19:35 UTC
svn commit: r1159342 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/te...
Author: cschneider
Date: Thu Aug 18 18:19:35 2011
New Revision: 1159342
URL: http://svn.apache.org/viewvc?rev=1159342&view=rev
Log:
CAMEL-4244 Add ThreadPoolProfileBuilder and change ThreadPoolFactory to use the ThreadPoolProfile
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolProfileBuilder.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomThreadPoolFactoryTest.java
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolProfileBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolProfileBuilder.java?rev=1159342&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolProfileBuilder.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolProfileBuilder.java Thu Aug 18 18:19:35 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.spi.ThreadPoolProfile;
+
+public class ThreadPoolProfileBuilder {
+ private final ThreadPoolProfile profile;
+
+ public ThreadPoolProfileBuilder(String id) {
+ this.profile = new ThreadPoolProfile(id);
+ }
+
+ public ThreadPoolProfileBuilder(String id, ThreadPoolProfile origProfile) {
+ this.profile = origProfile.clone();
+ this.profile.setId(id);
+ }
+
+ public ThreadPoolProfileBuilder defaultProfile(Boolean defaultProfile) {
+ this.profile.setDefaultProfile(defaultProfile);
+ return this;
+ }
+
+
+ public ThreadPoolProfileBuilder poolSize(Integer poolSize) {
+ profile.setPoolSize(poolSize);
+ return this;
+ }
+
+ public ThreadPoolProfileBuilder maxPoolSize(Integer maxPoolSize) {
+ profile.setMaxPoolSize(maxPoolSize);
+ return this;
+ }
+
+ public ThreadPoolProfileBuilder keepAliveTime(Long keepAliveTime, TimeUnit timeUnit) {
+ profile.setKeepAliveTime(keepAliveTime);
+ profile.setTimeUnit(timeUnit);
+ return this;
+ }
+
+ public ThreadPoolProfileBuilder keepAliveTime(Long keepAliveTime) {
+ profile.setKeepAliveTime(keepAliveTime);
+ return this;
+ }
+
+ public ThreadPoolProfileBuilder maxQueueSize(Integer maxQueueSize) {
+ if (maxQueueSize != null) {
+ profile.setMaxQueueSize(maxQueueSize);
+ }
+ return this;
+ }
+
+ public ThreadPoolProfileBuilder rejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
+ profile.setRejectedPolicy(rejectedPolicy);
+ return this;
+ }
+
+ /**
+ * Builds the new thread pool
+ *
+ * @return the created thread pool
+ * @throws Exception is thrown if error building the thread pool
+ */
+ public ThreadPoolProfile build() {
+ return profile;
+ }
+
+
+}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java Thu Aug 18 18:19:35 2011
@@ -22,7 +22,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
import org.apache.camel.model.OptionalIdentifiedDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ProcessorDefinitionHelper;
@@ -56,21 +56,20 @@ public class DefaultExecutorServiceManag
private String threadNamePattern;
private String defaultThreadPoolProfileId = "defaultThreadPoolProfile";
private final Map<String, ThreadPoolProfile> threadPoolProfiles = new HashMap<String, ThreadPoolProfile>();
+ private ThreadPoolProfile builtIndefaultProfile;
public DefaultExecutorServiceManager(CamelContext camelContext) {
this.camelContext = camelContext;
- // create and register the default profile
- ThreadPoolProfile defaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId);
- // the default profile has the following values
- defaultProfile.setDefaultProfile(true);
- defaultProfile.setPoolSize(10);
- defaultProfile.setMaxPoolSize(20);
- defaultProfile.setKeepAliveTime(60L);
- defaultProfile.setTimeUnit(TimeUnit.SECONDS);
- defaultProfile.setMaxQueueSize(1000);
- defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns);
- registerThreadPoolProfile(defaultProfile);
+ builtIndefaultProfile = new ThreadPoolProfileBuilder(defaultThreadPoolProfileId)
+ .defaultProfile(true)
+ .poolSize(10)
+ .maxPoolSize(20)
+ .keepAliveTime(60L, TimeUnit.SECONDS)
+ .maxQueueSize(1000)
+ .rejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns)
+ .build();
+ registerThreadPoolProfile(builtIndefaultProfile);
}
@Override
@@ -102,46 +101,12 @@ public class DefaultExecutorServiceManag
@Override
public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) {
- ThreadPoolProfile oldProfile = threadPoolProfiles.remove(defaultThreadPoolProfileId);
- if (oldProfile != null) {
- // the old is no longer default
- oldProfile.setDefaultProfile(false);
-
- // fallback and use old default values for new default profile if absent (convention over configuration)
- if (defaultThreadPoolProfile.getKeepAliveTime() == null) {
- defaultThreadPoolProfile.setKeepAliveTime(oldProfile.getKeepAliveTime());
- }
- if (defaultThreadPoolProfile.getMaxPoolSize() == null) {
- defaultThreadPoolProfile.setMaxPoolSize(oldProfile.getMaxPoolSize());
- }
- if (defaultThreadPoolProfile.getRejectedPolicy() == null) {
- defaultThreadPoolProfile.setRejectedPolicy(oldProfile.getRejectedPolicy());
- }
- if (defaultThreadPoolProfile.getMaxQueueSize() == null) {
- defaultThreadPoolProfile.setMaxQueueSize(oldProfile.getMaxQueueSize());
- }
- if (defaultThreadPoolProfile.getPoolSize() == null) {
- defaultThreadPoolProfile.setPoolSize(oldProfile.getPoolSize());
- }
- if (defaultThreadPoolProfile.getTimeUnit() == null) {
- defaultThreadPoolProfile.setTimeUnit(oldProfile.getTimeUnit());
- }
- }
-
- // validate that all options has been given as its mandatory for a default thread pool profile
- // as it is used as fallback for other profiles if they do not have that particular value
- ObjectHelper.notEmpty(defaultThreadPoolProfile.getId(), "id", defaultThreadPoolProfile);
- ObjectHelper.notNull(defaultThreadPoolProfile.getKeepAliveTime(), "keepAliveTime", defaultThreadPoolProfile);
- ObjectHelper.notNull(defaultThreadPoolProfile.getMaxPoolSize(), "maxPoolSize", defaultThreadPoolProfile);
- ObjectHelper.notNull(defaultThreadPoolProfile.getMaxQueueSize(), "maxQueueSize", defaultThreadPoolProfile);
- ObjectHelper.notNull(defaultThreadPoolProfile.getPoolSize(), "poolSize", defaultThreadPoolProfile);
- ObjectHelper.notNull(defaultThreadPoolProfile.getTimeUnit(), "timeUnit", defaultThreadPoolProfile);
+ threadPoolProfiles.remove(defaultThreadPoolProfileId);
+ defaultThreadPoolProfile.addDefaults(builtIndefaultProfile);
LOG.info("Using custom DefaultThreadPoolProfile: " + defaultThreadPoolProfile);
- // and replace with the new default profile
this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId();
- // and mark the new profile as default
defaultThreadPoolProfile.setDefaultProfile(true);
registerThreadPoolProfile(defaultThreadPoolProfile);
}
@@ -170,12 +135,7 @@ public class DefaultExecutorServiceManag
@Override
public ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name) {
- ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
-
- ThreadFactory threadFactory = createThreadFactory(name, true);
- ScheduledExecutorService executorService = threadPoolFactory.newScheduledThreadPool(defaultProfile.getPoolSize(), threadFactory);
- onThreadPoolCreated(executorService, source, null);
- return executorService;
+ return newScheduledThreadPool(source, name, getDefaultThreadPoolProfile());
}
@Override
@@ -194,35 +154,22 @@ public class DefaultExecutorServiceManag
ObjectHelper.notNull(profile, "ThreadPoolProfile");
ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
- // fallback to use values from default profile if not specified
- Integer poolSize = profile.getPoolSize() != null ? profile.getPoolSize() : defaultProfile.getPoolSize();
- Integer maxPoolSize = profile.getMaxPoolSize() != null ? profile.getMaxPoolSize() : defaultProfile.getMaxPoolSize();
- Long keepAliveTime = profile.getKeepAliveTime() != null ? profile.getKeepAliveTime() : defaultProfile.getKeepAliveTime();
- TimeUnit timeUnit = profile.getTimeUnit() != null ? profile.getTimeUnit() : defaultProfile.getTimeUnit();
- Integer maxQueueSize = profile.getMaxQueueSize() != null ? profile.getMaxQueueSize() : defaultProfile.getMaxQueueSize();
- RejectedExecutionHandler handler = profile.getRejectedExecutionHandler() != null ? profile.getRejectedExecutionHandler() : defaultProfile.getRejectedExecutionHandler();
+ profile.addDefaults(defaultProfile);
ThreadFactory threadFactory = createThreadFactory(name, true);
- ExecutorService executorService = threadPoolFactory.newThreadPool(poolSize, maxPoolSize,
- keepAliveTime, timeUnit, maxQueueSize, handler, threadFactory);
+ ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory);
onThreadPoolCreated(executorService, source, profile.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, name, executorService});
+ }
+
return executorService;
}
@Override
public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) {
- ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
-
- // fallback to use values from default profile
- ExecutorService answer = threadPoolFactory.newThreadPool(poolSize, maxPoolSize,
- defaultProfile.getKeepAliveTime(), defaultProfile.getTimeUnit(), defaultProfile.getMaxQueueSize(),
- defaultProfile.getRejectedExecutionHandler(), new CamelThreadFactory(threadNamePattern, name, true));
- onThreadPoolCreated(answer, source, null);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, name, answer});
- }
- return answer;
+ ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name).poolSize(poolSize).maxPoolSize(maxPoolSize).build();
+ return newThreadPool(source, name, profile);
}
@Override
@@ -232,7 +179,7 @@ public class DefaultExecutorServiceManag
@Override
public ExecutorService newCachedThreadPool(Object source, String name) {
- ExecutorService answer = threadPoolFactory.newCachedThreadPool(new CamelThreadFactory(threadNamePattern, name, true));
+ ExecutorService answer = threadPoolFactory.newCachedThreadPool(createThreadFactory(name, true));
onThreadPoolCreated(answer, source, null);
if (LOG.isDebugEnabled()) {
@@ -243,29 +190,32 @@ public class DefaultExecutorServiceManag
@Override
public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
- ExecutorService answer = threadPoolFactory.newFixedThreadPool(poolSize, new CamelThreadFactory(threadNamePattern, name, true));
- onThreadPoolCreated(answer, source, null);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created new FixedThreadPool for source: {} with name: {}. -> {}", new Object[]{source, name, answer});
- }
- return answer;
+ ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name).poolSize(poolSize).maxPoolSize(poolSize).keepAliveTime(0L).build();
+ return newThreadPool(source, name, profile);
}
@Override
public ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name) {
return newScheduledThreadPool(source, name, 1);
}
-
+
@Override
- public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
- ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(poolSize, new CamelThreadFactory(threadNamePattern, name, true));
+ public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) {
+ profile.addDefaults(getDefaultThreadPoolProfile());
+ ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(name, true));
onThreadPoolCreated(answer, source, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Created new ScheduledThreadPool for source: {} with name: {}. -> {}", new Object[]{source, name, answer});
}
return answer;
+
+ }
+
+ @Override
+ public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
+ ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name).poolSize(poolSize).build();
+ return newScheduledThreadPool(source, name, profile);
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java Thu Aug 18 18:19:35 2011
@@ -28,6 +28,7 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
/**
* Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
@@ -37,13 +38,16 @@ public class DefaultThreadPoolFactory im
public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return Executors.newCachedThreadPool(threadFactory);
}
-
- public ExecutorService newFixedThreadPool(int poolSize, ThreadFactory threadFactory) {
- return Executors.newFixedThreadPool(poolSize, threadFactory);
- }
-
- public ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) throws IllegalArgumentException {
- return Executors.newScheduledThreadPool(corePoolSize, threadFactory);
+
+ @Override
+ public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+ return newThreadPool(profile.getPoolSize(),
+ profile.getMaxPoolSize(),
+ profile.getKeepAliveTime(),
+ profile.getTimeUnit(),
+ profile.getMaxQueueSize(),
+ profile.getRejectedExecutionHandler(),
+ factory);
}
public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
@@ -84,5 +88,13 @@ public class DefaultThreadPoolFactory im
answer.setRejectedExecutionHandler(rejectedExecutionHandler);
return answer;
}
+
+ /* (non-Javadoc)
+ * @see org.apache.camel.impl.ThreadPoolFactory#newScheduledThreadPool(java.lang.Integer, java.util.concurrent.ThreadFactory)
+ */
+ @Override
+ public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+ return Executors.newScheduledThreadPool(profile.getPoolSize(), threadFactory);
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Thu Aug 18 18:19:35 2011
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -29,10 +30,11 @@ import javax.xml.bind.annotation.adapter
import org.apache.camel.Processor;
import org.apache.camel.ThreadPoolRejectedPolicy;
-import org.apache.camel.builder.ThreadPoolBuilder;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
import org.apache.camel.builder.xml.TimeUnitAdapter;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.ThreadsProcessor;
+import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.ThreadPoolProfile;
@@ -81,24 +83,16 @@ public class ThreadsDefinition extends O
executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this);
// if no explicit then create from the options
if (executorService == null) {
- ThreadPoolProfile profile = routeContext.getCamelContext().getExecutorServiceManager().getDefaultThreadPoolProfile();
- // use the default thread pool profile as base and then override with values
- // use a custom pool based on the settings
- int core = getPoolSize() != null ? getPoolSize() : profile.getPoolSize();
- int max = getMaxPoolSize() != null ? getMaxPoolSize() : profile.getMaxPoolSize();
- long keepAlive = getKeepAliveTime() != null ? getKeepAliveTime() : profile.getKeepAliveTime();
- int maxQueue = getMaxQueueSize() != null ? getMaxQueueSize() : profile.getMaxQueueSize();
- TimeUnit tu = getTimeUnit() != null ? getTimeUnit() : profile.getTimeUnit();
- ThreadPoolRejectedPolicy rejected = getRejectedPolicy() != null ? getRejectedPolicy() : profile.getRejectedPolicy();
-
+ ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
// create the thread pool using a builder
- executorService = new ThreadPoolBuilder(routeContext.getCamelContext())
- .poolSize(core)
- .maxPoolSize(max)
- .keepAliveTime(keepAlive, tu)
- .maxQueueSize(maxQueue)
- .rejectedPolicy(rejected)
- .build(this, name);
+ ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name)
+ .poolSize(getPoolSize())
+ .maxPoolSize(getMaxPoolSize())
+ .keepAliveTime(getKeepAliveTime(), getTimeUnit())
+ .maxQueueSize(getMaxQueueSize())
+ .rejectedPolicy(getRejectedPolicy())
+ .build();
+ executorService = manager.newThreadPool(this, name, profile);
}
ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), executorService);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java Thu Aug 18 18:19:35 2011
@@ -220,6 +220,8 @@ public interface ExecutorServiceManager
* @return the created thread pool
*/
ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name);
+
+ ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile);
/**
* Shutdown the given executor service.
@@ -237,4 +239,5 @@ public interface ExecutorServiceManager
* @see java.util.concurrent.ExecutorService#shutdownNow()
*/
List<Runnable> shutdownNow(ExecutorService executorService);
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java Thu Aug 18 18:19:35 2011
@@ -17,73 +17,41 @@
package org.apache.camel.spi;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
/**
- * Factory to crate {@link ExecutorService} and {@link ScheduledExecutorService} instances
- * <p/>
- * This interface allows to customize the creation of these objects to adapt Camel
- * for application servers and other environments where thread pools should
- * not be created with the JDK methods, as provided by the {@link org.apache.camel.impl.DefaultThreadPoolFactory}.
- *
- * @see ExecutorServiceManager
+ * Creates ExecutorService and ScheduledExecutorService objects that work with a thread pool for a given ThreadPoolProfile and ThreadFactory.
+ *
+ * This interface allows to customize the creation of these objects to adapt camel for application servers and other environments where thread pools
+ * should not be created with the jdk methods
*/
public interface ThreadPoolFactory {
-
/**
* Creates a new cached thread pool
* <p/>
* The cached thread pool is a term from the JDK from the method {@link java.util.concurrent.Executors#newCachedThreadPool()}.
- * Implementators of this interface, may create a different kind of pool than the cached, or check the source code
- * of the JDK to create a pool using the same settings.
+ * Typically it will have no size limit (this is why it is handled separately
*
* @param threadFactory factory for creating threads
* @return the created thread pool
*/
ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
-
- /**
- * Creates a new fixed thread pool
- * <p/>
- * The fixed thread pool is a term from the JDK from the method {@link java.util.concurrent.Executors#newFixedThreadPool(int)}.
- * Implementators of this interface, may create a different kind of pool than the fixed, or check the source code
- * of the JDK to create a pool using the same settings.
- *
- * @param poolSize the number of threads in the pool
- * @param threadFactory factory for creating threads
- * @return the created thread pool
- */
- ExecutorService newFixedThreadPool(int poolSize, ThreadFactory threadFactory);
-
+
/**
- * Creates a new scheduled thread pool
- *
- * @param corePoolSize the core pool size
- * @param threadFactory factory for creating threads
- * @return the created thread pool
- * @throws IllegalArgumentException if parameters is not valid
+ * Create a thread pool using the given thread pool profile
+ *
+ * @param profile
+ * @param threadFactory
+ * @return
*/
- ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) throws IllegalArgumentException;
-
+ ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+
/**
- * Creates a new thread pool
- *
- * @param corePoolSize the core pool size
- * @param maxPoolSize the maximum pool size
- * @param keepAliveTime keep alive time
- * @param timeUnit keep alive time unit
- * @param maxQueueSize the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
- * @param rejectedExecutionHandler the handler for tasks which cannot be executed by the thread pool.
- * If <tt>null</tt> is provided then {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} is used.
- * @param threadFactory factory for creating threads
- * @return the created thread pool
- * @throws IllegalArgumentException if parameters is not valid
+ * Create a scheduled thread pool using the given thread pool profile
+ * @param profile
+ * @param threadFactory
+ * @return
*/
- ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
- int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler,
- ThreadFactory threadFactory) throws IllegalArgumentException;
-
-}
+ ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java Thu Aug 18 18:19:35 2011
@@ -219,6 +219,49 @@ public class ThreadPoolProfile implement
this.rejectedPolicy = rejectedPolicy;
}
+ /**
+ * Overwrites each attribute that is null with the attribute from defaultProfile
+ *
+ * @param defaultProfile2
+ */
+ public void addDefaults(ThreadPoolProfile defaultProfile2) {
+ if (defaultProfile2 == null) {
+ return;
+ }
+ if (poolSize == null) {
+ poolSize = defaultProfile2.getPoolSize();
+ }
+ if (maxPoolSize == null) {
+ maxPoolSize = defaultProfile2.getMaxPoolSize();
+ }
+ if (keepAliveTime == null) {
+ keepAliveTime = defaultProfile2.getKeepAliveTime();
+ }
+ if (timeUnit == null) {
+ timeUnit = defaultProfile2.getTimeUnit();
+ }
+ if (maxQueueSize == null) {
+ maxQueueSize = defaultProfile2.getMaxQueueSize();
+ }
+ if (rejectedPolicy == null) {
+ rejectedPolicy = defaultProfile2.getRejectedPolicy();
+ }
+ }
+
+ @Override
+ public ThreadPoolProfile clone() {
+ ThreadPoolProfile cloned = new ThreadPoolProfile();
+ cloned.setDefaultProfile(defaultProfile);
+ cloned.setId(id);
+ cloned.setKeepAliveTime(keepAliveTime);
+ cloned.setMaxPoolSize(maxPoolSize);
+ cloned.setMaxQueueSize(maxQueueSize);
+ cloned.setPoolSize(maxPoolSize);
+ cloned.setRejectedPolicy(rejectedPolicy);
+ cloned.setTimeUnit(timeUnit);
+ return cloned;
+ }
+
@Override
public String toString() {
return "ThreadPoolProfile[" + id + " (" + defaultProfile + ") size:" + poolSize + "-" + maxPoolSize
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomThreadPoolFactoryTest.java Thu Aug 18 18:19:35 2011
@@ -18,7 +18,6 @@ package org.apache.camel.impl;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -55,18 +54,6 @@ public class CustomThreadPoolFactoryTest
}
@Override
- public ExecutorService newFixedThreadPool(int poolSize, ThreadFactory threadFactory) {
- invoked = true;
- return super.newFixedThreadPool(poolSize, threadFactory);
- }
-
- @Override
- public ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) throws IllegalArgumentException {
- invoked = true;
- return super.newScheduledThreadPool(corePoolSize, threadFactory);
- }
-
- @Override
public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {
invoked = true;
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java Thu Aug 18 18:19:35 2011
@@ -357,8 +357,8 @@ public class DefaultExecutorServiceManag
ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool);
// a fixed dont use keep alive
- assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS));
- assertEquals(5, tp.getMaximumPoolSize());
+ assertEquals("keepAliveTime", 0, tp.getKeepAliveTime(TimeUnit.SECONDS));
+ assertEquals("maximumPoolSize", 5, tp.getMaximumPoolSize());
assertEquals(5, tp.getCorePoolSize());
assertFalse(tp.isShutdown());
@@ -373,8 +373,8 @@ public class DefaultExecutorServiceManag
ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool);
// a single dont use keep alive
- assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS));
- assertEquals(1, tp.getMaximumPoolSize());
+ assertEquals("keepAliveTime", 0, tp.getKeepAliveTime(TimeUnit.SECONDS));
+ assertEquals("maximumPoolSize", 1, tp.getMaximumPoolSize());
assertEquals(1, tp.getCorePoolSize());
assertFalse(tp.isShutdown());
Modified: camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java (original)
+++ camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java Thu Aug 18 18:19:35 2011
@@ -25,8 +25,9 @@ import javax.xml.bind.annotation.adapter
import org.apache.camel.CamelContext;
import org.apache.camel.ThreadPoolRejectedPolicy;
-import org.apache.camel.builder.ThreadPoolBuilder;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
import org.apache.camel.builder.xml.TimeUnitAdapter;
+import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.CamelContextHelper;
/**
@@ -74,10 +75,14 @@ public abstract class AbstractCamelThrea
queueSize = CamelContextHelper.parseInteger(getCamelContext(), maxQueueSize);
}
- ExecutorService answer = new ThreadPoolBuilder(getCamelContext())
- .poolSize(size).maxPoolSize(max).keepAliveTime(keepAlive, getTimeUnit())
- .maxQueueSize(queueSize).rejectedPolicy(getRejectedPolicy())
- .build(getId(), getThreadName());
+ ThreadPoolProfile profile = new ThreadPoolProfileBuilder(getId())
+ .poolSize(size)
+ .maxPoolSize(max)
+ .keepAliveTime(keepAlive, timeUnit)
+ .maxQueueSize(queueSize)
+ .rejectedPolicy(rejectedPolicy)
+ .build();
+ ExecutorService answer = getCamelContext().getExecutorServiceManager().newThreadPool(getId(), getThreadName(), profile);
return answer;
}
Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomThreadPoolFactoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomThreadPoolFactoryTest.java?rev=1159342&r1=1159341&r2=1159342&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomThreadPoolFactoryTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomThreadPoolFactoryTest.java Thu Aug 18 18:19:35 2011
@@ -18,7 +18,6 @@ package org.apache.camel.spring.config;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -57,18 +56,6 @@ public class CustomThreadPoolFactoryTest
}
@Override
- public ExecutorService newFixedThreadPool(int poolSize, ThreadFactory threadFactory) {
- invoked = true;
- return super.newFixedThreadPool(poolSize, threadFactory);
- }
-
- @Override
- public ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) throws IllegalArgumentException {
- invoked = true;
- return super.newScheduledThreadPool(corePoolSize, threadFactory);
- }
-
- @Override
public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {
invoked = true;