You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/06 08:45:47 UTC
svn commit: r919711 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/component/seda/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/management/ camel-core...
Author: davsclaus
Date: Sat Mar 6 07:45:46 2010
New Revision: 919711
URL: http://svn.apache.org/viewvc?rev=919711&view=rev
Log:
CAMEL-1437: Added SPI ExecutorServiceStrategy to abstrct how to use thread pools with Camel.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelContextResolverHelper.java
camel/trunk/components/camel-spring/src/test/resources/log4j.properties
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Sat Mar 6 07:45:46 2010
@@ -29,6 +29,7 @@
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataFormatResolver;
import org.apache.camel.spi.EndpointStrategy;
+import org.apache.camel.spi.ExecutorServiceStrategy;
import org.apache.camel.spi.FactoryFinder;
import org.apache.camel.spi.FactoryFinderResolver;
import org.apache.camel.spi.InflightRepository;
@@ -704,4 +705,18 @@
*/
void setShutdownStrategy(ShutdownStrategy shutdownStrategy);
+ /**
+ * Gets the current {@link org.apache.camel.spi.ExecutorServiceStrategy}
+ *
+ * @return the strategy
+ */
+ ExecutorServiceStrategy getExecutorServiceStrategy();
+
+ /**
+ * Sets a custom {@link org.apache.camel.spi.ExecutorServiceStrategy}
+ *
+ * @param executorServiceStrategy the custom strategy
+ */
+ void setExecutorServiceStrategy(ExecutorServiceStrategy executorServiceStrategy);
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Sat Mar 6 07:45:46 2010
@@ -33,7 +33,6 @@
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -171,15 +170,17 @@
processors.add(consumer.getProcessor());
}
- ExecutorService multicastExecutor = ExecutorServiceHelper.newFixedThreadPool(size, endpoint.getEndpointUri() + "(multicast)", true);
- multicast = new MulticastProcessor(processors, null, true, multicastExecutor, false, false);
+ ExecutorService multicastExecutor = endpoint.getCamelContext().getExecutorServiceStrategy()
+ .newFixedThreadPool(endpoint.getEndpointUri() + "(multicast)", size);
+ multicast = new MulticastProcessor(endpoint.getCamelContext(), processors, null, true, multicastExecutor, false, false);
}
return multicast;
}
protected void doStart() throws Exception {
int poolSize = endpoint.getConcurrentConsumers();
- executor = ExecutorServiceHelper.newFixedThreadPool(poolSize, endpoint.getEndpointUri(), true);
+ executor = endpoint.getCamelContext().getExecutorServiceStrategy()
+ .newFixedThreadPool(endpoint.getEndpointUri(), poolSize);
for (int i = 0; i < poolSize; i++) {
executor.execute(this);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sat Mar 6 07:45:46 2010
@@ -71,6 +71,7 @@
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataFormatResolver;
import org.apache.camel.spi.EndpointStrategy;
+import org.apache.camel.spi.ExecutorServiceStrategy;
import org.apache.camel.spi.FactoryFinder;
import org.apache.camel.spi.FactoryFinderResolver;
import org.apache.camel.spi.InflightRepository;
@@ -154,9 +155,10 @@
private InflightRepository inflightRepository = new DefaultInflightRepository();
private final List<RouteStartupOrder> routeStartupOrder = new ArrayList<RouteStartupOrder>();
private int defaultRouteStartupOrder = 1000;
- private ShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy();
+ private ShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy(this);
private ShutdownRoute shutdownRoute = ShutdownRoute.Default;
private ShutdownRunningTask shutdownRunningTask = ShutdownRunningTask.CompleteCurrentTaskOnly;
+ private ExecutorServiceStrategy executorServiceStrategy = new DefaultExecutorServiceStrategy(this);
public DefaultCamelContext() {
super();
@@ -1524,6 +1526,14 @@
this.shutdownRunningTask = shutdownRunningTask;
}
+ public ExecutorServiceStrategy getExecutorServiceStrategy() {
+ return executorServiceStrategy;
+ }
+
+ public void setExecutorServiceStrategy(ExecutorServiceStrategy executorServiceStrategy) {
+ this.executorServiceStrategy = executorServiceStrategy;
+ }
+
protected String getEndpointKey(String uri, Endpoint endpoint) {
if (endpoint.isSingleton()) {
return uri;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Sat Mar 6 07:45:46 2010
@@ -33,7 +33,6 @@
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -196,7 +195,7 @@
*/
protected ScheduledExecutorService createScheduledExecutorService() {
String name = getClass().getSimpleName();
- return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, name, true);
+ return getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(name, DEFAULT_THREADPOOL_SIZE);
}
protected void doStart() throws Exception {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Sat Mar 6 07:45:46 2010
@@ -30,7 +30,6 @@
import org.apache.camel.ExchangePattern;
import org.apache.camel.PollingConsumer;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* A default endpoint useful for implementation inheritance
@@ -197,7 +196,7 @@
}
protected ScheduledExecutorService createScheduledExecutorService() {
- return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, getEndpointUri(), true);
+ return getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
}
public void configureProperties(Map<String, Object> options) {
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=919711&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Sat Mar 6 07:45:46 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExecutorServiceStrategy;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultExecutorServiceStrategy implements ExecutorServiceStrategy {
+
+ private final CamelContext camelContext;
+
+ public DefaultExecutorServiceStrategy(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ public ExecutorService lookup(String executorServiceRef) {
+ return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
+ }
+
+ public ExecutorService newCachedThreadPool(String name) {
+ return ExecutorServiceHelper.newCachedThreadPool(name, true);
+ }
+
+ public ScheduledExecutorService newScheduledThreadPool(String name, int poolSize) {
+ return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, true);
+ }
+
+ public ExecutorService newFixedThreadPool(String name, int poolSize) {
+ return ExecutorServiceHelper.newFixedThreadPool(poolSize, name, true);
+ }
+
+ public ExecutorService newSingleThreadExecutor(String name) {
+ return ExecutorServiceHelper.newSingleThreadExecutor(name, true);
+ }
+
+ public ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize) {
+ return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize);
+ }
+
+ public ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
+ return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
+ }
+
+ public void shutdown(ExecutorService executorService) {
+ executorService.shutdown();
+ }
+
+ public List<Runnable> shutdownNow(ExecutorService executorService) {
+ return executorService.shutdownNow();
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Sat Mar 6 07:45:46 2010
@@ -36,7 +36,6 @@
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* A client helper object (named like Spring's TransactionTemplate & JmsTemplate
@@ -54,7 +53,7 @@
public DefaultProducerTemplate(CamelContext context) {
this.context = context;
this.producerCache = new ProducerCache(context);
- this.executor = ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true);
+ this.executor = context.getExecutorServiceStrategy().newCachedThreadPool("ProducerTemplate");
}
public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
@@ -683,7 +682,7 @@
super.start();
ServiceHelper.startService(producerCache);
if (executor == null || executor.isShutdown()) {
- executor = ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true);
+ executor = context.getExecutorServiceStrategy().newCachedThreadPool("ProducerTemplate");
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Sat Mar 6 07:45:46 2010
@@ -145,7 +145,7 @@
public void commit() {
// now lets turn all of the event driven consumer processors into a single route
if (!eventDrivenProcessors.isEmpty()) {
- Processor processor = Pipeline.newInstance(eventDrivenProcessors);
+ Processor processor = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);
// and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
Processor unitOfWorkProcessor = new UnitOfWorkProcessor(this, processor);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Sat Mar 6 07:45:46 2010
@@ -25,6 +25,7 @@
import java.util.concurrent.TimeoutException;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Consumer;
import org.apache.camel.Route;
import org.apache.camel.ShutdownRoute;
@@ -36,7 +37,6 @@
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,14 +57,22 @@
*
* @version $Revision$
*/
-public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy {
+public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy, CamelContextAware {
private static final transient Log LOG = LogFactory.getLog(DefaultShutdownStrategy.class);
+ private CamelContext camelContext;
private ExecutorService executor;
private long timeout = 5 * 60;
private TimeUnit timeUnit = TimeUnit.SECONDS;
private boolean shutdownNowOnTimeout = true;
+ public DefaultShutdownStrategy() {
+ }
+
+ public DefaultShutdownStrategy(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
shutdown(context, routes, getTimeout(), getTimeUnit());
}
@@ -133,6 +141,14 @@
return shutdownNowOnTimeout;
}
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
/**
* Shutdown all the consumers immediately.
*
@@ -217,13 +233,14 @@
private ExecutorService getExecutorService() {
if (executor == null) {
- executor = ExecutorServiceHelper.newSingleThreadExecutor("ShutdownTask", true);
+ executor = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor("ShutdownTask");
}
return executor;
}
@Override
protected void doStart() throws Exception {
+ ObjectHelper.notNull(camelContext, "CamelContext must be set");
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sat Mar 6 07:45:46 2010
@@ -26,7 +26,6 @@
import org.apache.camel.SuspendableService;
import org.apache.camel.spi.PollingConsumerPollStrategy;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,7 +57,8 @@
if (service instanceof ScheduledExecutorService) {
scheduled = (ScheduledExecutorService) service;
} else {
- scheduled = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, getEndpoint().getEndpointUri(), true);
+ scheduled = endpoint.getCamelContext().getExecutorServiceStrategy()
+ .newScheduledThreadPool(getEndpoint().getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
}
this.executor = scheduled;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java Sat Mar 6 07:45:46 2010
@@ -393,6 +393,7 @@
if (executorService == null) {
// we only need a single for the JMX connector
+ // TODO use ExecutorServiceStrategy
executorService = ExecutorServiceHelper.newSingleThreadExecutor("JMXConnector: " + url, true);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Sat Mar 6 07:45:46 2010
@@ -148,7 +148,7 @@
Expression correlation = getExpression().createExpression(routeContext);
AggregationStrategy strategy = createAggregationStrategy(routeContext);
- AggregateProcessor answer = new AggregateProcessor(processor, correlation, strategy);
+ AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, correlation, strategy);
ExecutorService executor = createExecutorService(routeContext);
answer.setExecutorService(executor);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java Sat Mar 6 07:45:46 2010
@@ -92,7 +92,7 @@
List<Processor> list = new ArrayList<Processor>(2);
list.add(output);
list.add(interceptedTarget);
- return new Pipeline(list);
+ return new Pipeline(context, list);
} else {
return output;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Sat Mar 6 07:45:46 2010
@@ -153,8 +153,8 @@
}
}
- return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), executorService,
- isStreaming(), isStopOnException());
+ return new MulticastProcessor(routeContext.getCamelContext(), list, aggregationStrategy, isParallelProcessing(),
+ executorService, isStreaming(), isStopOnException());
}
public AggregationStrategy getAggregationStrategy() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Sat Mar 6 07:45:46 2010
@@ -103,7 +103,8 @@
}
}
- OnCompletionProcessor answer = new OnCompletionProcessor(childProcessor, onCompleteOnly, onFailureOnly, when);
+ OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor,
+ onCompleteOnly, onFailureOnly, when);
answer.setExecutorService(executorService);
return answer;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Sat Mar 6 07:45:46 2010
@@ -282,7 +282,7 @@
* to using a {@link Pipeline} but derived classes could change the behaviour
*/
protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
- return new Pipeline(list);
+ return new Pipeline(routeContext.getCamelContext(), list);
}
/**
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Sat Mar 6 07:45:46 2010
@@ -32,7 +32,6 @@
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <recipientList/> element
@@ -123,7 +122,7 @@
}
if (executorService == null) {
// fall back and use default
- executorService = ExecutorServiceHelper.newCachedThreadPool("RecipientList", true);
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("RecipientList");
}
return executorService;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Sat Mar 6 07:45:46 2010
@@ -31,7 +31,6 @@
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <split/> element
@@ -87,8 +86,9 @@
Processor childProcessor = routeContext.createProcessor(this);
aggregationStrategy = createAggregationStrategy(routeContext);
executorService = createExecutorService(routeContext);
- return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
- isParallelProcessing(), executorService, isStreaming(), isStopOnException());
+ Expression exp = getExpression().createExpression(routeContext);
+ return new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
+ isParallelProcessing(), executorService, isStreaming(), isStopOnException());
}
@@ -113,7 +113,7 @@
}
if (executorService == null) {
// fall back and use default
- executorService = ExecutorServiceHelper.newCachedThreadPool("Split", true);
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("Split");
}
return executorService;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Sat Mar 6 07:45:46 2010
@@ -31,7 +31,6 @@
import org.apache.camel.processor.ThreadsProcessor;
import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <threads/> element
@@ -73,11 +72,12 @@
String name = getThreadName() != null ? getThreadName() : "Threads";
if (poolSize == null || poolSize <= 0) {
// use the cached thread pool
- executorService = ExecutorServiceHelper.newCachedThreadPool(name, true);
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(name);
} else {
// use a custom pool based on the settings
int max = getMaxPoolSize() != null ? getMaxPoolSize() : poolSize;
- executorService = ExecutorServiceHelper.newThreadPool(name, poolSize, max, getKeepAliveTime(), getUnits(), true);
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
+ .newThreadPool(name, poolSize, max, getKeepAliveTime(), getUnits(), true);
}
}
Processor childProcessor = routeContext.createProcessor(this);
@@ -85,7 +85,7 @@
// wrap it in a unit of work so the route that comes next is also done in a unit of work
UnitOfWorkProcessor uow = new UnitOfWorkProcessor(routeContext, childProcessor);
- return new ThreadsProcessor(uow, executorService, waitForTaskToComplete);
+ return new ThreadsProcessor(routeContext.getCamelContext(), uow, executorService, waitForTaskToComplete);
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Sat Mar 6 07:45:46 2010
@@ -31,7 +31,6 @@
import org.apache.camel.processor.SendAsyncProcessor;
import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <to/> element
@@ -97,7 +96,8 @@
}
}
if (executorService == null && poolSize != null) {
- executorService = ExecutorServiceHelper.newThreadPool("ToAsync[" + getLabel() + "]", poolSize, poolSize);
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
+ .newThreadPool("ToAsync[" + getLabel() + "]", poolSize, poolSize);
}
// create the child processor which is the async route
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat Mar 6 07:45:46 2010
@@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -45,7 +46,6 @@
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.concurrent.AtomicExchange;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -99,6 +99,7 @@
}
}
+ private final CamelContext camelContext;
private Collection<Processor> processors;
private final AggregationStrategy aggregationStrategy;
private final boolean isParallelProcessing;
@@ -106,17 +107,19 @@
private final boolean stopOnException;
private ExecutorService executorService;
- public MulticastProcessor(Collection<Processor> processors) {
- this(processors, null);
+ public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) {
+ this(camelContext, processors, null);
}
- public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
- this(processors, aggregationStrategy, false, null, false, false);
+ public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
+ this(camelContext,processors, aggregationStrategy, false, null, false, false);
}
- public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy,
+ public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException) {
+ notNull(camelContext, "camelContext");
notNull(processors, "processors");
+ this.camelContext = camelContext;
this.processors = processors;
this.aggregationStrategy = aggregationStrategy;
this.isParallelProcessing = parallelProcessing;
@@ -125,7 +128,7 @@
this.stopOnException = stopOnException;
if (isParallelProcessing() && getExecutorService() == null) {
- this.executorService = ExecutorServiceHelper.newCachedThreadPool("Multicast", true);
+ this.executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool("Multicast");
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Sat Mar 6 07:45:46 2010
@@ -19,6 +19,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Predicate;
@@ -27,23 +28,28 @@
import org.apache.camel.impl.SynchronizationAdapter;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import static org.apache.camel.util.ObjectHelper.notNull;
+
/**
* @version $Revision$
*/
public class OnCompletionProcessor extends ServiceSupport implements Processor, Traceable {
private static final transient Log LOG = LogFactory.getLog(OnCompletionProcessor.class);
+ private final CamelContext camelContext;
+ private final Processor processor;
private ExecutorService executorService;
- private Processor processor;
private boolean onCompleteOnly;
private boolean onFailureOnly;
private Predicate onWhen;
- public OnCompletionProcessor(Processor processor, boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen) {
+ public OnCompletionProcessor(CamelContext camelContext, Processor processor, boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen) {
+ notNull(camelContext, "camelContext");
+ notNull(processor, "processor");
+ this.camelContext = camelContext;
// wrap processor in UnitOfWork so what we send out runs in a UoW
this.processor = new UnitOfWorkProcessor(processor);
this.onCompleteOnly = onCompleteOnly;
@@ -176,7 +182,7 @@
}
protected ExecutorService createExecutorService() {
- return ExecutorServiceHelper.newCachedThreadPool(this.toString(), true);
+ return camelContext.getExecutorServiceStrategy().newCachedThreadPool(this.toString());
}
public void setExecutorService(ExecutorService executorService) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Sat Mar 6 07:45:46 2010
@@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultExchange;
@@ -36,17 +37,17 @@
public class Pipeline extends MulticastProcessor implements Processor, Traceable {
private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
- public Pipeline(Collection<Processor> processors) {
- super(processors);
+ public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
+ super(camelContext, processors);
}
- public static Processor newInstance(List<Processor> processors) {
+ public static Processor newInstance(CamelContext camelContext, List<Processor> processors) {
if (processors.isEmpty()) {
return null;
} else if (processors.size() == 1) {
return processors.get(0);
}
- return new Pipeline(processors);
+ return new Pipeline(camelContext, processors);
}
public void process(Exchange exchange) throws Exception {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Sat Mar 6 07:45:46 2010
@@ -104,8 +104,8 @@
producers.put(endpoint, producer);
}
- MulticastProcessor mp = new MulticastProcessor(processors, getAggregationStrategy(), isParallelProcessing(),
- getExecutorService(), false, isStopOnException());
+ MulticastProcessor mp = new MulticastProcessor(exchange.getContext(), processors, getAggregationStrategy(),
+ isParallelProcessing(), getExecutorService(), false, isStopOnException());
// now let the multicast process the exchange
mp.process(exchange);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Sat Mar 6 07:45:46 2010
@@ -36,7 +36,6 @@
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* @version $Revision$
@@ -165,7 +164,8 @@
public ExecutorService getExecutorService() {
if (executorService == null) {
- executorService = ExecutorServiceHelper.newThreadPool("SendAsyncProcessor-Consumer", poolSize, poolSize);
+ executorService = destination.getCamelContext().getExecutorServiceStrategy()
+ .newThreadPool("SendAsyncProcessor-Consumer", poolSize, poolSize);
}
return executorService;
}
@@ -182,7 +182,8 @@
public ExecutorService getProducerExecutorService() {
if (producerExecutorService == null) {
// use a cached pool for the producers which can grow/schrink itself
- producerExecutorService = ExecutorServiceHelper.newCachedThreadPool("SendAsyncProcessor-Producer", true);
+ producerExecutorService = destination.getCamelContext().getExecutorServiceStrategy()
+ .newCachedThreadPool("SendAsyncProcessor-Producer");
}
return producerExecutorService;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Sat Mar 6 07:45:46 2010
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
@@ -44,13 +45,13 @@
public class Splitter extends MulticastProcessor implements Processor, Traceable {
private final Expression expression;
- public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
- this(expression, destination, aggregationStrategy, false, null, false, false);
+ public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
+ this(camelContext, expression, destination, aggregationStrategy, false, null, false, false);
}
- public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
+ public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException) {
- super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, streaming, stopOnException);
+ super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, streaming, stopOnException);
this.expression = expression;
notNull(expression, "expression");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Sat Mar 6 07:45:46 2010
@@ -20,11 +20,12 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.camel.util.ObjectHelper;
/**
* Threads processor that leverage a thread pool for processing exchanges.
@@ -39,11 +40,15 @@
*/
public class ThreadsProcessor extends DelegateProcessor implements Processor {
+ protected final CamelContext camelContext;
protected ExecutorService executorService;
protected WaitForTaskToComplete waitForTaskToComplete;
- public ThreadsProcessor(Processor output, ExecutorService executorService, WaitForTaskToComplete waitForTaskToComplete) {
+ public ThreadsProcessor(CamelContext camelContext, Processor output, ExecutorService executorService, WaitForTaskToComplete waitForTaskToComplete) {
super(output);
+ ObjectHelper.notNull(camelContext, "camelContext");
+ ObjectHelper.notNull(executorService, "executorService");
+ this.camelContext = camelContext;
this.executorService = executorService;
this.waitForTaskToComplete = waitForTaskToComplete;
}
@@ -98,15 +103,11 @@
public ExecutorService getExecutorService() {
if (executorService == null) {
- executorService = createExecutorService();
+ executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool("Threads");
}
return executorService;
}
- protected ExecutorService createExecutorService() {
- return ExecutorServiceHelper.newCachedThreadPool("Threads", true);
- }
-
protected void doStop() throws Exception {
super.doStop();
if (executorService != null) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Sat Mar 6 07:45:46 2010
@@ -29,7 +29,6 @@
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Processor for wire tapping exchanges to an endpoint destination.
@@ -161,7 +160,7 @@
}
protected ExecutorService createExecutorService() {
- return ExecutorServiceHelper.newCachedThreadPool(this.toString(), true);
+ return getDestination().getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this.toString());
}
public void setExecutorService(ExecutorService executorService) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sat Mar 6 07:45:46 2010
@@ -24,6 +24,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
@@ -41,7 +42,6 @@
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.TimeoutMap;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -66,6 +66,7 @@
private static final Log LOG = LogFactory.getLog(AggregateProcessor.class);
+ private final CamelContext camelContext;
private final Processor processor;
private final AggregationStrategy aggregationStrategy;
private final Expression correlationExpression;
@@ -90,10 +91,13 @@
private boolean completionFromBatchConsumer;
private AtomicInteger batchConsumerCounter = new AtomicInteger();
- public AggregateProcessor(Processor processor, Expression correlationExpression, AggregationStrategy aggregationStrategy) {
+ public AggregateProcessor(CamelContext camelContext, Processor processor,
+ Expression correlationExpression, AggregationStrategy aggregationStrategy) {
+ ObjectHelper.notNull(camelContext, "camelContext");
ObjectHelper.notNull(processor, "processor");
ObjectHelper.notNull(correlationExpression, "correlationExpression");
ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
+ this.camelContext = camelContext;
this.processor = processor;
this.correlationExpression = correlationExpression;
this.aggregationStrategy = aggregationStrategy;
@@ -471,16 +475,16 @@
if (executorService == null) {
if (isParallelProcessing()) {
// we are running in parallel so create a cached thread pool which grows/shrinks automatic
- executorService = ExecutorServiceHelper.newCachedThreadPool("Aggregator", true);
+ executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool("Aggregator");
} else {
// use a single threaded if we are not running in parallel
- executorService = ExecutorServiceHelper.newSingleThreadExecutor("Aggregator", true);
+ executorService = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor("Aggregator");
}
}
// start timeout service if its in use
if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
- ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateTimeoutChecker", true);
+ ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool("AggregateTimeoutChecker", 1);
// check for timed out aggregated messages once every second
timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
ServiceHelper.startService(timeoutMap);
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=919711&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Sat Mar 6 07:45:46 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @version $Revision$
+ */
+public interface ExecutorServiceStrategy {
+
+ ExecutorService lookup(String executorServiceRef);
+
+ ExecutorService newCachedThreadPool(String name);
+
+ ScheduledExecutorService newScheduledThreadPool(String name, int poolSize);
+
+ ExecutorService newFixedThreadPool(String name, int poolSize);
+
+ ExecutorService newSingleThreadExecutor(String name);
+
+ ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize);
+
+ ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize,
+ long keepAliveTime, TimeUnit timeUnit, boolean daemon);
+
+ void shutdown(ExecutorService executorService);
+
+ List<Runnable> shutdownNow(ExecutorService executorService);
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Sat Mar 6 07:45:46 2010
@@ -30,7 +30,9 @@
* create thread names with Camel prefix.
*
* @version $Revision$
+ * @deprecated replaced with {@link org.apache.camel.spi.ExecutorServiceStrategy}
*/
+@Deprecated
public final class ExecutorServiceHelper {
private static AtomicInteger threadCounter = new AtomicInteger();
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Sat Mar 6 07:45:46 2010
@@ -52,7 +52,7 @@
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
ap.setCompletionPredicate(complete);
ap.setEagerCheckCompletion(false);
ap.start();
@@ -93,7 +93,7 @@
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().isEqualTo("END");
- AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
ap.setCompletionPredicate(complete);
ap.setEagerCheckCompletion(true);
ap.start();
@@ -141,7 +141,7 @@
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
ap.setCompletionSize(3);
ap.setEagerCheckCompletion(eager);
ap.start();
@@ -189,7 +189,7 @@
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
ap.setCompletionTimeout(3000);
ap.setEagerCheckCompletion(eager);
ap.start();
@@ -235,7 +235,7 @@
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
ap.setCompletionPredicate(complete);
ap.setIgnoreBadCorrelationKeys(true);
@@ -275,7 +275,7 @@
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
ap.setCompletionPredicate(complete);
ap.start();
@@ -322,7 +322,7 @@
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
ap.setCompletionPredicate(complete);
ap.setCloseCorrelationKeyOnCompletion(1000);
@@ -369,7 +369,7 @@
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
ap.setCompletionSize(100);
ap.setCompletionFromBatchConsumer(true);
@@ -464,7 +464,7 @@
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
ap.setEagerCheckCompletion(true);
ap.setCompletionPredicate(body().isEqualTo("END"));
if (handler != null) {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java Sat Mar 6 07:45:46 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor.enricher;
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
@@ -39,8 +40,14 @@
return jndi;
}
- public void testPollEnrichRef() throws Exception {
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
cool.setCamelContext(context);
+ return context;
+ }
+
+ public void testPollEnrichRef() throws Exception {
Exchange exchange = new DefaultExchange(context);
exchange.getIn().setBody("Bye World");
cool.getQueue().add(exchange);
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Sat Mar 6 07:45:46 2010
@@ -28,7 +28,6 @@
import org.apache.camel.ExchangePattern;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
@@ -165,8 +164,10 @@
List<IoFilter> filters = configuration.getFilters();
final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
- IoAcceptor acceptor = new SocketAcceptor(processorCount, ExecutorServiceHelper.newCachedThreadPool("MinaSocketAcceptor", true));
- IoConnector connector = new SocketConnector(processorCount, ExecutorServiceHelper.newCachedThreadPool("MinaSocketConnector", true));
+ IoAcceptor acceptor = new SocketAcceptor(processorCount,
+ getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaSocketAcceptor"));
+ IoConnector connector = new SocketConnector(processorCount,
+ getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaSocketConnector"));
SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
// connector config
@@ -174,7 +175,8 @@
// must use manual thread model according to Mina documentation
connectorConfig.setThreadModel(ThreadModel.MANUAL);
configureCodecFactory("MinaProducer", connectorConfig, configuration);
- connectorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(ExecutorServiceHelper.newCachedThreadPool("MinaThreadPool", true)));
+ connectorConfig.getFilterChain().addLast("threadPool",
+ new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
if (minaLogger) {
connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -190,7 +192,8 @@
configureCodecFactory("MinaConsumer", acceptorConfig, configuration);
acceptorConfig.setReuseAddress(true);
acceptorConfig.setDisconnectOnUnbind(true);
- acceptorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(ExecutorServiceHelper.newCachedThreadPool("MinaThreadPool", true)));
+ acceptorConfig.getFilterChain().addLast("threadPool",
+ new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
if (minaLogger) {
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -258,8 +261,8 @@
boolean sync = configuration.isSync();
List<IoFilter> filters = configuration.getFilters();
- IoAcceptor acceptor = new DatagramAcceptor(ExecutorServiceHelper.newCachedThreadPool("MinaDatagramAcceptor", true));
- IoConnector connector = new DatagramConnector(ExecutorServiceHelper.newCachedThreadPool("MinaDatagramConnector", true));
+ IoAcceptor acceptor = new DatagramAcceptor(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaDatagramAcceptor"));
+ IoConnector connector = new DatagramConnector(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaDatagramConnector"));
SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
if (transferExchange) {
@@ -270,7 +273,8 @@
// must use manual thread model according to Mina documentation
connectorConfig.setThreadModel(ThreadModel.MANUAL);
configureDataGramCodecFactory("MinaProducer", connectorConfig, configuration);
- connectorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(ExecutorServiceHelper.newCachedThreadPool("MinaThreadPool", true)));
+ connectorConfig.getFilterChain().addLast("threadPool",
+ new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
if (minaLogger) {
connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -284,7 +288,8 @@
configureDataGramCodecFactory("MinaConsumer", acceptorConfig, configuration);
acceptorConfig.setDisconnectOnUnbind(true);
// reuse address is default true for datagram
- acceptorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(ExecutorServiceHelper.newCachedThreadPool("MinaThreadPool", true)));
+ acceptorConfig.getFilterChain().addLast("threadPool",
+ new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
if (minaLogger) {
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
Modified: camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java (original)
+++ camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java Sat Mar 6 07:45:46 2010
@@ -16,13 +16,14 @@
*/
package org.apache.camel.component.nagios;
+import java.util.concurrent.ExecutorService;
+
import com.googlecode.jsendnsca.core.INagiosPassiveCheckSender;
import com.googlecode.jsendnsca.core.Level;
import com.googlecode.jsendnsca.core.MessagePayload;
import com.googlecode.jsendnsca.core.NonBlockingNagiosPassiveCheckSender;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import static org.apache.camel.component.nagios.NagiosConstants.HOST_NAME;
import static org.apache.camel.component.nagios.NagiosConstants.LEVEL;
@@ -67,7 +68,9 @@
// if non blocking then set a executor service on it
if (sender instanceof NonBlockingNagiosPassiveCheckSender) {
NonBlockingNagiosPassiveCheckSender nonBlocking = (NonBlockingNagiosPassiveCheckSender) sender;
- nonBlocking.setExecutor(ExecutorServiceHelper.newSingleThreadExecutor(getEndpoint().getEndpointUri(), true));
+ ExecutorService executor = getEndpoint().getCamelContext().getExecutorServiceStrategy()
+ .newSingleThreadExecutor(getEndpoint().getEndpointUri());
+ nonBlocking.setExecutor(executor);
}
super.doStart();
}
Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java Sat Mar 6 07:45:46 2010
@@ -22,12 +22,19 @@
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.builder.xml.TimeUnitAdapter;
import org.apache.camel.model.IdentifiedType;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.camel.spring.util.CamelContextResolverHelper;
import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+import static org.apache.camel.util.ObjectHelper.notNull;
/**
* A {@link org.springframework.beans.factory.FactoryBean} which instantiates {@link java.util.concurrent.ExecutorService} objects
@@ -36,7 +43,7 @@
*/
@XmlRootElement(name = "threadPool")
@XmlAccessorType(XmlAccessType.FIELD)
-public class CamelExecutorServiceFactoryBean extends IdentifiedType implements FactoryBean {
+public class CamelExecutorServiceFactoryBean extends IdentifiedType implements FactoryBean, CamelContextAware, ApplicationContextAware {
@XmlAttribute(required = false)
private Integer poolSize;
@@ -51,18 +58,30 @@
private String threadName;
@XmlAttribute
private Boolean deamon = Boolean.TRUE;
+ @XmlAttribute
+ private String camelContextId;
+ @XmlTransient
+ private CamelContext camelContext;
+ @XmlTransient
+ private ApplicationContext applicationContext;
public Object getObject() throws Exception {
+ if (camelContext == null && camelContextId != null) {
+ camelContext = CamelContextResolverHelper.getCamelContextWithId(applicationContext, camelContextId);
+ }
+ notNull(camelContext, "camelContext");
+
String name = getThreadName() != null ? getThreadName() : getId();
ExecutorService answer;
if (getPoolSize() == null || getPoolSize() <= 0) {
// use the cached thread pool
- answer = ExecutorServiceHelper.newCachedThreadPool(name, isDeamon());
+ answer = camelContext.getExecutorServiceStrategy().newCachedThreadPool(name);
} else {
// use a custom pool based on the settings
int max = getMaxPoolSize() != null ? getMaxPoolSize() : getPoolSize();
- answer = ExecutorServiceHelper.newThreadPool(name, getPoolSize(), max, getKeepAliveTime(), getUnits(), isDeamon());
+ answer = camelContext.getExecutorServiceStrategy()
+ .newThreadPool(name, getPoolSize(), max, getKeepAliveTime(), getUnits(), isDeamon());
}
return answer;
}
@@ -122,4 +141,28 @@
public void setDeamon(Boolean deamon) {
this.deamon = deamon;
}
+
+ public String getCamelContextId() {
+ return camelContextId;
+ }
+
+ public void setCamelContextId(String camelContextId) {
+ this.camelContextId = camelContextId;
+ }
+
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ public ApplicationContext getApplicationContext() {
+ return applicationContext;
+ }
+
+ public void setApplicationContext(ApplicationContext applicationContext) {
+ this.applicationContext = applicationContext;
+ }
}
\ No newline at end of file
Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelContextResolverHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelContextResolverHelper.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelContextResolverHelper.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelContextResolverHelper.java Sat Mar 6 07:45:46 2010
@@ -19,18 +19,20 @@
import org.apache.camel.CamelContext;
import org.springframework.context.ApplicationContext;
+/**
+ * Helper to resolve {@link CamelContext} from the Spring {@link org.springframework.context.ApplicationContext}.
+ */
public final class CamelContextResolverHelper {
+
private CamelContextResolverHelper() {
// The helper class
}
public static CamelContext getCamelContextWithId(ApplicationContext context, String contextId) {
- CamelContext answer;
try {
- answer = (CamelContext) context.getBean(contextId);
- return answer;
+ return (CamelContext) context.getBean(contextId);
} catch (Exception e) {
- throw new IllegalArgumentException("Can't find the CamelContext with id " + contextId + ", the cause : ", e);
+ throw new IllegalArgumentException("Cannot find the CamelContext with id " + contextId, e);
}
}
Modified: camel/trunk/components/camel-spring/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/log4j.properties?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-spring/src/test/resources/log4j.properties Sat Mar 6 07:45:46 2010
@@ -18,7 +18,7 @@
#
# The logging properties used for eclipse testing, We want to see debug output on the console.
#
-log4j.rootLogger=INFO, out
+log4j.rootLogger=INFO, file
log4j.logger.org.springframework=WARN
log4j.logger.org.apache.camel.impl.converter=WARN
Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Sat Mar 6 07:45:46 2010
@@ -35,7 +35,6 @@
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -73,7 +72,7 @@
inputStream = resolveStreamFromUrl();
}
- executor = ExecutorServiceHelper.newSingleThreadExecutor(endpoint.getEndpointUri(), true);
+ executor = endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(endpoint.getEndpointUri());
executor.execute(this);
}