You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/06 08:45:47 UTC

svn commit: r919711 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/management/ camel-core...

Author: davsclaus
Date: Sat Mar  6 07:45:46 2010
New Revision: 919711

URL: http://svn.apache.org/viewvc?rev=919711&view=rev
Log:
CAMEL-1437: Added SPI ExecutorServiceStrategy to abstrct how to use thread pools with Camel.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelContextResolverHelper.java
    camel/trunk/components/camel-spring/src/test/resources/log4j.properties
    camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Sat Mar  6 07:45:46 2010
@@ -29,6 +29,7 @@
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.DataFormatResolver;
 import org.apache.camel.spi.EndpointStrategy;
+import org.apache.camel.spi.ExecutorServiceStrategy;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
 import org.apache.camel.spi.InflightRepository;
@@ -704,4 +705,18 @@
      */
     void setShutdownStrategy(ShutdownStrategy shutdownStrategy);
 
+    /**
+     * Gets the current {@link org.apache.camel.spi.ExecutorServiceStrategy}
+     *
+     * @return the strategy
+     */
+    ExecutorServiceStrategy getExecutorServiceStrategy();
+
+    /**
+     * Sets a custom {@link org.apache.camel.spi.ExecutorServiceStrategy}
+     *
+     * @param executorServiceStrategy the custom strategy
+     */
+    void setExecutorServiceStrategy(ExecutorServiceStrategy executorServiceStrategy);
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Sat Mar  6 07:45:46 2010
@@ -33,7 +33,6 @@
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -171,15 +170,17 @@
                 processors.add(consumer.getProcessor());
             }
 
-            ExecutorService multicastExecutor = ExecutorServiceHelper.newFixedThreadPool(size, endpoint.getEndpointUri() + "(multicast)", true);
-            multicast = new MulticastProcessor(processors, null, true, multicastExecutor, false, false);
+            ExecutorService multicastExecutor = endpoint.getCamelContext().getExecutorServiceStrategy()
+                                                    .newFixedThreadPool(endpoint.getEndpointUri() + "(multicast)", size);
+            multicast = new MulticastProcessor(endpoint.getCamelContext(), processors, null, true, multicastExecutor, false, false);
         }
         return multicast;
     }
 
     protected void doStart() throws Exception {
         int poolSize = endpoint.getConcurrentConsumers();
-        executor = ExecutorServiceHelper.newFixedThreadPool(poolSize, endpoint.getEndpointUri(), true);
+        executor = endpoint.getCamelContext().getExecutorServiceStrategy()
+                        .newFixedThreadPool(endpoint.getEndpointUri(), poolSize);
         for (int i = 0; i < poolSize; i++) {
             executor.execute(this);
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sat Mar  6 07:45:46 2010
@@ -71,6 +71,7 @@
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.DataFormatResolver;
 import org.apache.camel.spi.EndpointStrategy;
+import org.apache.camel.spi.ExecutorServiceStrategy;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
 import org.apache.camel.spi.InflightRepository;
@@ -154,9 +155,10 @@
     private InflightRepository inflightRepository = new DefaultInflightRepository();
     private final List<RouteStartupOrder> routeStartupOrder = new ArrayList<RouteStartupOrder>();
     private int defaultRouteStartupOrder = 1000;
-    private ShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy();
+    private ShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy(this);
     private ShutdownRoute shutdownRoute = ShutdownRoute.Default;
     private ShutdownRunningTask shutdownRunningTask = ShutdownRunningTask.CompleteCurrentTaskOnly;
+    private ExecutorServiceStrategy executorServiceStrategy = new DefaultExecutorServiceStrategy(this);
 
     public DefaultCamelContext() {
         super();
@@ -1524,6 +1526,14 @@
         this.shutdownRunningTask = shutdownRunningTask;
     }
 
+    public ExecutorServiceStrategy getExecutorServiceStrategy() {
+        return executorServiceStrategy;
+    }
+
+    public void setExecutorServiceStrategy(ExecutorServiceStrategy executorServiceStrategy) {
+        this.executorServiceStrategy = executorServiceStrategy;
+    }
+
     protected String getEndpointKey(String uri, Endpoint endpoint) {
         if (endpoint.isSingleton()) {
             return uri;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Sat Mar  6 07:45:46 2010
@@ -33,7 +33,6 @@
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -196,7 +195,7 @@
      */
     protected ScheduledExecutorService createScheduledExecutorService() {
         String name = getClass().getSimpleName();
-        return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, name, true);
+        return getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(name, DEFAULT_THREADPOOL_SIZE);
     }
 
     protected void doStart() throws Exception {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Sat Mar  6 07:45:46 2010
@@ -30,7 +30,6 @@
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * A default endpoint useful for implementation inheritance
@@ -197,7 +196,7 @@
     }
 
     protected ScheduledExecutorService createScheduledExecutorService() {
-        return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, getEndpointUri(), true);
+        return getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
     }
 
     public void configureProperties(Map<String, Object> options) {

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=919711&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Sat Mar  6 07:45:46 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExecutorServiceStrategy;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultExecutorServiceStrategy implements ExecutorServiceStrategy {
+
+    private final CamelContext camelContext;
+
+    public DefaultExecutorServiceStrategy(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public ExecutorService lookup(String executorServiceRef) {
+        return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
+    }
+
+    public ExecutorService newCachedThreadPool(String name) {
+        return ExecutorServiceHelper.newCachedThreadPool(name, true);
+    }
+
+    public ScheduledExecutorService newScheduledThreadPool(String name, int poolSize) {
+        return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, true);
+    }
+
+    public ExecutorService newFixedThreadPool(String name, int poolSize) {
+        return ExecutorServiceHelper.newFixedThreadPool(poolSize, name, true);
+    }
+
+    public ExecutorService newSingleThreadExecutor(String name) {
+        return ExecutorServiceHelper.newSingleThreadExecutor(name, true);
+    }
+
+    public ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize) {
+        return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize);
+    }
+
+    public ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
+        return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
+    }
+
+    public void shutdown(ExecutorService executorService) {
+        executorService.shutdown();
+    }
+
+    public List<Runnable> shutdownNow(ExecutorService executorService) {
+        return executorService.shutdownNow();
+    }
+    
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Sat Mar  6 07:45:46 2010
@@ -36,7 +36,6 @@
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
@@ -54,7 +53,7 @@
     public DefaultProducerTemplate(CamelContext context) {
         this.context = context;
         this.producerCache = new ProducerCache(context);
-        this.executor = ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true);
+        this.executor = context.getExecutorServiceStrategy().newCachedThreadPool("ProducerTemplate");
     }
 
     public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
