You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/12 09:57:30 UTC
svn commit: r922185 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/seda/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/management/
camel-core/src/main/java/org/apache/camel/model/ came...
Author: davsclaus
Date: Fri Mar 12 08:57:30 2010
New Revision: 922185
URL: http://svn.apache.org/viewvc?rev=922185&view=rev
Log:
CAMEL-1588: Provide source info to the executor service strategy so that in the future we can have pattern-based configuration.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Fri Mar 12 08:57:30 2010
@@ -171,7 +171,7 @@ public class SedaConsumer extends Servic
}
ExecutorService multicastExecutor = endpoint.getCamelContext().getExecutorServiceStrategy()
- .newFixedThreadPool(endpoint.getEndpointUri() + "(multicast)", size);
+ .newFixedThreadPool(this, endpoint.getEndpointUri() + "(multicast)", size);
multicast = new MulticastProcessor(endpoint.getCamelContext(), processors, null, true, multicastExecutor, false, false);
}
return multicast;
@@ -180,7 +180,7 @@ public class SedaConsumer extends Servic
protected void doStart() throws Exception {
int poolSize = endpoint.getConcurrentConsumers();
executor = endpoint.getCamelContext().getExecutorServiceStrategy()
- .newFixedThreadPool(endpoint.getEndpointUri(), poolSize);
+ .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize);
for (int i = 0; i < poolSize; i++) {
executor.execute(this);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Fri Mar 12 08:57:30 2010
@@ -195,7 +195,7 @@ public abstract class DefaultComponent e
*/
protected ScheduledExecutorService createScheduledExecutorService() {
String name = getClass().getSimpleName();
- return getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(name, DEFAULT_THREADPOOL_SIZE);
+ return getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, name, DEFAULT_THREADPOOL_SIZE);
}
protected void doStart() throws Exception {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Fri Mar 12 08:57:30 2010
@@ -196,7 +196,7 @@ public abstract class DefaultEndpoint im
}
protected ScheduledExecutorService createScheduledExecutorService() {
- return getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
+ return getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
}
public void configureProperties(Map<String, Object> options) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Fri Mar 12 08:57:30 2010
@@ -49,31 +49,31 @@ public class DefaultExecutorServiceStrat
this.threadNamePattern = threadNamePattern;
}
- public ExecutorService lookup(String executorServiceRef) {
+ public ExecutorService lookup(Object source, String executorServiceRef) {
return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
}
- public ExecutorService newCachedThreadPool(String name) {
+ public ExecutorService newCachedThreadPool(Object source, String name) {
return ExecutorServiceHelper.newCachedThreadPool(getThreadName(name), true);
}
- public ScheduledExecutorService newScheduledThreadPool(String name, int poolSize) {
+ public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
return ExecutorServiceHelper.newScheduledThreadPool(poolSize, getThreadName(name), true);
}
- public ExecutorService newFixedThreadPool(String name, int poolSize) {
+ public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
return ExecutorServiceHelper.newFixedThreadPool(poolSize, getThreadName(name), true);
}
- public ExecutorService newSingleThreadExecutor(String name) {
+ public ExecutorService newSingleThreadExecutor(Object source, String name) {
return ExecutorServiceHelper.newSingleThreadExecutor(getThreadName(name), true);
}
- public ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize) {
+ public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize) {
return ExecutorServiceHelper.newThreadPool(getThreadName(name), corePoolSize, maxPoolSize);
}
- public ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
+ public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
return ExecutorServiceHelper.newThreadPool(getThreadName(name), corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Fri Mar 12 08:57:30 2010
@@ -53,7 +53,7 @@ public class DefaultProducerTemplate ext
public DefaultProducerTemplate(CamelContext context) {
this.context = context;
this.producerCache = new ProducerCache(context);
- this.executor = context.getExecutorServiceStrategy().newCachedThreadPool("ProducerTemplate");
+ this.executor = context.getExecutorServiceStrategy().newCachedThreadPool(this, "ProducerTemplate");
}
public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
@@ -682,7 +682,7 @@ public class DefaultProducerTemplate ext
super.start();
ServiceHelper.startService(producerCache);
if (executor == null || executor.isShutdown()) {
- executor = context.getExecutorServiceStrategy().newCachedThreadPool("ProducerTemplate");
+ executor = context.getExecutorServiceStrategy().newCachedThreadPool(this, "ProducerTemplate");
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Fri Mar 12 08:57:30 2010
@@ -233,7 +233,7 @@ public class DefaultShutdownStrategy ext
private ExecutorService getExecutorService() {
if (executor == null) {
- executor = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor("ShutdownTask");
+ executor = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor(this, "ShutdownTask");
}
return executor;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Fri Mar 12 08:57:30 2010
@@ -58,7 +58,7 @@ public abstract class ScheduledPollConsu
scheduled = (ScheduledExecutorService) service;
} else {
scheduled = endpoint.getCamelContext().getExecutorServiceStrategy()
- .newScheduledThreadPool(getEndpoint().getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
+ .newScheduledThreadPool(this, getEndpoint().getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
}
this.executor = scheduled;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java Fri Mar 12 08:57:30 2010
@@ -412,7 +412,7 @@ public class DefaultManagementAgent exte
if (executorService == null) {
// we only need a single for the JMX connector
- executorService = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor("JMXConnector: " + url);
+ executorService = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor(this, "JMXConnector: " + url);
}
// execute the JMX connector
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Fri Mar 12 08:57:30 2010
@@ -101,7 +101,7 @@ public class RecipientListDefinition ext
executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
if (executorService == null) {
// fallback to create a new executor
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("RecipientList");
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "RecipientList");
}
answer.setExecutorService(executorService);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Fri Mar 12 08:57:30 2010
@@ -91,7 +91,7 @@ public class SplitDefinition extends Exp
executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
if (executorService == null) {
// fallback to create a new executor
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("Split");
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "Split");
}
Expression exp = getExpression().createExpression(routeContext);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri Mar 12 08:57:30 2010
@@ -69,12 +69,12 @@ public class ThreadsDefinition extends O
String name = getThreadName() != null ? getThreadName() : "Threads";
if (poolSize == null || poolSize <= 0) {
// use the cached thread pool
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(name);
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, name);
} else {
// use a custom pool based on the settings
int max = getMaxPoolSize() != null ? getMaxPoolSize() : poolSize;
executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
- .newThreadPool(name, poolSize, max, getKeepAliveTime(), getUnits(), true);
+ .newThreadPool(this, name, poolSize, max, getKeepAliveTime(), getUnits(), true);
}
}
Processor childProcessor = routeContext.createProcessor(this);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Fri Mar 12 08:57:30 2010
@@ -94,7 +94,7 @@ public class ToDefinition extends SendDe
if (executorService == null && poolSize != null) {
// crete a new based on the other options
executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
- .newThreadPool("ToAsync[" + getLabel() + "]", poolSize, poolSize);
+ .newThreadPool(this, "ToAsync[" + getLabel() + "]", poolSize, poolSize);
}
// create the child processor which is the async route
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Fri Mar 12 08:57:30 2010
@@ -128,7 +128,7 @@ public class MulticastProcessor extends
this.stopOnException = stopOnException;
if (isParallelProcessing() && getExecutorService() == null) {
- this.executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool("Multicast");
+ this.executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, "Multicast");
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Fri Mar 12 08:57:30 2010
@@ -182,7 +182,7 @@ public class OnCompletionProcessor exten
}
protected ExecutorService createExecutorService() {
- return camelContext.getExecutorServiceStrategy().newCachedThreadPool(this.toString());
+ return camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, this.toString());
}
public void setExecutorService(ExecutorService executorService) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Fri Mar 12 08:57:30 2010
@@ -165,7 +165,7 @@ public class SendAsyncProcessor extends
public ExecutorService getExecutorService() {
if (executorService == null) {
executorService = destination.getCamelContext().getExecutorServiceStrategy()
- .newThreadPool("SendAsyncProcessor-Consumer", poolSize, poolSize);
+ .newThreadPool(this, "SendAsyncProcessor-Consumer", poolSize, poolSize);
}
return executorService;
}
@@ -183,7 +183,7 @@ public class SendAsyncProcessor extends
if (producerExecutorService == null) {
// use a cached pool for the producers which can grow/schrink itself
producerExecutorService = destination.getCamelContext().getExecutorServiceStrategy()
- .newCachedThreadPool("SendAsyncProcessor-Producer");
+ .newCachedThreadPool(this, "SendAsyncProcessor-Producer");
}
return producerExecutorService;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Fri Mar 12 08:57:30 2010
@@ -103,7 +103,7 @@ public class ThreadsProcessor extends De
public ExecutorService getExecutorService() {
if (executorService == null) {
- executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool("Threads");
+ executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, "Threads");
}
return executorService;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Fri Mar 12 08:57:30 2010
@@ -160,7 +160,7 @@ public class WireTapProcessor extends Se
}
protected ExecutorService createExecutorService() {
- return getDestination().getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this.toString());
+ return getDestination().getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, this.toString());
}
public void setExecutorService(ExecutorService executorService) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Mar 12 08:57:30 2010
@@ -475,16 +475,16 @@ public class AggregateProcessor extends
if (executorService == null) {
if (isParallelProcessing()) {
// we are running in parallel so create a cached thread pool which grows/shrinks automatic
- executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool("Aggregator");
+ executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, "Aggregator");
} else {
// use a single threaded if we are not running in parallel
- executorService = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor("Aggregator");
+ executorService = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor(this, "Aggregator");
}
}
// start timeout service if its in use
if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
- ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool("AggregateTimeoutChecker", 1);
+ ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
// check for timed out aggregated messages once every second
timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
ServiceHelper.startService(timeoutMap);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Fri Mar 12 08:57:30 2010
@@ -67,60 +67,67 @@ public interface ExecutorServiceStrategy
/**
* Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}.
*
- * @param executorServiceRef reference to lookup
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param executorServiceRef reference to lookup
* @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt> if not found
*/
- ExecutorService lookup(String executorServiceRef);
+ ExecutorService lookup(Object source, String executorServiceRef);
/**
* Creates a new cached thread pool.
*
- * @param name name which is appended to the thread name
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
* @return the thread pool
*/
- ExecutorService newCachedThreadPool(String name);
+ ExecutorService newCachedThreadPool(Object source, String name);
/**
* Creates a new scheduled thread pool.
*
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
* @param poolSize the core pool size
* @return the thread pool
*/
- ScheduledExecutorService newScheduledThreadPool(String name, int poolSize);
+ ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize);
/**
* Creates a new fixed thread pool.
*
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
* @param poolSize the core pool size
* @return the thread pool
*/
- ExecutorService newFixedThreadPool(String name, int poolSize);
+ ExecutorService newFixedThreadPool(Object source, String name, int poolSize);
/**
* Creates a new single-threaded thread pool. This is often used for background threads.
*
- * @param name name which is appended to the thread name
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
* @return the thread pool
*/
- ExecutorService newSingleThreadExecutor(String name);
+ ExecutorService newSingleThreadExecutor(Object source, String name);
/**
* Creates a new custom thread pool.
* <p/>
* Will by default use 60 seconds for keep alive time for idle threads.
*
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
* @param corePoolSize the core pool size
* @param maxPoolSize the maximum pool size
* @return the thread pool
*/
- ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize);
+ ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize);
/**
* Creates a new custom thread pool.
*
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
* @param corePoolSize the core pool size
* @param maxPoolSize the maximum pool size
@@ -129,7 +136,7 @@ public interface ExecutorServiceStrategy
* @param daemon whether or not the created threads is daemon or not
* @return the thread pool
*/
- ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize,
+ ExecutorService newThreadPool(Object source, final String name, int corePoolSize, int maxPoolSize,
long keepAliveTime, TimeUnit timeUnit, boolean daemon);
/**
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Mar 12 08:57:30 2010
@@ -194,7 +194,7 @@ public final class ExecutorServiceHelper
if (definition.getExecutorService() != null) {
return definition.getExecutorService();
} else if (definition.getExecutorServiceRef() != null) {
- ExecutorService answer = strategy.lookup(definition.getExecutorServiceRef());
+ ExecutorService answer = strategy.lookup(definition, definition.getExecutorServiceRef());
if (answer == null) {
throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry.");
}
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Fri Mar 12 08:57:30 2010
@@ -165,9 +165,9 @@ public class MinaComponent extends Defau
final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
IoAcceptor acceptor = new SocketAcceptor(processorCount,
- getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaSocketAcceptor"));
+ getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "MinaSocketAcceptor"));
IoConnector connector = new SocketConnector(processorCount,
- getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaSocketConnector"));
+ getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "MinaSocketConnector"));
SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
// connector config
@@ -176,7 +176,7 @@ public class MinaComponent extends Defau
connectorConfig.setThreadModel(ThreadModel.MANUAL);
configureCodecFactory("MinaProducer", connectorConfig, configuration);
connectorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
+ new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "MinaThreadPool")));
if (minaLogger) {
connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -193,7 +193,7 @@ public class MinaComponent extends Defau
acceptorConfig.setReuseAddress(true);
acceptorConfig.setDisconnectOnUnbind(true);
acceptorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
+ new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "MinaThreadPool")));
if (minaLogger) {
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -261,8 +261,8 @@ public class MinaComponent extends Defau
boolean sync = configuration.isSync();
List<IoFilter> filters = configuration.getFilters();
- IoAcceptor acceptor = new DatagramAcceptor(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaDatagramAcceptor"));
- IoConnector connector = new DatagramConnector(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaDatagramConnector"));
+ IoAcceptor acceptor = new DatagramAcceptor(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "MinaDatagramAcceptor"));
+ IoConnector connector = new DatagramConnector(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "MinaDatagramConnector"));
SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
if (transferExchange) {
@@ -274,7 +274,7 @@ public class MinaComponent extends Defau
connectorConfig.setThreadModel(ThreadModel.MANUAL);
configureDataGramCodecFactory("MinaProducer", connectorConfig, configuration);
connectorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
+ new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "MinaThreadPool")));
if (minaLogger) {
connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -289,7 +289,7 @@ public class MinaComponent extends Defau
acceptorConfig.setDisconnectOnUnbind(true);
// reuse address is default true for datagram
acceptorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
+ new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "MinaThreadPool")));
if (minaLogger) {
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
Modified: camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java (original)
+++ camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java Fri Mar 12 08:57:30 2010
@@ -69,7 +69,7 @@ public class NagiosProducer extends Defa
if (sender instanceof NonBlockingNagiosPassiveCheckSender) {
NonBlockingNagiosPassiveCheckSender nonBlocking = (NonBlockingNagiosPassiveCheckSender) sender;
ExecutorService executor = getEndpoint().getCamelContext().getExecutorServiceStrategy()
- .newSingleThreadExecutor(getEndpoint().getEndpointUri());
+ .newSingleThreadExecutor(this, getEndpoint().getEndpointUri());
nonBlocking.setExecutor(executor);
}
super.doStart();
Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java Fri Mar 12 08:57:30 2010
@@ -76,12 +76,12 @@ public class CamelExecutorServiceFactory
ExecutorService answer;
if (getPoolSize() == null || getPoolSize() <= 0) {
// use the cached thread pool
- answer = camelContext.getExecutorServiceStrategy().newCachedThreadPool(name);
+ answer = camelContext.getExecutorServiceStrategy().newCachedThreadPool(getId(), name);
} else {
// use a custom pool based on the settings
int max = getMaxPoolSize() != null ? getMaxPoolSize() : getPoolSize();
answer = camelContext.getExecutorServiceStrategy()
- .newThreadPool(name, getPoolSize(), max, getKeepAliveTime(), getUnits(), isDeamon());
+ .newThreadPool(getId(), name, getPoolSize(), max, getKeepAliveTime(), getUnits(), isDeamon());
}
return answer;
}
Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=922185&r1=922184&r2=922185&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Fri Mar 12 08:57:30 2010
@@ -72,7 +72,7 @@ public class StreamConsumer extends Defa
inputStream = resolveStreamFromUrl();
}
- executor = endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(endpoint.getEndpointUri());
+ executor = endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this, endpoint.getEndpointUri());
executor.execute(this);
}