You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/08/12 17:32:52 UTC
svn commit: r1157157 [2/3] - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/component/bean/
camel-core/src/main/java/org/apache/camel/component/dataset/
camel-core/src/main/java/org/apache/...
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Aug 12 15:32:50 2011
@@ -812,7 +812,7 @@ public class AggregateProcessor extends
}
// create a background recover thread to check every interval
- recoverService = camelContext.getExecutorServiceManager().getScheduledExecutorService("AggregateRecoverChecker", this);
+ recoverService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateRecoverChecker", 1);
Runnable recoverTask = new RecoverTask(recoverable);
LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every " + interval + " millis.");
// use fixed delay so there is X interval between each run
@@ -842,7 +842,7 @@ public class AggregateProcessor extends
}
if (getCompletionInterval() > 0) {
LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis.");
- ScheduledExecutorService scheduler = camelContext.getExecutorServiceManager().getScheduledExecutorService("AggregateTimeoutChecker", this);
+ ScheduledExecutorService scheduler = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
// trigger completion based on interval
scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS);
}
@@ -850,7 +850,7 @@ public class AggregateProcessor extends
// start timeout service if its in use
if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity.");
- ScheduledExecutorService scheduler = camelContext.getExecutorServiceManager().getScheduledExecutorService("AggregateTimeoutChecker", this);
+ ScheduledExecutorService scheduler = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
// check for timed out aggregated messages once every second
timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
// fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java Fri Aug 12 15:32:50 2011
@@ -25,23 +25,56 @@ import org.apache.camel.ShutdownableServ
/**
* Strategy to create thread pools.
* <p/>
- * This strategy is pluggable so you can plugin a custom provider, for example if you want to leverage
- * the WorkManager for a J2EE server.
+ * This manager is pluggable so you can plug in a custom provider, for example if you want to leverage
+ * the WorkManager for a JEE server.
* <p/>
- * This strategy has fine grained methods for creating various thread pools, however custom strategies
- * do not have to exactly create those kind of pools. Feel free to return a shared or different kind of pool.
+ * You may want to just implement a custom {@link ThreadPoolFactory} and rely on the
+ * {@link org.apache.camel.impl.DefaultExecutorServiceManager}, if that is sufficient. The {@link ThreadPoolFactory}
+ * is always used for creating the actual thread pools. You can implement a custom {@link ThreadPoolFactory}
+ * to leverage the WorkManager for a JEE server.
+ * <p/>
+ * The {@link ThreadPoolFactory} has pure JDK API, whereas this {@link ExecutorServiceManager} has Camel API
+ * concepts such as {@link ThreadPoolProfile}. Therefore it may be easier to only implement a custom
+ * {@link ThreadPoolFactory}.
* <p/>
- * However there are two types of pools: regular and scheduled.
+ * This manager has fine grained methods for creating various thread pools, however custom strategies
+ * do not have to exactly create those kind of pools. Feel free to return a shared or different kind of pool.
* <p/>
* If you use the <tt>newXXX</tt> methods to create thread pools, then Camel will by default take care of
* shutting down those created pools when {@link org.apache.camel.CamelContext} is shutting down.
- *
- * @version
+ * <p/>
+ * @see ThreadPoolFactory
*/
public interface ExecutorServiceManager extends ShutdownableService {
-
+
+ /**
+ * Gets the {@link ThreadPoolFactory} to use for creating the thread pools.
+ *
+ * @return the thread pool factory
+ */
+ ThreadPoolFactory getThreadPoolFactory();
+
+ /**
+ * Sets a custom {@link ThreadPoolFactory} to use
+ *
+ * @param threadPoolFactory the thread pool factory
+ */
+ void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory);
+
+ /**
+ * Creates a full thread name
+ *
+ * @param name name which is appended to the full thread name
+ * @return the full thread name
+ */
String resolveThreadName(String name);
+ /**
+ * Gets the thread pool profile by the given id
+ *
+ * @param id id of the thread pool profile to get
+ * @return the found profile, or <tt>null</tt> if not found
+ */
ThreadPoolProfile getThreadPoolProfile(String id);
/**
@@ -50,13 +83,19 @@ public interface ExecutorServiceManager
* @param profile the profile
*/
void registerThreadPoolProfile(ThreadPoolProfile profile);
+
/**
* Sets the default thread pool profile
*
* @param defaultThreadPoolProfile the new default thread pool profile
*/
void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile);
-
+
+ /**
+ * Gets the default thread pool profile
+ *
+ * @return the default profile, which is never <tt>null</tt>
+ */
ThreadPoolProfile getDefaultThreadPoolProfile();
/**
@@ -69,27 +108,140 @@ public interface ExecutorServiceManager
* <br/>and <tt>${name}</tt> is the regular thread name.
* <br/>You can also use <tt>${longName}</tt> is the long thread name which can includes endpoint parameters etc.
*
- * @param pattern the pattern
+ * @param pattern the pattern
* @throws IllegalArgumentException if the pattern is invalid.
*/
void setThreadNamePattern(String pattern) throws IllegalArgumentException;
-
+
+ /**
+ * Gets the thread name pattern to use
+ *
+ * @return the pattern
+ */
+ String getThreadNamePattern();
+
+ /**
+ * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}
+ * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @param executorServiceRef reference to lookup
+ * @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt> if not found
+ */
+ ExecutorService lookup(Object source, String name, String executorServiceRef);
+
+ /**
+ * Lookup a {@link java.util.concurrent.ScheduledExecutorService} from the {@link org.apache.camel.spi.Registry}
+ * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @param executorServiceRef reference to lookup
+ * @return the {@link java.util.concurrent.ScheduledExecutorService} or <tt>null</tt> if not found
+ */
+ ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef);
+
/**
- * Creates an executorservice with a default thread pool
- *
- * @param ref
- * @param source
- * @return
- */
- ExecutorService getDefaultExecutorService(String ref, Object source);
-
- ExecutorService getExecutorService(ThreadPoolProfile profile, Object source);
-
- ExecutorService createExecutorService(ThreadPoolProfile profile, Object source);
-
- ScheduledExecutorService getScheduledExecutorService(String ref, Object source);
-
- ScheduledExecutorService getScheduledExecutorService(ThreadPoolProfile profile, Object source);
+ * Creates a new thread pool using the default thread pool profile.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @return the created thread pool
+ */
+ ExecutorService newDefaultThreadPool(Object source, String name);
+
+ /**
+ * Creates a new scheduled thread pool using the default thread pool profile.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @return the created thread pool
+ */
+ ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name);
+
+ /**
+ * Creates a new thread pool using the given profile
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @param profile the profile with the thread pool settings to use
+ * @return the created thread pool
+ */
+ ExecutorService newThreadPool(Object source, String name, ThreadPoolProfile profile);
+
+ /**
+ * Creates a new thread pool using the given profile id
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @param profileId the id of the profile with the thread pool settings to use
+ * @return the created thread pool, or <tt>null</tt> if the thread pool profile could not be found
+ */
+ ExecutorService newThreadPool(Object source, String name, String profileId);
+
+ /**
+ * Creates a new thread pool.
+ * <p/>
+ * Will fallback and use values from the default thread pool profile for keep alive time, rejection policy
+ * and other parameters which cannot be specified.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @param poolSize the core pool size
+ * @param maxPoolSize the maximum pool size
+ * @return the created thread pool
+ */
+ ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize);
+
+ /**
+ * Creates a new single-threaded thread pool. This is often used for background threads.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @return the created thread pool
+ */
+ ExecutorService newSingleThreadExecutor(Object source, String name);
+
+ /**
+ * Creates a new cached thread pool.
+ * <p/>
+ * <b>Important:</b> Using cached thread pool is discouraged as they have no upper bound and can overload the JVM.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @return the created thread pool
+ */
+ ExecutorService newCachedThreadPool(Object source, String name);
+
+ /**
+ * Creates a new fixed thread pool.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @param poolSize the core pool size
+ * @return the created thread pool
+ */
+ ExecutorService newFixedThreadPool(Object source, String name, int poolSize);
+
+ /**
+ * Creates a new scheduled thread pool.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @param poolSize the core pool size
+ * @return the created thread pool
+ */
+ ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize);
+
+ /**
+ * Creates a new single-threaded scheduled thread pool. This is often used for background threads.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @return the created thread pool
+ */
+ ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name);
/**
* Shutdown the given executor service.
@@ -107,9 +259,4 @@ public interface ExecutorServiceManager
* @see java.util.concurrent.ExecutorService#shutdownNow()
*/
List<Runnable> shutdownNow(ExecutorService executorService);
-
-
- ExecutorService newCachedThreadPool(Object source, String name);
-
- ExecutorService newSynchronousExecutorService(String string, Object source);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java Fri Aug 12 15:32:50 2011
@@ -17,16 +17,73 @@
package org.apache.camel.spi;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
/**
- * Creates ExecutorService and ScheduledExecutorService objects that work with a thread pool for a given ThreadPoolProfile and ThreadFactory.
- *
- * This interface allows to customize the creation of these objects to adapt camel for application servers and other environments where thread pools
- * should not be created with the jdk methods
+ * Factory to create {@link ExecutorService} and {@link ScheduledExecutorService} instances
+ * <p/>
+ * This interface allows to customize the creation of these objects to adapt Camel
+ * for application servers and other environments where thread pools should
+ * not be created with the JDK methods, as provided by the {@link org.apache.camel.impl.DefaultThreadPoolFactory}.
+ *
+ * @see ExecutorServiceManager
*/
public interface ThreadPoolFactory {
- ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
- ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+
+ /**
+ * Creates a new cached thread pool
+ * <p/>
+ * The cached thread pool is a term from the JDK from the method {@link java.util.concurrent.Executors#newCachedThreadPool()}.
+ * Implementors of this interface may create a different kind of pool than the cached, or check the source code
+ * of the JDK to create a pool using the same settings.
+ *
+ * @param threadFactory factory for creating threads
+ * @return the created thread pool
+ */
+ ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
+
+ /**
+ * Creates a new fixed thread pool
+ * <p/>
+ * The fixed thread pool is a term from the JDK from the method {@link java.util.concurrent.Executors#newFixedThreadPool(int)}.
+ * Implementors of this interface may create a different kind of pool than the fixed, or check the source code
+ * of the JDK to create a pool using the same settings.
+ *
+ * @param poolSize the number of threads in the pool
+ * @param threadFactory factory for creating threads
+ * @return the created thread pool
+ */
+ ExecutorService newFixedThreadPool(int poolSize, ThreadFactory threadFactory);
+
+ /**
+ * Creates a new scheduled thread pool
+ *
+ * @param corePoolSize the core pool size
+ * @param threadFactory factory for creating threads
+ * @return the created thread pool
+ * @throws IllegalArgumentException if the parameters are not valid
+ */
+ ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) throws IllegalArgumentException;
+
+ /**
+ * Creates a new thread pool
+ *
+ * @param corePoolSize the core pool size
+ * @param maxPoolSize the maximum pool size
+ * @param keepAliveTime keep alive time
+ * @param timeUnit keep alive time unit
+ * @param maxQueueSize the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
+ * @param rejectedExecutionHandler the handler for tasks which cannot be executed by the thread pool.
+ * If <tt>null</tt> is provided then {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} is used.
+ * @param threadFactory factory for creating threads
+ * @return the created thread pool
+ * @throws IllegalArgumentException if the parameters are not valid
+ */
+ ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
+ int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler,
+ ThreadFactory threadFactory) throws IllegalArgumentException;
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java Fri Aug 12 15:32:50 2011
@@ -36,22 +36,36 @@ public class ThreadPoolProfile {
private TimeUnit timeUnit;
private Integer maxQueueSize;
private ThreadPoolRejectedPolicy rejectedPolicy;
- private Boolean shared;
- private Boolean daemon;
- private String threadName;
+ /**
+ * Creates a new thread pool profile, with no id set.
+ */
public ThreadPoolProfile() {
}
-
+
+ /**
+ * Creates a new thread pool profile
+ *
+ * @param id id of the profile
+ */
public ThreadPoolProfile(String id) {
this.id = id;
- this.threadName = id;
}
+ /**
+ * Gets the id of this profile
+ *
+ * @return the id of this profile
+ */
public String getId() {
return id;
}
+ /**
+ * Sets the id of this profile
+ *
+ * @param id profile id
+ */
public void setId(String id) {
this.id = id;
}
@@ -83,6 +97,11 @@ public class ThreadPoolProfile {
return poolSize;
}
+ /**
+ * Sets the core pool size (threads to keep minimum in pool)
+ *
+ * @param poolSize the pool size
+ */
public void setPoolSize(Integer poolSize) {
this.poolSize = poolSize;
}
@@ -97,9 +116,9 @@ public class ThreadPoolProfile {
}
/**
- * Sets the core pool size (threads to keep minimum in pool)
+ * Sets the maximum pool size
*
- * @param poolSize the pool size
+ * @param maxPoolSize the max pool size
*/
public void setMaxPoolSize(Integer maxPoolSize) {
this.maxPoolSize = maxPoolSize;
@@ -124,18 +143,18 @@ public class ThreadPoolProfile {
}
/**
- * Sets the time unit used for keep alive time
+ * Gets the time unit used for keep alive time
*
- * @param timeUnit the time unit
+ * @return the time unit
*/
public TimeUnit getTimeUnit() {
return timeUnit;
}
/**
- * Gets the time unit used for keep alive time
+ * Sets the time unit used for keep alive time
*
- * @return the time unit
+ * @param timeUnit the time unit
*/
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
@@ -164,7 +183,7 @@ public class ThreadPoolProfile {
}
/**
- * Gets the handler for tasks which cannot be executed by the thread pool.
+ * Gets the policy for tasks which cannot be executed by the thread pool.
*
* @return the policy for the handler
*/
@@ -193,74 +212,10 @@ public class ThreadPoolProfile {
this.rejectedPolicy = rejectedPolicy;
}
- public String getThreadName() {
- return threadName;
- }
-
- public void setThreadName(String threadName) {
- this.threadName = threadName;
- }
-
- public boolean isShared() {
- return shared;
- }
-
- public void setShared(boolean shared) {
- this.shared = shared;
- }
-
@Override
public String toString() {
return "ThreadPoolProfile[" + id + ", " + defaultProfile + ", " + poolSize + ", " + maxPoolSize + ", "
+ keepAliveTime + " " + timeUnit + ", " + maxPoolSize + ", " + rejectedPolicy + "]";
}
- public void setDaemon(boolean daemon) {
- this.daemon = daemon;
- }
-
- public boolean isDaemon() {
- return daemon == null ? false : daemon;
- }
-
- public ThreadPoolProfile getEffectiveProfile(ThreadPoolProfile profile) {
- ThreadPoolProfile defaultProfile = this;
- ThreadPoolProfile eProfile = new ThreadPoolProfile();
- eProfile.setPoolSize(profile.getPoolSize() != null ? profile.getPoolSize() : defaultProfile.getPoolSize());
- eProfile.setMaxPoolSize(profile.getMaxPoolSize() != null ? profile.getMaxPoolSize() : defaultProfile.getMaxPoolSize());
- eProfile.setKeepAliveTime(profile.getKeepAliveTime() != null ? profile.getKeepAliveTime() : defaultProfile.getKeepAliveTime());
- eProfile.setTimeUnit(profile.getTimeUnit() != null ? profile.getTimeUnit() : defaultProfile.getTimeUnit());
- eProfile.setMaxQueueSize(profile.getMaxQueueSize() != null ? profile.getMaxQueueSize() : defaultProfile.getMaxQueueSize());
- eProfile.setRejectedPolicy(profile.getRejectedPolicy() != null ? profile.getRejectedPolicy() : defaultProfile.getRejectedPolicy());
- return eProfile;
- }
-
- /**
- * Overwrites each attribute that is null with the attribute from defaultProfile
- *
- * @param defaultProfile2
- */
- public void addDefaults(ThreadPoolProfile defaultProfile2) {
- if (defaultProfile2 == null) {
- return;
- }
- if (poolSize == null) {
- poolSize = defaultProfile2.getPoolSize();
- }
- if (maxPoolSize == null) {
- maxPoolSize = defaultProfile2.getMaxPoolSize();
- }
- if (keepAliveTime == null) {
- keepAliveTime = defaultProfile2.getKeepAliveTime();
- }
- if (timeUnit == null) {
- timeUnit = defaultProfile2.getTimeUnit();
- }
- if (maxQueueSize == null) {
- maxQueueSize = defaultProfile2.getMaxQueueSize();
- }
- if (rejectedPolicy == null) {
- rejectedPolicy = defaultProfile2.getRejectedPolicy();
- }
- }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java Fri Aug 12 15:32:50 2011
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
* <p/>
* This implementation is very simple and does not support waiting for tasks to complete during shutdown.
*
- * @version
+ * @version
*/
public class SynchronousExecutorService extends AbstractExecutorService {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ThreadHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ThreadHelper.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ThreadHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ThreadHelper.java Fri Aug 12 15:32:50 2011
@@ -20,6 +20,9 @@ import java.util.concurrent.atomic.Atomi
import org.apache.camel.util.ObjectHelper;
+/**
+ * Various helper methods for thread naming.
+ */
public final class ThreadHelper {
public static final String DEFAULT_PATTERN = "Camel Thread ${counter} - ${name}";
@@ -33,7 +36,7 @@ public final class ThreadHelper {
}
/**
- * Creates a new thread name with the given prefix
+ * Creates a new thread name with the given pattern
*
* @param pattern the pattern
* @param name the name
@@ -67,5 +70,4 @@ public final class ThreadHelper {
return answer;
}
-
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java Fri Aug 12 15:32:50 2011
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.spi.ThreadPoolProfile;
/**
* @version
@@ -37,9 +36,10 @@ public class ThreadPoolBuilderTest exten
jndi.bind("someonesPool", someone);
return jndi;
}
-
- private void getAndShutdown(ThreadPoolProfile profile) throws Exception {
- ExecutorService executor = context.getExecutorServiceManager().getExecutorService(profile, this);
+
+ public void testThreadPoolBuilderDefault() throws Exception {
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.build(this, "myPool");
assertNotNull(executor);
assertEquals(false, executor.isShutdown());
@@ -47,48 +47,73 @@ public class ThreadPoolBuilderTest exten
assertEquals(true, executor.isShutdown());
}
- public void testThreadPoolBuilderDefault() throws Exception {
- ThreadPoolProfile profile = new ThreadPoolBuilder("myPool").build();
- getAndShutdown(profile);
- }
-
public void testThreadPoolBuilderMaxQueueSize() throws Exception {
- ThreadPoolProfile profile = new ThreadPoolBuilder("myPool").maxQueueSize(2000).build();
- getAndShutdown(profile);
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.maxQueueSize(2000).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
}
public void testThreadPoolBuilderMax() throws Exception {
- ThreadPoolProfile profile = new ThreadPoolBuilder("myPool").maxPoolSize(100).build();
- getAndShutdown(profile);
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.maxPoolSize(100).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
}
public void testThreadPoolBuilderCoreAndMax() throws Exception {
- ThreadPoolProfile profile = new ThreadPoolBuilder("myPool").poolSize(50).maxPoolSize(100).build();
- getAndShutdown(profile);
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.poolSize(50).maxPoolSize(100).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
}
public void testThreadPoolBuilderKeepAlive() throws Exception {
- ThreadPoolProfile profile = new ThreadPoolBuilder("myPool").keepAliveTime(30).build();
- getAndShutdown(profile);
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.keepAliveTime(30).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
}
public void testThreadPoolBuilderKeepAliveTimeUnit() throws Exception {
- ThreadPoolProfile profile = new ThreadPoolBuilder("myPool").keepAliveTime(20000).timeUnit(TimeUnit.MILLISECONDS).build();
- getAndShutdown(profile);
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.keepAliveTime(20000, TimeUnit.MILLISECONDS).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
}
public void testThreadPoolBuilderAll() throws Exception {
- ThreadPoolProfile profile = new ThreadPoolBuilder("myPool").poolSize(50).maxPoolSize(100).maxQueueSize(2000)
- .keepAliveTime(20000)
- .timeUnit(TimeUnit.MILLISECONDS)
- .rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
- .build();
- getAndShutdown(profile);
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.poolSize(50).maxPoolSize(100).maxQueueSize(2000)
+ .keepAliveTime(20000, TimeUnit.MILLISECONDS)
+ .rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
+ .build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
}
public void testThreadPoolBuilderTwoPoolsDefault() throws Exception {
- ExecutorService executor = context.getExecutorServiceManager().getExecutorService(new ThreadPoolBuilder("myPool").build(), this);
- ExecutorService executor2 = context.getExecutorServiceManager().getExecutorService(new ThreadPoolBuilder("myOtherPool").build(), this);
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.build(this, "myPool");
+ ExecutorService executor2 = builder.build(this, "myOtherPool");
assertNotNull(executor);
assertNotNull(executor2);
@@ -100,5 +125,4 @@ public class ThreadPoolBuilderTest exten
assertEquals(true, executor2.isShutdown());
}
-
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java Fri Aug 12 15:32:50 2011
@@ -23,21 +23,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.spi.ThreadPoolProfile;
-import org.apache.camel.util.concurrent.ThreadHelper;
/**
* @version
*/
public class DefaultExecutorServiceManagerTest extends ContextTestSupport {
- public void testGetThreadName() {
- String name = ThreadHelper.resolveThreadName("Camel Thread ${counter} - ${name}", "foo");
-
- assertTrue(name.startsWith("Camel Thread"));
- assertTrue(name.endsWith("foo"));
- }
-
- public void testGetThreadNameDefaultPattern() throws Exception {
+ public void testResolveThreadNameDefaultPattern() throws Exception {
String foo = context.getExecutorServiceManager().resolveThreadName("foo");
String bar = context.getExecutorServiceManager().resolveThreadName("bar");
@@ -50,6 +42,7 @@ public class DefaultExecutorServiceManag
public void testGetThreadNameCustomPattern() throws Exception {
context.getExecutorServiceManager().setThreadNamePattern("#${counter} - ${name}");
+ assertEquals("#${counter} - ${name}", context.getExecutorServiceManager().getThreadNamePattern());
String foo = context.getExecutorServiceManager().resolveThreadName("foo");
String bar = context.getExecutorServiceManager().resolveThreadName("bar");
@@ -127,7 +120,7 @@ public class DefaultExecutorServiceManag
}
public void testDefaultThreadPool() throws Exception {
- ExecutorService myPool = context.getExecutorServiceManager().getDefaultExecutorService("myPool", this);
+ ExecutorService myPool = context.getExecutorServiceManager().newDefaultThreadPool(this, "myPool");
assertEquals(false, myPool.isShutdown());
// should use default settings
@@ -151,7 +144,7 @@ public class DefaultExecutorServiceManag
context.getExecutorServiceManager().setDefaultThreadPoolProfile(custom);
assertEquals(true, custom.isDefaultProfile().booleanValue());
- ExecutorService myPool = context.getExecutorServiceManager().getDefaultExecutorService("myPool", this);
+ ExecutorService myPool = context.getExecutorServiceManager().newDefaultThreadPool(this, "myPool");
assertEquals(false, myPool.isShutdown());
// should use default settings
@@ -175,7 +168,7 @@ public class DefaultExecutorServiceManag
context.getExecutorServiceManager().setDefaultThreadPoolProfile(custom);
assertEquals(true, custom.isDefaultProfile().booleanValue());
- ExecutorService myPool = context.getExecutorServiceManager().getDefaultExecutorService("myPool", this);
+ ExecutorService myPool = context.getExecutorServiceManager().newDefaultThreadPool(this, "myPool");
assertEquals(false, myPool.isShutdown());
// should use default settings
@@ -190,6 +183,8 @@ public class DefaultExecutorServiceManag
}
public void testGetThreadPoolProfile() throws Exception {
+ assertNull(context.getExecutorServiceManager().getThreadPoolProfile("foo"));
+
ThreadPoolProfile foo = new ThreadPoolProfile("foo");
foo.setKeepAliveTime(20L);
foo.setMaxPoolSize(40);
@@ -229,13 +224,13 @@ public class DefaultExecutorServiceManag
}
public void testGetThreadPoolProfileInheritDefaultValues() throws Exception {
- assertNull(context.getExecutorServiceManager().getThreadPoolProfile("fooProfile"));
- ThreadPoolProfile foo = new ThreadPoolProfile("fooProfile");
+ assertNull(context.getExecutorServiceManager().getThreadPoolProfile("foo"));
+ ThreadPoolProfile foo = new ThreadPoolProfile("foo");
foo.setMaxPoolSize(40);
context.getExecutorServiceManager().registerThreadPoolProfile(foo);
- assertSame(foo, context.getExecutorServiceManager().getThreadPoolProfile("fooProfile"));
+ assertSame(foo, context.getExecutorServiceManager().getThreadPoolProfile("foo"));
- ExecutorService executor = context.getExecutorServiceManager().getDefaultExecutorService("fooProfile", this);
+ ExecutorService executor = context.getExecutorServiceManager().newThreadPool(this, "MyPool", "foo");
ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor);
assertEquals(40, tp.getMaximumPoolSize());
// should inherit the default values
@@ -260,7 +255,7 @@ public class DefaultExecutorServiceManag
context.getExecutorServiceManager().registerThreadPoolProfile(foo);
assertSame(foo, context.getExecutorServiceManager().getThreadPoolProfile("foo"));
- ExecutorService executor = context.getExecutorServiceManager().getDefaultExecutorService("foo", this);
+ ExecutorService executor = context.getExecutorServiceManager().newThreadPool(this, "MyPool", "foo");
ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor);
assertEquals(25, tp.getMaximumPoolSize());
@@ -282,7 +277,7 @@ public class DefaultExecutorServiceManag
context.getExecutorServiceManager().registerThreadPoolProfile(foo);
assertSame(foo, context.getExecutorServiceManager().getThreadPoolProfile("foo"));
- ExecutorService executor = context.getExecutorServiceManager().getDefaultExecutorService("foo", this);
+ ExecutorService executor = context.getExecutorServiceManager().newThreadPool(this, "MyPool", "foo");
ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor);
assertEquals(1, tp.getCorePoolSize());
@@ -303,7 +298,7 @@ public class DefaultExecutorServiceManag
context.getExecutorServiceManager().registerThreadPoolProfile(foo);
- ExecutorService pool = context.getExecutorServiceManager().getDefaultExecutorService("foo", this);
+ ExecutorService pool = context.getExecutorServiceManager().newThreadPool(this, "Cool", "foo");
assertNotNull(pool);
ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool);
@@ -318,6 +313,12 @@ public class DefaultExecutorServiceManag
}
public void testLookupThreadPoolProfile() throws Exception {
+ ExecutorService pool = context.getExecutorServiceManager().lookup(this, "Cool", "fooProfile");
+ // does not exist yet
+ assertNull(pool);
+
+ assertNull(context.getExecutorServiceManager().getThreadPoolProfile("fooProfile"));
+
ThreadPoolProfile foo = new ThreadPoolProfile("fooProfile");
foo.setKeepAliveTime(20L);
foo.setMaxPoolSize(40);
@@ -326,7 +327,7 @@ public class DefaultExecutorServiceManag
context.getExecutorServiceManager().registerThreadPoolProfile(foo);
- Object pool = context.getExecutorServiceManager().getDefaultExecutorService("fooProfile", this);
+ pool = context.getExecutorServiceManager().lookup(this, "Cool", "fooProfile");
assertNotNull(pool);
ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool);
@@ -340,4 +341,6 @@ public class DefaultExecutorServiceManag
assertTrue(tp.isShutdown());
}
+ // TODO: Add unit test for the newXXX methods
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java Fri Aug 12 15:32:50 2011
@@ -16,14 +16,11 @@
*/
package org.apache.camel.management;
-import java.util.concurrent.TimeUnit;
-
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.spi.ThreadPoolProfile;
/**
@@ -74,13 +71,12 @@ public class ManagedThreadPoolProfileTes
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- ThreadPoolProfile profile = new ThreadPoolBuilder("custom")
- .poolSize(5)
- .maxPoolSize(15)
- .keepAliveTime(25, TimeUnit.SECONDS)
- .maxQueueSize(250)
- .rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
- .build();
+ ThreadPoolProfile profile = new ThreadPoolProfile("custom");
+ profile.setPoolSize(5);
+ profile.setMaxPoolSize(15);
+ profile.setKeepAliveTime(25L);
+ profile.setMaxQueueSize(250);
+ profile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
context.getExecutorServiceManager().registerThreadPoolProfile(profile);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java Fri Aug 12 15:32:50 2011
@@ -94,7 +94,7 @@ public class AggregateShutdownThreadPool
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- myPool = context.getExecutorServiceManager().getDefaultExecutorService("myPool", this);
+ myPool = context.getExecutorServiceManager().newDefaultThreadPool(this, "myPool");
from("direct:foo").routeId("foo")
.aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(3)
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java Fri Aug 12 15:32:50 2011
@@ -19,7 +19,6 @@ package org.apache.camel.processor.aggre
import org.apache.camel.ContextTestSupport;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.processor.BodyInAggregatingStrategy;
import org.apache.camel.spi.ThreadPoolProfile;
@@ -44,7 +43,10 @@ public class AggregateThreadPoolProfileT
@Override
public void configure() throws Exception {
// create and register thread pool profile
- ThreadPoolProfile profile = new ThreadPoolBuilder("myProfile").poolSize(2).maxPoolSize(8).rejectedPolicy(ThreadPoolRejectedPolicy.Abort).build();
+ ThreadPoolProfile profile = new ThreadPoolProfile("myProfile");
+ profile.setPoolSize(2);
+ profile.setMaxPoolSize(8);
+ profile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
context.getExecutorServiceManager().registerThreadPoolProfile(profile);
from("direct:start")
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateUnknownExecutorServiceRefTest.java (from r1156598, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateUnknownExecutorServiceRefTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateUnknownExecutorServiceRefTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java&r1=1156598&r2=1157157&rev=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateUnknownExecutorServiceRefTest.java Fri Aug 12 15:32:50 2011
@@ -17,43 +17,40 @@
package org.apache.camel.processor.aggregator;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.FailedToCreateRouteException;
+import org.apache.camel.NoSuchBeanException;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.processor.BodyInAggregatingStrategy;
-import org.apache.camel.spi.ThreadPoolProfile;
/**
- * @version
+ *
*/
-public class AggregateThreadPoolProfileTest extends ContextTestSupport {
-
- public void testAggregateThreadPoolProfile() throws Exception {
- getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
-
- template.sendBodyAndHeader("direct:start", "A", "id", 123);
- template.sendBodyAndHeader("direct:start", "B", "id", 123);
- template.sendBodyAndHeader("direct:start", "C", "id", 123);
-
- assertMockEndpointsSatisfied();
- }
+public class AggregateUnknownExecutorServiceRefTest extends ContextTestSupport {
@Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- // create and register thread pool profile
- ThreadPoolProfile profile = new ThreadPoolBuilder("myProfile").poolSize(2).maxPoolSize(8).rejectedPolicy(ThreadPoolRejectedPolicy.Abort).build();
- context.getExecutorServiceManager().registerThreadPoolProfile(profile);
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
- from("direct:start")
- .aggregate(header("id"), new BodyInAggregatingStrategy())
- // use our custom thread pool profile
- .completionSize(3).executorServiceRef("myProfile")
- .to("log:foo")
- .to("mock:aggregated");
- }
- };
+ public void testAggregateUnknownExecutorServiceRef() throws Exception {
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ // using an unknown executor service ref should fail
+ .completionSize(3).executorServiceRef("myUnknownProfile")
+ .to("log:foo")
+ .to("mock:aggregated");
+ }
+ });
+ context.start();
+ fail("Should have thrown exception");
+ } catch (FailedToCreateRouteException e) {
+ NoSuchBeanException cause = assertIsInstanceOf(NoSuchBeanException.class, e.getCause());
+ assertEquals("myUnknownProfile", cause.getName());
+ }
}
+
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelExecutorServiceRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelExecutorServiceRefTest.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelExecutorServiceRefTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelExecutorServiceRefTest.java Fri Aug 12 15:32:50 2011
@@ -20,7 +20,6 @@ import org.apache.camel.ContextTestSuppo
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.spi.ThreadPoolProfile;
/**
@@ -39,7 +38,8 @@ public class AsyncDeadLetterChannelExecu
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- ThreadPoolProfile profile = new ThreadPoolBuilder("myAsyncPool").poolSize(5).build();
+ ThreadPoolProfile profile = new ThreadPoolProfile("myAsyncPool");
+ profile.setPoolSize(5);
context.getExecutorServiceManager().registerThreadPoolProfile(profile);
errorHandler(deadLetterChannel("mock:dead")
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java Fri Aug 12 15:32:50 2011
@@ -38,7 +38,7 @@ public class MyAsyncProducer extends Def
public MyAsyncProducer(MyAsyncEndpoint endpoint) {
super(endpoint);
- this.executor = endpoint.getCamelContext().getExecutorServiceManager().getDefaultExecutorService("MyProducer", this);
+ this.executor = endpoint.getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MyProducer");
}
public MyAsyncEndpoint getEndpoint() {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java Fri Aug 12 15:32:50 2011
@@ -18,16 +18,12 @@ package org.apache.camel.util;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import junit.framework.TestCase;
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.ThreadPoolBuilder;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.spi.ThreadPoolProfile;
-
/**
* @version
*/
@@ -106,9 +102,7 @@ public class DefaultTimeoutMapTest exten
}
public void testExecutor() throws Exception {
- CamelContext camelContext = new DefaultCamelContext();
- ThreadPoolProfile profile = new ThreadPoolBuilder("foo").poolSize(2).daemon().build();
- ScheduledExecutorService e = camelContext.getExecutorServiceManager().getScheduledExecutorService(profile, this);
+ ScheduledExecutorService e = Executors.newScheduledThreadPool(2);
DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(e, 50);
assertEquals(50, map.getPurgePollTime());
Modified: camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java (original)
+++ camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java Fri Aug 12 15:32:50 2011
@@ -570,13 +570,13 @@ public abstract class AbstractCamelConte
// use custom profiles defined in the CamelContext
if (getThreadPoolProfiles() != null && !getThreadPoolProfiles().isEmpty()) {
- for (ThreadPoolProfileDefinition profile : getThreadPoolProfiles()) {
- if (profile.isDefaultProfile()) {
- LOG.info("Using custom default ThreadPoolProfile with id: " + profile.getId() + " and implementation: " + profile);
- context.getExecutorServiceManager().setDefaultThreadPoolProfile(profile.asThreadPoolProfile(context));
- defaultIds.add(profile.getId());
+ for (ThreadPoolProfileDefinition definition : getThreadPoolProfiles()) {
+ if (definition.isDefaultProfile()) {
+ LOG.info("Using custom default ThreadPoolProfile with id: " + definition.getId() + " and implementation: " + definition);
+ context.getExecutorServiceManager().setDefaultThreadPoolProfile(asThreadPoolProfile(context, definition));
+ defaultIds.add(definition.getId());
} else {
- context.getExecutorServiceManager().registerThreadPoolProfile(profile.asThreadPoolProfile(context));
+ context.getExecutorServiceManager().registerThreadPoolProfile(asThreadPoolProfile(context, definition));
}
}
}
@@ -587,6 +587,26 @@ public abstract class AbstractCamelConte
}
}
+ /**
+ * Creates a {@link ThreadPoolProfile} instance based on the definition.
+ *
+ * @param context the camel context
+ * @return the profile
+ * @throws Exception is thrown if an error occurs while creating the profile
+ */
+ private ThreadPoolProfile asThreadPoolProfile(CamelContext context, ThreadPoolProfileDefinition definition) throws Exception {
+ ThreadPoolProfile answer = new ThreadPoolProfile();
+ answer.setId(definition.getId());
+ answer.setDefaultProfile(definition.getDefaultProfile());
+ answer.setPoolSize(CamelContextHelper.parseInteger(context, definition.getPoolSize()));
+ answer.setMaxPoolSize(CamelContextHelper.parseInteger(context, definition.getMaxPoolSize()));
+ answer.setKeepAliveTime(CamelContextHelper.parseLong(context, definition.getKeepAliveTime()));
+ answer.setMaxQueueSize(CamelContextHelper.parseInteger(context, definition.getMaxQueueSize()));
+ answer.setRejectedPolicy(definition.getRejectedPolicy());
+ answer.setTimeUnit(definition.getTimeUnit());
+ return answer;
+ }
+
protected abstract void initBeanPostProcessor(T context);
/**
Modified: camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java (original)
+++ camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java Fri Aug 12 15:32:50 2011
@@ -18,7 +18,6 @@ package org.apache.camel.core.xml;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -28,7 +27,6 @@ import org.apache.camel.CamelContext;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.builder.xml.TimeUnitAdapter;
-import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.CamelContextHelper;
/**
@@ -76,15 +74,10 @@ public abstract class AbstractCamelThrea
queueSize = CamelContextHelper.parseInteger(getCamelContext(), maxQueueSize);
}
- ThreadPoolProfile profile = new ThreadPoolBuilder(getId())
- .threadName(getThreadName())
- .poolSize(size)
- .maxPoolSize(max)
- .keepAliveTime(keepAlive, getTimeUnit())
- .maxQueueSize(queueSize)
- .rejectedPolicy(rejectedPolicy)
- .build();
- ExecutorService answer = getCamelContext().getExecutorServiceManager().createExecutorService(profile , getId());
+ ExecutorService answer = new ThreadPoolBuilder(getCamelContext())
+ .poolSize(size).maxPoolSize(max).keepAliveTime(keepAlive, getTimeUnit())
+ .maxQueueSize(queueSize).rejectedPolicy(getRejectedPolicy())
+ .build(getId(), getThreadName());
return answer;
}
@@ -150,5 +143,4 @@ public abstract class AbstractCamelThrea
this.threadName = threadName;
}
-
}
\ No newline at end of file
Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java (original)
+++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java Fri Aug 12 15:32:50 2011
@@ -26,7 +26,6 @@ import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultExchangeHolder;
@@ -53,7 +52,7 @@ public class HazelcastSedaConsumer exten
@Override
protected void doStart() throws Exception {
int concurrentConsumers = endpoint.getConfiguration().getConcurrentConsumers();
- executor = endpoint.getCamelContext().getExecutorServiceManager().getExecutorService(ThreadPoolBuilder.fixedThreadExecutor(endpoint.getEndpointUri(), concurrentConsumers), this);
+ executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), concurrentConsumers);
for (int i = 0; i < concurrentConsumers; i++) {
executor.execute(this);
}
Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java Fri Aug 12 15:32:50 2011
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.Exchange;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.impl.DefaultProducer;
public class HdfsProducer extends DefaultProducer {
@@ -101,7 +100,7 @@ public class HdfsProducer extends Defaul
}
}
if (idleStrategy != null) {
- scheduler = getEndpoint().getCamelContext().getExecutorServiceManager().getScheduledExecutorService(ThreadPoolBuilder.singleThreadExecutor("IdleCheck"), this);
+ scheduler = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "HdfsIdleCheck");
log.debug("Creating IdleCheck task scheduled to run every {} millis", config.getCheckIdleInterval());
scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy), 1000, config.getCheckIdleInterval(), TimeUnit.MILLISECONDS);
}
@@ -110,11 +109,11 @@ public class HdfsProducer extends Defaul
@Override
protected void doStop() throws Exception {
super.doStop();
- ostream.close();
if (scheduler != null) {
- getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduler);
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduler);
scheduler = null;
}
+ ostream.close();
}
@Override
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Fri Aug 12 15:32:50 2011
@@ -40,7 +40,6 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Service;
import org.apache.camel.ServiceStatus;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.component.jms.reply.PersistentQueueReplyManager;
import org.apache.camel.component.jms.reply.ReplyManager;
import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager;
@@ -213,9 +212,7 @@ public class JmsEndpoint extends Default
// include destination name as part of thread name
String name = "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]";
// use a cached pool as DefaultMessageListenerContainer will throttle pool sizing
- // TODO: The refactored ExecutorServiceManager was not good, now we dont have the JDK API anymore
- // we need the CachedThreadPool here, but the refactored API does not offer that anymore
- ExecutorService executor = getCamelContext().getExecutorServiceManager().getDefaultExecutorService(name, consumer);
+ ExecutorService executor = getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer, name);
setContainerTaskExecutor(listenerContainer, executor);
}
}
@@ -444,7 +441,7 @@ public class JmsEndpoint extends Default
protected synchronized ScheduledExecutorService getReplyManagerExecutorService() {
if (replyManagerExecutorService == null) {
String name = "JmsReplyManagerTimeoutChecker[" + getEndpointConfiguredDestinationName() + "]";
- replyManagerExecutorService = getCamelContext().getExecutorServiceManager().getScheduledExecutorService(ThreadPoolBuilder.singleThreadExecutor(name), this);
+ replyManagerExecutorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name);
}
return replyManagerExecutorService;
}
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java Fri Aug 12 15:32:50 2011
@@ -26,7 +26,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import javax.jms.ConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
@@ -37,7 +36,6 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Before;
import org.junit.Test;
@@ -452,7 +450,7 @@ public class JmsRouteRequestReplyTest ex
// start template
template.start();
- ExecutorService executor = context.getExecutorServiceManager().getExecutorService(ThreadPoolBuilder.fixedThreadExecutor("Task", maxTasks), this);
+ ExecutorService executor = context.getExecutorServiceManager().newFixedThreadPool(this, "Task", maxTasks);
CompletionService<Task> completionService = new ExecutorCompletionService<Task>(executor);
final AtomicInteger counter = new AtomicInteger(-1);
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/MyAsyncProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/MyAsyncProducer.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/MyAsyncProducer.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/MyAsyncProducer.java Fri Aug 12 15:32:50 2011
@@ -38,7 +38,7 @@ public class MyAsyncProducer extends Def
public MyAsyncProducer(MyAsyncEndpoint endpoint) {
super(endpoint);
- this.executor = endpoint.getCamelContext().getExecutorServiceManager().getDefaultExecutorService("MyProducer", this);
+ this.executor = endpoint.getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MyProducer");
}
public MyAsyncEndpoint getEndpoint() {
Modified: camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java (original)
+++ camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java Fri Aug 12 15:32:50 2011
@@ -27,7 +27,6 @@ import net.spy.memcached.MemcachedClient
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.spi.ShutdownAware;
@@ -64,7 +63,7 @@ public class KestrelConsumer extends Def
shutdownLatch = new CountDownLatch(poolSize + 1);
// Fire up the handler thread pool
- handlerExecutor = endpoint.getCamelContext().getExecutorServiceManager().getExecutorService(ThreadPoolBuilder.fixedThreadExecutor("Handlers-" + endpoint.getEndpointUri(), poolSize), this);
+ handlerExecutor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "Handlers-" + endpoint.getEndpointUri(), poolSize);
for (int k = 0; k < poolSize; ++k) {
handlerExecutor.execute(new Handler());
}
@@ -76,7 +75,7 @@ public class KestrelConsumer extends Def
}
// Fire up the single poller thread
- pollerExecutor = endpoint.getCamelContext().getExecutorServiceManager().getExecutorService(ThreadPoolBuilder.singleThreadExecutor("Poller-" + endpoint.getEndpointUri()), this);
+ pollerExecutor = endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "Poller-" + endpoint.getEndpointUri());
pollerExecutor.submit(new Poller(poolSize > 1));
super.doStart();
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Fri Aug 12 15:32:50 2011
@@ -165,9 +165,9 @@ public class MinaComponent extends Defau
final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
IoAcceptor acceptor = new SocketAcceptor(processorCount,
- getCamelContext().getExecutorServiceManager().getDefaultExecutorService("MinaSocketAcceptor", this));
+ getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketAcceptor"));
IoConnector connector = new SocketConnector(processorCount,
- getCamelContext().getExecutorServiceManager().getDefaultExecutorService("MinaSocketConnector", this));
+ getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketConnector"));
SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
// connector config
@@ -176,7 +176,7 @@ public class MinaComponent extends Defau
connectorConfig.setThreadModel(ThreadModel.MANUAL);
configureCodecFactory("MinaProducer", connectorConfig, configuration);
connectorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceManager().getDefaultExecutorService("MinaThreadPool", this)));
+ new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
if (minaLogger) {
connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -193,7 +193,7 @@ public class MinaComponent extends Defau
acceptorConfig.setReuseAddress(true);
acceptorConfig.setDisconnectOnUnbind(true);
acceptorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceManager().getDefaultExecutorService("MinaThreadPool", this)));
+ new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
if (minaLogger) {
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -258,8 +258,8 @@ public class MinaComponent extends Defau
boolean sync = configuration.isSync();
List<IoFilter> filters = configuration.getFilters();
- IoAcceptor acceptor = new DatagramAcceptor(getCamelContext().getExecutorServiceManager().getDefaultExecutorService("MinaDatagramAcceptor", this));
- IoConnector connector = new DatagramConnector(getCamelContext().getExecutorServiceManager().getDefaultExecutorService("MinaDatagramConnector", this));
+ IoAcceptor acceptor = new DatagramAcceptor(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramAcceptor"));
+ IoConnector connector = new DatagramConnector(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramConnector"));
SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
if (transferExchange) {
@@ -271,7 +271,7 @@ public class MinaComponent extends Defau
connectorConfig.setThreadModel(ThreadModel.MANUAL);
configureDataGramCodecFactory("MinaProducer", connectorConfig, configuration);
connectorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceManager().getDefaultExecutorService("MinaThreadPool", this)));
+ new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
if (minaLogger) {
connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -286,7 +286,7 @@ public class MinaComponent extends Defau
acceptorConfig.setDisconnectOnUnbind(true);
// reuse address is default true for datagram
acceptorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceManager().getDefaultExecutorService("MinaThreadPool", this)));
+ new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
if (minaLogger) {
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
Modified: camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java (original)
+++ camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java Fri Aug 12 15:32:50 2011
@@ -23,7 +23,6 @@ import com.googlecode.jsendnsca.core.Lev
import com.googlecode.jsendnsca.core.MessagePayload;
import com.googlecode.jsendnsca.core.NonBlockingNagiosPassiveCheckSender;
import org.apache.camel.Exchange;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.impl.DefaultProducer;
import static org.apache.camel.component.nagios.NagiosConstants.HOST_NAME;
@@ -68,7 +67,7 @@ public class NagiosProducer extends Defa
if (sender instanceof NonBlockingNagiosPassiveCheckSender) {
NonBlockingNagiosPassiveCheckSender nonBlocking = (NonBlockingNagiosPassiveCheckSender) sender;
ExecutorService executor = getEndpoint().getCamelContext().getExecutorServiceManager()
- .getExecutorService(ThreadPoolBuilder.singleThreadExecutor(getEndpoint().getEndpointUri()), this);
+ .newSingleThreadExecutor(this, getEndpoint().getEndpointUri());
nonBlocking.setExecutor(executor);
}
super.doStart();
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Fri Aug 12 15:32:50 2011
@@ -21,9 +21,7 @@ import java.util.concurrent.ExecutorServ
import org.apache.camel.CamelContext;
import org.apache.camel.Processor;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.spi.ThreadPoolProfile;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
@@ -145,10 +143,10 @@ public class NettyConsumer extends Defau
}
private void initializeTCPServerSocketCommunicationLayer() throws Exception {
- ThreadPoolProfile bossProfile = new ThreadPoolBuilder("NettyTCPBoss").poolSize(configuration.getCorePoolSize()).maxPoolSize(configuration.getMaxPoolSize()).build();
- ThreadPoolProfile workerProfile = new ThreadPoolBuilder("NettyTCPWorker").poolSize(configuration.getCorePoolSize()).maxPoolSize(configuration.getMaxPoolSize()).build();
- ExecutorService bossExecutor = context.getExecutorServiceManager().getExecutorService(bossProfile, this);
- ExecutorService workerExecutor = context.getExecutorServiceManager().getExecutorService(workerProfile, this);
+ ExecutorService bossExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+ ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
serverBootstrap = new ServerBootstrap(channelFactory);
@@ -169,8 +167,8 @@ public class NettyConsumer extends Defau
}
private void initializeUDPServerSocketCommunicationLayer() throws Exception {
- ThreadPoolProfile profile = new ThreadPoolBuilder("NettyUDPWorker").poolSize(configuration.getCorePoolSize()).maxPoolSize(configuration.getMaxPoolSize()).build();
- ExecutorService workerExecutor = context.getExecutorServiceManager().getExecutorService(profile, this);
+ ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Fri Aug 12 15:32:50 2011
@@ -26,12 +26,10 @@ import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.ServicePoolAware;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.converter.IOConverter;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.processor.CamelLogger;
-import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.ExchangeHelper;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
@@ -210,18 +208,18 @@ public class NettyProducer extends Defau
protected void setupTCPCommunication() throws Exception {
if (channelFactory == null) {
- ThreadPoolProfile bossProfile = new ThreadPoolBuilder("NettyTCPBoss").poolSize(configuration.getCorePoolSize()).maxPoolSize(configuration.getMaxPoolSize()).build();
- ThreadPoolProfile workerProfile = new ThreadPoolBuilder("NettyTCPWorker").poolSize(configuration.getCorePoolSize()).maxPoolSize(configuration.getMaxPoolSize()).build();
- ExecutorService bossExecutor = context.getExecutorServiceManager().getExecutorService(bossProfile, this);
- ExecutorService workerExecutor = context.getExecutorServiceManager().getExecutorService(workerProfile, this);
+ ExecutorService bossExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+ ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
}
}
protected void setupUDPCommunication() throws Exception {
if (datagramChannelFactory == null) {
- ThreadPoolProfile profile = new ThreadPoolBuilder("NettyUDPWorker").poolSize(configuration.getCorePoolSize()).maxPoolSize(configuration.getMaxPoolSize()).build();
- ExecutorService workerExecutor = context.getExecutorServiceManager().getExecutorService(profile, this);
+ ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
}
}
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1157157&r1=1157156&r2=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java Fri Aug 12 15:32:50 2011
@@ -26,7 +26,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.component.routebox.RouteboxConsumer;
import org.apache.camel.component.routebox.RouteboxServiceSupport;
import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
@@ -54,8 +53,7 @@ public class RouteboxSedaConsumer extend
// Create a URI link from the primary context to routes in the new inner context
int poolSize = getRouteboxEndpoint().getConfig().getThreads();
- setExecutor(getRouteboxEndpoint().getCamelContext().getExecutorServiceManager()
- .getExecutorService(ThreadPoolBuilder.fixedThreadExecutor(getRouteboxEndpoint().getEndpointUri(), poolSize), this));
+ setExecutor(getRouteboxEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getRouteboxEndpoint().getEndpointUri(), poolSize));
for (int i = 0; i < poolSize; i++) {
getExecutor().execute(this);
}
Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceManager.java (from r1156598, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceStrategy.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceManager.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceManager.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceStrategy.java&r1=1156598&r2=1157157&rev=1157157&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceStrategy.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceManager.java Fri Aug 12 15:32:50 2011
@@ -18,14 +18,13 @@ package org.apache.camel.spring.config;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultExecutorServiceManager;
-import org.apache.camel.impl.DefaultThreadPoolFactory;
/**
* @version
*/
-public class CustomExecutorServiceStrategy extends DefaultExecutorServiceManager {
+public class CustomExecutorServiceManager extends DefaultExecutorServiceManager {
- public CustomExecutorServiceStrategy(CamelContext camelContext) {
- super(camelContext, new DefaultThreadPoolFactory());
+ public CustomExecutorServiceManager(CamelContext camelContext) {
+ super(camelContext);
}
}
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CustomExecutorServiceManager.java
------------------------------------------------------------------------------
svn:keywords = Rev Date