@@ -683,7 +682,7 @@
         super.start();
         ServiceHelper.startService(producerCache);
         if (executor == null || executor.isShutdown()) {
-            executor = ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true);
+            executor = context.getExecutorServiceStrategy().newCachedThreadPool("ProducerTemplate");
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Sat Mar  6 07:45:46 2010
@@ -145,7 +145,7 @@
     public void commit() {
         // now lets turn all of the event driven consumer processors into a single route
         if (!eventDrivenProcessors.isEmpty()) {
-            Processor processor = Pipeline.newInstance(eventDrivenProcessors);
+            Processor processor = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);
 
             // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
             Processor unitOfWorkProcessor = new UnitOfWorkProcessor(this, processor);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Sat Mar  6 07:45:46 2010
@@ -25,6 +25,7 @@
 import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Route;
 import org.apache.camel.ShutdownRoute;
@@ -36,7 +37,6 @@
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -57,14 +57,22 @@
  *
  * @version $Revision$
  */
-public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy {
+public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy, CamelContextAware {
     private static final transient Log LOG = LogFactory.getLog(DefaultShutdownStrategy.class);
 
+    private CamelContext camelContext;
     private ExecutorService executor;
     private long timeout = 5 * 60;
     private TimeUnit timeUnit = TimeUnit.SECONDS;
     private boolean shutdownNowOnTimeout = true;
 
+    public DefaultShutdownStrategy() {
+    }
+
+    public DefaultShutdownStrategy(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
     public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
         shutdown(context, routes, getTimeout(), getTimeUnit());
     }
@@ -133,6 +141,14 @@
         return shutdownNowOnTimeout;
     }
 
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
     /**
      * Shutdown all the consumers immediately.
      *
@@ -217,13 +233,14 @@
 
     private ExecutorService getExecutorService() {
         if (executor == null) {
-            executor = ExecutorServiceHelper.newSingleThreadExecutor("ShutdownTask", true);
+            executor = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor("ShutdownTask");
         }
         return executor;
     }
 
     @Override
     protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext must be set");
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sat Mar  6 07:45:46 2010
@@ -26,7 +26,6 @@
 import org.apache.camel.SuspendableService;
 import org.apache.camel.spi.PollingConsumerPollStrategy;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -58,7 +57,8 @@
         if (service instanceof ScheduledExecutorService) {
             scheduled = (ScheduledExecutorService) service;
         } else {
-            scheduled = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, getEndpoint().getEndpointUri(), true);
+            scheduled = endpoint.getCamelContext().getExecutorServiceStrategy()
+                            .newScheduledThreadPool(getEndpoint().getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
         }
 
         this.executor = scheduled;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java Sat Mar  6 07:45:46 2010
@@ -393,6 +393,7 @@
 
         if (executorService == null) {
             // we only need a single for the JMX connector
+            // TODO use ExecutorServiceStrategy
             executorService = ExecutorServiceHelper.newSingleThreadExecutor("JMXConnector: " + url, true);
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Sat Mar  6 07:45:46 2010
@@ -148,7 +148,7 @@
         Expression correlation = getExpression().createExpression(routeContext);
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
 
-        AggregateProcessor answer = new AggregateProcessor(processor, correlation, strategy);
+        AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, correlation, strategy);
 
         ExecutorService executor = createExecutorService(routeContext);
         answer.setExecutorService(executor);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java Sat Mar  6 07:45:46 2010
@@ -92,7 +92,7 @@
                     List<Processor> list = new ArrayList<Processor>(2);
                     list.add(output);
                     list.add(interceptedTarget);
-                    return new Pipeline(list);
+                    return new Pipeline(context, list);
                 } else {
                     return output;
                 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Sat Mar  6 07:45:46 2010
@@ -153,8 +153,8 @@
             }
         }
 
-        return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), executorService,
-                                      isStreaming(), isStopOnException());
+        return new MulticastProcessor(routeContext.getCamelContext(), list, aggregationStrategy, isParallelProcessing(),
+                                      executorService, isStreaming(), isStopOnException());
     }
 
     public AggregationStrategy getAggregationStrategy() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Sat Mar  6 07:45:46 2010
@@ -103,7 +103,8 @@
             }
         }
 
-        OnCompletionProcessor answer = new OnCompletionProcessor(childProcessor, onCompleteOnly, onFailureOnly, when);
+        OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor,
+                                                                 onCompleteOnly, onFailureOnly, when);
         answer.setExecutorService(executorService);
         return answer;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Sat Mar  6 07:45:46 2010
@@ -282,7 +282,7 @@
      * to using a {@link Pipeline} but derived classes could change the behaviour
      */
     protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
-        return new Pipeline(list);
+        return new Pipeline(routeContext.getCamelContext(), list);
     }
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Sat Mar  6 07:45:46 2010
@@ -32,7 +32,6 @@
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;recipientList/&gt; element
@@ -123,7 +122,7 @@
         }
         if (executorService == null) {
             // fall back and use default
-            executorService = ExecutorServiceHelper.newCachedThreadPool("RecipientList", true);
+            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("RecipientList");
         }
         return executorService;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Sat Mar  6 07:45:46 2010
@@ -31,7 +31,6 @@
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;split/&gt; element
@@ -87,8 +86,9 @@
         Processor childProcessor = routeContext.createProcessor(this);
         aggregationStrategy = createAggregationStrategy(routeContext);
         executorService = createExecutorService(routeContext);
-        return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
-                isParallelProcessing(), executorService, isStreaming(), isStopOnException());
+        Expression exp = getExpression().createExpression(routeContext);
+        return new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
+                            isParallelProcessing(), executorService, isStreaming(), isStopOnException());
     }
 
     
@@ -113,7 +113,7 @@
         }
         if (executorService == null) {
             // fall back and use default
-            executorService = ExecutorServiceHelper.newCachedThreadPool("Split", true);
+            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("Split");
         }
         return executorService;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Sat Mar  6 07:45:46 2010
@@ -31,7 +31,6 @@
 import org.apache.camel.processor.ThreadsProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;threads/&gt; element
@@ -73,11 +72,12 @@
             String name = getThreadName() != null ? getThreadName() : "Threads";
             if (poolSize == null || poolSize <= 0) {
                 // use the cached thread pool
-                executorService = ExecutorServiceHelper.newCachedThreadPool(name, true);
+                executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(name);
             } else {
                 // use a custom pool based on the settings
                 int max = getMaxPoolSize() != null ? getMaxPoolSize() : poolSize;
-                executorService = ExecutorServiceHelper.newThreadPool(name, poolSize, max, getKeepAliveTime(), getUnits(), true);
+                executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
+                                        .newThreadPool(name, poolSize, max, getKeepAliveTime(), getUnits(), true);
             }
         }
         Processor childProcessor = routeContext.createProcessor(this);
@@ -85,7 +85,7 @@
         // wrap it in a unit of work so the route that comes next is also done in a unit of work
         UnitOfWorkProcessor uow = new UnitOfWorkProcessor(routeContext, childProcessor);
 
-        return new ThreadsProcessor(uow, executorService, waitForTaskToComplete);
+        return new ThreadsProcessor(routeContext.getCamelContext(), uow, executorService, waitForTaskToComplete);
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Sat Mar  6 07:45:46 2010
@@ -31,7 +31,6 @@
 import org.apache.camel.processor.SendAsyncProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;to/&gt; element
@@ -97,7 +96,8 @@
             }
         }
         if (executorService == null && poolSize != null) {
-            executorService = ExecutorServiceHelper.newThreadPool("ToAsync[" + getLabel() + "]", poolSize, poolSize);
+            executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
+                                .newThreadPool("ToAsync[" + getLabel() + "]", poolSize, poolSize);
         }
 
         // create the child processor which is the async route

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat Mar  6 07:45:46 2010
@@ -29,6 +29,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -45,7 +46,6 @@
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.concurrent.AtomicExchange;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -99,6 +99,7 @@
         }
     }
 
+    private final CamelContext camelContext;
     private Collection<Processor> processors;
     private final AggregationStrategy aggregationStrategy;
     private final boolean isParallelProcessing;
@@ -106,17 +107,19 @@
     private final boolean stopOnException;
     private ExecutorService executorService;
 
-    public MulticastProcessor(Collection<Processor> processors) {
-        this(processors, null);
+    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) {
+        this(camelContext, processors, null);
     }
 
-    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
-        this(processors, aggregationStrategy, false, null, false, false);
+    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
+        this(camelContext,processors, aggregationStrategy, false, null, false, false);
     }
     
-    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy,
+    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                               boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException) {
+        notNull(camelContext, "camelContext");
         notNull(processors, "processors");
+        this.camelContext = camelContext;
         this.processors = processors;
         this.aggregationStrategy = aggregationStrategy;
         this.isParallelProcessing = parallelProcessing;
@@ -125,7 +128,7 @@
         this.stopOnException = stopOnException;
 
         if (isParallelProcessing() && getExecutorService() == null) {
-            this.executorService = ExecutorServiceHelper.newCachedThreadPool("Multicast", true);
+            this.executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool("Multicast");
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Sat Mar  6 07:45:46 2010
@@ -19,6 +19,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Predicate;
@@ -27,23 +28,28 @@
 import org.apache.camel.impl.SynchronizationAdapter;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import static org.apache.camel.util.ObjectHelper.notNull;
+
 /**
  * @version $Revision$
  */
 public class OnCompletionProcessor extends ServiceSupport implements Processor, Traceable {
 
     private static final transient Log LOG = LogFactory.getLog(OnCompletionProcessor.class);
+    private final CamelContext camelContext;
+    private final Processor processor;
     private ExecutorService executorService;
-    private Processor processor;
     private boolean onCompleteOnly;
     private boolean onFailureOnly;
     private Predicate onWhen;
 
-    public OnCompletionProcessor(Processor processor, boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen) {
+    public OnCompletionProcessor(CamelContext camelContext, Processor processor, boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen) {
+        notNull(camelContext, "camelContext");
+        notNull(processor, "processor");
+        this.camelContext = camelContext;
         // wrap processor in UnitOfWork so what we send out runs in a UoW
         this.processor = new UnitOfWorkProcessor(processor);
         this.onCompleteOnly = onCompleteOnly;
@@ -176,7 +182,7 @@
     }
 
     protected ExecutorService createExecutorService() {
-        return ExecutorServiceHelper.newCachedThreadPool(this.toString(), true);
+        return camelContext.getExecutorServiceStrategy().newCachedThreadPool(this.toString());
     }
 
     public void setExecutorService(ExecutorService executorService) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Sat Mar  6 07:45:46 2010
@@ -20,6 +20,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultExchange;
@@ -36,17 +37,17 @@
 public class Pipeline extends MulticastProcessor implements Processor, Traceable {
     private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
 
-    public Pipeline(Collection<Processor> processors) {
-        super(processors);
+    public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
+        super(camelContext, processors);
     }
 
-    public static Processor newInstance(List<Processor> processors) {
+    public static Processor newInstance(CamelContext camelContext, List<Processor> processors) {
         if (processors.isEmpty()) {
             return null;
         } else if (processors.size() == 1) {
             return processors.get(0);
         }
-        return new Pipeline(processors);
+        return new Pipeline(camelContext, processors);
     }
 
     public void process(Exchange exchange) throws Exception {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Sat Mar  6 07:45:46 2010
@@ -104,8 +104,8 @@
                 producers.put(endpoint, producer);
             }
 
-            MulticastProcessor mp = new MulticastProcessor(processors, getAggregationStrategy(), isParallelProcessing(),
-                                                           getExecutorService(), false, isStopOnException());
+            MulticastProcessor mp = new MulticastProcessor(exchange.getContext(), processors, getAggregationStrategy(),
+                                                           isParallelProcessing(), getExecutorService(), false, isStopOnException());
 
             // now let the multicast process the exchange
             mp.process(exchange);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Sat Mar  6 07:45:46 2010
@@ -36,7 +36,6 @@
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * @version $Revision$
@@ -165,7 +164,8 @@
 
     public ExecutorService getExecutorService() {
         if (executorService == null) {
-            executorService = ExecutorServiceHelper.newThreadPool("SendAsyncProcessor-Consumer", poolSize, poolSize);
+            executorService = destination.getCamelContext().getExecutorServiceStrategy()
+                                .newThreadPool("SendAsyncProcessor-Consumer", poolSize, poolSize);
         }
         return executorService;
     }
@@ -182,7 +182,8 @@
     public ExecutorService getProducerExecutorService() {
         if (producerExecutorService == null) {
             // use a cached pool for the producers which can grow/schrink itself
-            producerExecutorService = ExecutorServiceHelper.newCachedThreadPool("SendAsyncProcessor-Producer", true);
+            producerExecutorService = destination.getCamelContext().getExecutorServiceStrategy()
+                                        .newCachedThreadPool("SendAsyncProcessor-Producer");
         }
         return producerExecutorService;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Sat Mar  6 07:45:46 2010
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Message;
@@ -44,13 +45,13 @@
 public class Splitter extends MulticastProcessor implements Processor, Traceable {
     private final Expression expression;
 
-    public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
-        this(expression, destination, aggregationStrategy, false, null, false, false);
+    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
+        this(camelContext, expression, destination, aggregationStrategy, false, null, false, false);
     }
 
-    public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
+    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
                     boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException) {
-        super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, streaming, stopOnException);
+        super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, streaming, stopOnException);
 
         this.expression = expression;
         notNull(expression, "expression");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Sat Mar  6 07:45:46 2010
@@ -20,11 +20,12 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * Threads processor that leverage a thread pool for processing exchanges.
@@ -39,11 +40,15 @@
  */
 public class ThreadsProcessor extends DelegateProcessor implements Processor {
 
+    protected final CamelContext camelContext;
     protected ExecutorService executorService;
     protected WaitForTaskToComplete waitForTaskToComplete;
 
-    public ThreadsProcessor(Processor output, ExecutorService executorService, WaitForTaskToComplete waitForTaskToComplete) {
+    public ThreadsProcessor(CamelContext camelContext, Processor output, ExecutorService executorService, WaitForTaskToComplete waitForTaskToComplete) {
         super(output);
+        ObjectHelper.notNull(camelContext, "camelContext");
+        ObjectHelper.notNull(executorService, "executorService");
+        this.camelContext = camelContext;
         this.executorService = executorService;
         this.waitForTaskToComplete = waitForTaskToComplete;
     }
@@ -98,15 +103,11 @@
 
     public ExecutorService getExecutorService() {
         if (executorService == null) {
-            executorService = createExecutorService();
+            executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool("Threads");
         }
         return executorService;
     }
 
-    protected ExecutorService createExecutorService() {
-        return ExecutorServiceHelper.newCachedThreadPool("Threads", true);
-    }
-
     protected void doStop() throws Exception {
         super.doStop();
         if (executorService != null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Sat Mar  6 07:45:46 2010
@@ -29,7 +29,6 @@
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Processor for wire tapping exchanges to an endpoint destination.
@@ -161,7 +160,7 @@
     }
 
     protected ExecutorService createExecutorService() {
-        return ExecutorServiceHelper.newCachedThreadPool(this.toString(), true);
+        return getDestination().getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this.toString());
     }
 
     public void setExecutorService(ExecutorService executorService) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sat Mar  6 07:45:46 2010
@@ -24,6 +24,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
@@ -41,7 +42,6 @@
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.TimeoutMap;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -66,6 +66,7 @@
 
     private static final Log LOG = LogFactory.getLog(AggregateProcessor.class);
 
+    private final CamelContext camelContext;
     private final Processor processor;
     private final AggregationStrategy aggregationStrategy;
     private final Expression correlationExpression;
@@ -90,10 +91,13 @@
     private boolean completionFromBatchConsumer;
     private AtomicInteger batchConsumerCounter = new AtomicInteger();
 
-    public AggregateProcessor(Processor processor, Expression correlationExpression, AggregationStrategy aggregationStrategy) {
+    public AggregateProcessor(CamelContext camelContext, Processor processor,
+                              Expression correlationExpression, AggregationStrategy aggregationStrategy) {
+        ObjectHelper.notNull(camelContext, "camelContext");
         ObjectHelper.notNull(processor, "processor");
         ObjectHelper.notNull(correlationExpression, "correlationExpression");
         ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
+        this.camelContext = camelContext;
         this.processor = processor;
         this.correlationExpression = correlationExpression;
         this.aggregationStrategy = aggregationStrategy;
@@ -471,16 +475,16 @@
         if (executorService == null) {
             if (isParallelProcessing()) {
                 // we are running in parallel so create a cached thread pool which grows/shrinks automatic
-                executorService = ExecutorServiceHelper.newCachedThreadPool("Aggregator", true);
+                executorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool("Aggregator");
             } else {
                 // use a single threaded if we are not running in parallel
-                executorService = ExecutorServiceHelper.newSingleThreadExecutor("Aggregator", true);
+                executorService = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor("Aggregator");
             }
         }
 
         // start timeout service if its in use
         if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
-            ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateTimeoutChecker", true);
+            ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool("AggregateTimeoutChecker", 1);
             // check for timed out aggregated messages once every second
             timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
             ServiceHelper.startService(timeoutMap);

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=919711&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Sat Mar  6 07:45:46 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @version $Revision$
+ */
+public interface ExecutorServiceStrategy {
+
+    ExecutorService lookup(String executorServiceRef);
+
+    ExecutorService newCachedThreadPool(String name);
+
+    ScheduledExecutorService newScheduledThreadPool(String name, int poolSize);
+
+    ExecutorService newFixedThreadPool(String name, int poolSize);
+
+    ExecutorService newSingleThreadExecutor(String name);
+
+    ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize);
+
+    ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize,
+                                  long keepAliveTime, TimeUnit timeUnit, boolean daemon);
+
+    void shutdown(ExecutorService executorService);
+
+    List<Runnable> shutdownNow(ExecutorService executorService);
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Sat Mar  6 07:45:46 2010
@@ -30,7 +30,9 @@
  * create thread names with Camel prefix.
  *
  * @version $Revision$
+ * @deprecated replaced with {@link org.apache.camel.spi.ExecutorServiceStrategy}
  */
+@Deprecated
 public final class ExecutorServiceHelper {
 
     private static AtomicInteger threadCounter = new AtomicInteger();

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Sat Mar  6 07:45:46 2010
@@ -52,7 +52,7 @@
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
         ap.setCompletionPredicate(complete);
         ap.setEagerCheckCompletion(false);
         ap.start();
@@ -93,7 +93,7 @@
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().isEqualTo("END");
 
-        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
         ap.setCompletionPredicate(complete);
         ap.setEagerCheckCompletion(true);
         ap.start();
@@ -141,7 +141,7 @@
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
         ap.setCompletionSize(3);
         ap.setEagerCheckCompletion(eager);
         ap.start();
@@ -189,7 +189,7 @@
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
         ap.setCompletionTimeout(3000);
         ap.setEagerCheckCompletion(eager);
         ap.start();
@@ -235,7 +235,7 @@
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
         ap.setCompletionPredicate(complete);
         ap.setIgnoreBadCorrelationKeys(true);
 
@@ -275,7 +275,7 @@
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
         ap.setCompletionPredicate(complete);
 
         ap.start();
@@ -322,7 +322,7 @@
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
         ap.setCompletionPredicate(complete);
         ap.setCloseCorrelationKeyOnCompletion(1000);
 
@@ -369,7 +369,7 @@
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
         ap.setCompletionSize(100);
         ap.setCompletionFromBatchConsumer(true);
 
@@ -464,7 +464,7 @@
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as);
         ap.setEagerCheckCompletion(true);
         ap.setCompletionPredicate(body().isEqualTo("END"));
         if (handler != null) {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java Sat Mar  6 07:45:46 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor.enricher;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
@@ -39,8 +40,14 @@
         return jndi;
     }
 
-    public void testPollEnrichRef() throws Exception {
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
         cool.setCamelContext(context);
+        return context;
+    }
+
+    public void testPollEnrichRef() throws Exception {
         Exchange exchange = new DefaultExchange(context);
         exchange.getIn().setBody("Bye World");
         cool.getQueue().add(exchange);

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Sat Mar  6 07:45:46 2010
@@ -28,7 +28,6 @@
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
@@ -165,8 +164,10 @@
         List<IoFilter> filters = configuration.getFilters();
         final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
 
-        IoAcceptor acceptor = new SocketAcceptor(processorCount, ExecutorServiceHelper.newCachedThreadPool("MinaSocketAcceptor", true));
-        IoConnector connector = new SocketConnector(processorCount, ExecutorServiceHelper.newCachedThreadPool("MinaSocketConnector", true));
+        IoAcceptor acceptor = new SocketAcceptor(processorCount,
+                getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaSocketAcceptor"));
+        IoConnector connector = new SocketConnector(processorCount,
+                getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaSocketConnector"));
         SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
 
         // connector config
@@ -174,7 +175,8 @@
         // must use manual thread model according to Mina documentation
         connectorConfig.setThreadModel(ThreadModel.MANUAL);
         configureCodecFactory("MinaProducer", connectorConfig, configuration);
-        connectorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(ExecutorServiceHelper.newCachedThreadPool("MinaThreadPool", true)));
+        connectorConfig.getFilterChain().addLast("threadPool",
+                new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
         if (minaLogger) {
             connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
         }
@@ -190,7 +192,8 @@
         configureCodecFactory("MinaConsumer", acceptorConfig, configuration);
         acceptorConfig.setReuseAddress(true);
         acceptorConfig.setDisconnectOnUnbind(true);
-        acceptorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(ExecutorServiceHelper.newCachedThreadPool("MinaThreadPool", true)));
+        acceptorConfig.getFilterChain().addLast("threadPool",
+                new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
         if (minaLogger) {
             acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
         }
@@ -258,8 +261,8 @@
         boolean sync = configuration.isSync();
         List<IoFilter> filters = configuration.getFilters();
 
-        IoAcceptor acceptor = new DatagramAcceptor(ExecutorServiceHelper.newCachedThreadPool("MinaDatagramAcceptor", true));
-        IoConnector connector = new DatagramConnector(ExecutorServiceHelper.newCachedThreadPool("MinaDatagramConnector", true));
+        IoAcceptor acceptor = new DatagramAcceptor(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaDatagramAcceptor"));
+        IoConnector connector = new DatagramConnector(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaDatagramConnector"));
         SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
 
         if (transferExchange) {
@@ -270,7 +273,8 @@
         // must use manual thread model according to Mina documentation
         connectorConfig.setThreadModel(ThreadModel.MANUAL);
         configureDataGramCodecFactory("MinaProducer", connectorConfig, configuration);
-        connectorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(ExecutorServiceHelper.newCachedThreadPool("MinaThreadPool", true)));
+        connectorConfig.getFilterChain().addLast("threadPool",
+                new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
         if (minaLogger) {
             connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
         }
@@ -284,7 +288,8 @@
         configureDataGramCodecFactory("MinaConsumer", acceptorConfig, configuration);
         acceptorConfig.setDisconnectOnUnbind(true);
         // reuse address is default true for datagram
-        acceptorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(ExecutorServiceHelper.newCachedThreadPool("MinaThreadPool", true)));
+        acceptorConfig.getFilterChain().addLast("threadPool",
+                new ExecutorFilter(getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("MinaThreadPool")));
         if (minaLogger) {
             acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
         }

Modified: camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java (original)
+++ camel/trunk/components/camel-nagios/src/main/java/org/apache/camel/component/nagios/NagiosProducer.java Sat Mar  6 07:45:46 2010
@@ -16,13 +16,14 @@
  */
 package org.apache.camel.component.nagios;
 
+import java.util.concurrent.ExecutorService;
+
 import com.googlecode.jsendnsca.core.INagiosPassiveCheckSender;
 import com.googlecode.jsendnsca.core.Level;
 import com.googlecode.jsendnsca.core.MessagePayload;
 import com.googlecode.jsendnsca.core.NonBlockingNagiosPassiveCheckSender;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 import static org.apache.camel.component.nagios.NagiosConstants.HOST_NAME;
 import static org.apache.camel.component.nagios.NagiosConstants.LEVEL;
@@ -67,7 +68,9 @@
         // if non blocking then set a executor service on it
         if (sender instanceof NonBlockingNagiosPassiveCheckSender) {
             NonBlockingNagiosPassiveCheckSender nonBlocking = (NonBlockingNagiosPassiveCheckSender) sender;
-            nonBlocking.setExecutor(ExecutorServiceHelper.newSingleThreadExecutor(getEndpoint().getEndpointUri(), true));
+            ExecutorService executor = getEndpoint().getCamelContext().getExecutorServiceStrategy()
+                                            .newSingleThreadExecutor(getEndpoint().getEndpointUri());
+            nonBlocking.setExecutor(executor);
         }
         super.doStart();
     }

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java Sat Mar  6 07:45:46 2010
@@ -22,12 +22,19 @@
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.builder.xml.TimeUnitAdapter;
 import org.apache.camel.model.IdentifiedType;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.camel.spring.util.CamelContextResolverHelper;
 import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+import static org.apache.camel.util.ObjectHelper.notNull;
 
 /**
  * A {@link org.springframework.beans.factory.FactoryBean} which instantiates {@link java.util.concurrent.ExecutorService} objects
@@ -36,7 +43,7 @@
  */
 @XmlRootElement(name = "threadPool")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class CamelExecutorServiceFactoryBean extends IdentifiedType implements FactoryBean {
+public class CamelExecutorServiceFactoryBean extends IdentifiedType implements FactoryBean, CamelContextAware, ApplicationContextAware {
 
     @XmlAttribute(required = false)
     private Integer poolSize;
@@ -51,18 +58,30 @@
     private String threadName;
     @XmlAttribute
     private Boolean deamon = Boolean.TRUE;
+    @XmlAttribute
+    private String camelContextId;
+    @XmlTransient
+    private CamelContext camelContext;
+    @XmlTransient
+    private ApplicationContext applicationContext;
 
     public Object getObject() throws Exception {
+        if (camelContext == null && camelContextId != null) {
+            camelContext = CamelContextResolverHelper.getCamelContextWithId(applicationContext, camelContextId);
+        }
+        notNull(camelContext, "camelContext");
+
         String name = getThreadName() != null ? getThreadName() : getId();
 
         ExecutorService answer;
         if (getPoolSize() == null || getPoolSize() <= 0) {
             // use the cached thread pool
-            answer = ExecutorServiceHelper.newCachedThreadPool(name, isDeamon());
+            answer = camelContext.getExecutorServiceStrategy().newCachedThreadPool(name);
         } else {
             // use a custom pool based on the settings
             int max = getMaxPoolSize() != null ? getMaxPoolSize() : getPoolSize();
-            answer = ExecutorServiceHelper.newThreadPool(name, getPoolSize(), max, getKeepAliveTime(), getUnits(), isDeamon());
+            answer = camelContext.getExecutorServiceStrategy()
+                        .newThreadPool(name, getPoolSize(), max, getKeepAliveTime(), getUnits(), isDeamon());
         }
         return answer;
     }
@@ -122,4 +141,28 @@
     public void setDeamon(Boolean deamon) {
         this.deamon = deamon;
     }
+
+    public String getCamelContextId() {
+        return camelContextId;
+    }
+
+    public void setCamelContextId(String camelContextId) {
+        this.camelContextId = camelContextId;
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public ApplicationContext getApplicationContext() {
+        return applicationContext;
+    }
+
+    public void setApplicationContext(ApplicationContext applicationContext) {
+        this.applicationContext = applicationContext;
+    }
 }
\ No newline at end of file

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelContextResolverHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelContextResolverHelper.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelContextResolverHelper.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelContextResolverHelper.java Sat Mar  6 07:45:46 2010
@@ -19,18 +19,20 @@
 import org.apache.camel.CamelContext;
 import org.springframework.context.ApplicationContext;
 
+/**
+ * Helper to resolve {@link CamelContext} from the Spring {@link org.springframework.context.ApplicationContext}.
+ */
 public final class CamelContextResolverHelper {
+
     private CamelContextResolverHelper() {
         // The helper class
     }
     
     public static CamelContext getCamelContextWithId(ApplicationContext context, String contextId) {
-        CamelContext answer;
         try {
-            answer = (CamelContext) context.getBean(contextId);
-            return answer;
+            return (CamelContext) context.getBean(contextId);
         } catch (Exception e) {
-            throw new IllegalArgumentException("Can't find the CamelContext with id " + contextId + ", the cause : ", e);
+            throw new IllegalArgumentException("Cannot find the CamelContext with id " + contextId, e);
         }
     }
 

Modified: camel/trunk/components/camel-spring/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/log4j.properties?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-spring/src/test/resources/log4j.properties Sat Mar  6 07:45:46 2010
@@ -18,7 +18,7 @@
 #
 # The logging properties used for eclipse testing, We want to see debug output on the console.
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=INFO, file
 
 log4j.logger.org.springframework=WARN
 log4j.logger.org.apache.camel.impl.converter=WARN

Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=919711&r1=919710&r2=919711&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Sat Mar  6 07:45:46 2010
@@ -35,7 +35,6 @@
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -73,7 +72,7 @@
             inputStream = resolveStreamFromUrl();
         }
 
-        executor = ExecutorServiceHelper.newSingleThreadExecutor(endpoint.getEndpointUri(), true);
+        executor = endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(endpoint.getEndpointUri());
         executor.execute(this);
     }