You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/12 07:57:09 UTC

svn commit: r922155 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/util/concurrent/ camel-core/src/test/java/org/apache/camel/processor/ component...

Author: davsclaus
Date: Fri Mar 12 06:57:08 2010
New Revision: 922155

URL: http://svn.apache.org/viewvc?rev=922155&view=rev
Log:
CAMEL-1588: EIPs using thread pools now use common code from helper to lookup configuration.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java   (contents, props changed)
      - copied, changed from r921818, camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java
Removed:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java?rev=922155&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java Fri Mar 12 06:57:08 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Allows an {@link ExecutorService} to be configured either directly or by a reference to look up in the registry.
+ *
+ * @version $Revision$
+ */
+public interface ExecutorServiceAware {
+
+    /**
+     * Gets the executor service
+     *
+     * @return the executor
+     */
+    ExecutorService getExecutorService();
+
+    /**
+     * Sets the executor service to be used.
+     *
+     * @param executorService the executor
+     */
+    void setExecutorService(ExecutorService executorService);
+
+    /**
+     * Gets the reference to lookup in the {@link org.apache.camel.spi.Registry} for the executor service to be used.
+     *
+     * @return the reference, or <tt>null</tt> if the executor was set directly
+     */
+    String getExecutorServiceRef();
+
+    /**
+     * Sets a reference to lookup in the {@link org.apache.camel.spi.Registry} for the executor service to be used.
+     *
+     * @param executorServiceRef reference for the executor
+     */
+    void setExecutorServiceRef(String executorServiceRef);
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Fri Mar 12 06:57:08 2010
@@ -38,6 +38,7 @@ import org.apache.camel.processor.aggreg
 import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;aggregate/&gt; element
@@ -46,7 +47,7 @@ import org.apache.camel.spi.RouteContext
  */
 @XmlRootElement(name = "aggregate")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAware<AggregateDefinition> {
+public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> {
     @XmlElement(name = "correlationExpression", required = true)
     private ExpressionSubElementDefinition correlationExpression;
     @XmlElement(name = "completionPredicate", required = false)
@@ -150,8 +151,8 @@ public class AggregateDefinition extends
 
         AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, correlation, strategy);
 
-        ExecutorService executor = createExecutorService(routeContext);
-        answer.setExecutorService(executor);
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        answer.setExecutorService(executorService);
         if (isParallelProcessing() != null) {
             answer.setParallelProcessing(isParallelProcessing());
         }
@@ -220,16 +221,6 @@ public class AggregateDefinition extends
         return repository;
     }
 
-    private ExecutorService createExecutorService(RouteContext routeContext) {
-        if (executorService == null && executorServiceRef != null) {
-            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
-            if (executorService == null) {
-                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
-            }
-        }
-        return executorService;
-    }
-
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }

Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java (from r921818, camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java&r1=921818&r2=922155&rev=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java Fri Mar 12 06:57:08 2010
@@ -18,12 +18,16 @@ package org.apache.camel.model;
 
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.ExecutorServiceAware;
+
 /**
  * Enables definitions to support concurrency using {@link java.util.concurrent.ExecutorService}
  *
  * @version $Revision$
+ * @see org.apache.camel.util.concurrent.ExecutorServiceHelper#getConfiguredExecutorService(org.apache.camel.spi.RouteContext,
+ *                                                                                          ExecutorServiceAwareDefinition)
  */
-public interface ExecutorServiceAware<Type extends ProcessorDefinition> {
+public interface ExecutorServiceAwareDefinition<Type extends ProcessorDefinition> extends ExecutorServiceAware {
 
     /**
      * Setting the executor service for executing
@@ -31,7 +35,7 @@ public interface ExecutorServiceAware<Ty
      * @param executorService the executor service
      * @return the builder
      */
-    <Type> Type executorService(ExecutorService executorService);
+    Type executorService(ExecutorService executorService);
 
     /**
      * Setting the executor service for executing
@@ -40,14 +44,6 @@ public interface ExecutorServiceAware<Ty
      *                           to lookup in the {@link org.apache.camel.spi.Registry}
      * @return the builder
      */
-    <Type> Type executorServiceRef(String executorServiceRef);
-
-    ExecutorService getExecutorService();
-
-    void setExecutorService(ExecutorService executorService);
-
-    String getExecutorServiceRef();
-
-    void setExecutorServiceRef(String executorServiceRef);
+    Type executorServiceRef(String executorServiceRef);
 
 }

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Fri Mar 12 06:57:08 2010
@@ -29,6 +29,7 @@ import org.apache.camel.processor.Multic
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;multicast/&gt; element
@@ -37,7 +38,7 @@ import org.apache.camel.spi.RouteContext
  */
 @XmlRootElement(name = "multicast")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class MulticastDefinition extends OutputDefinition<ProcessorDefinition> implements ExecutorServiceAware<MulticastDefinition> {
+public class MulticastDefinition extends OutputDefinition<ProcessorDefinition> implements ExecutorServiceAwareDefinition<MulticastDefinition> {
     @XmlAttribute(required = false)
     private Boolean parallelProcessing;
     @XmlAttribute(required = false)
@@ -146,12 +147,7 @@ public class MulticastDefinition extends
             // default to use latest aggregation strategy
             aggregationStrategy = new UseLatestAggregationStrategy();
         }
-        if (executorServiceRef != null) {
-            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
-            if (executorService == null) {
-                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
-            }
-        }
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
 
         return new MulticastProcessor(routeContext.getCamelContext(), list, aggregationStrategy, isParallelProcessing(),
                                       executorService, isStreaming(), isStopOnException());

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Fri Mar 12 06:57:08 2010
@@ -34,6 +34,7 @@ import org.apache.camel.builder.Expressi
 import org.apache.camel.processor.OnCompletionProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;onCompletion/&gt; element
@@ -42,7 +43,7 @@ import org.apache.camel.spi.RouteContext
  */
 @XmlRootElement(name = "onCompletion")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class OnCompletionDefinition extends ProcessorDefinition<ProcessorDefinition> implements ExecutorServiceAware<OnCompletionDefinition> {
+public class OnCompletionDefinition extends ProcessorDefinition<ProcessorDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> {
 
     @XmlAttribute(required = false)
     private Boolean onCompleteOnly = Boolean.FALSE;
@@ -82,6 +83,10 @@ public class OnCompletionDefinition exte
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
+        if (onCompleteOnly && onFailureOnly) {
+            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
+        }
+
         Processor childProcessor = createOutputsProcessor(routeContext);
 
         // wrap the on completion route in a unit of work processor
@@ -92,19 +97,9 @@ public class OnCompletionDefinition exte
             when = onWhen.getExpression().createPredicate(routeContext);
         }
 
-        if (onCompleteOnly && onFailureOnly) {
-            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
-        }
-
-        if (executorServiceRef != null) {
-            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
-            if (executorService == null) {
-                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
-            }
-        }
-
         OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor,
                                                                  onCompleteOnly, onFailureOnly, when);
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
         answer.setExecutorService(executorService);
         return answer;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Fri Mar 12 06:57:08 2010
@@ -32,6 +32,7 @@ import org.apache.camel.processor.Recipi
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;recipientList/&gt; element
@@ -40,7 +41,7 @@ import org.apache.camel.spi.RouteContext
  */
 @XmlRootElement(name = "recipientList")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class RecipientListDefinition extends ExpressionNode implements ExecutorServiceAware<RecipientListDefinition> {
+public class RecipientListDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<RecipientListDefinition> {
 
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
@@ -97,7 +98,12 @@ public class RecipientListDefinition ext
         }
         
         answer.setAggregationStrategy(createAggregationStrategy(routeContext));
-        answer.setExecutorService(createExecutorService(routeContext));
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        if (executorService == null) {
+            // fallback to create a new executor
+            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("RecipientList");
+        }
+        answer.setExecutorService(executorService);
 
         return answer;
     }
@@ -113,20 +119,6 @@ public class RecipientListDefinition ext
         return aggregationStrategy;
     }
 
-    private ExecutorService createExecutorService(RouteContext routeContext) {
-        if (executorService == null && executorServiceRef != null) {
-            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
-            if (executorService == null) {
-                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
-            }
-        }
-        if (executorService == null) {
-            // fall back and use default
-            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("RecipientList");
-        }
-        return executorService;
-    }
-
     @Override
     @SuppressWarnings("unchecked")
     public List<ProcessorDefinition> getOutputs() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java Fri Mar 12 06:57:08 2010
@@ -278,6 +278,8 @@ public class RouteDefinition extends Pro
 
     /**
      * Disable stream caching for this route.
+     * 
+     * @return the builder
      */
     public RouteDefinition noStreamCaching() {
         setStreamCache(Boolean.FALSE);
@@ -287,6 +289,8 @@ public class RouteDefinition extends Pro
 
     /**
      * Enable stream caching for this route.
+     * 
+     * @return the builder
      */
     public RouteDefinition streamCaching() {
         setStreamCache(Boolean.TRUE);
@@ -301,6 +305,8 @@ public class RouteDefinition extends Pro
 
     /**
      * Disable tracing for this route.
+     * 
+     * @return the builder
      */
     public RouteDefinition noTracing() {
         setTrace(false);
@@ -309,6 +315,8 @@ public class RouteDefinition extends Pro
 
     /**
      * Enable tracing for this route.
+     * 
+     * @return the builder
      */
     public RouteDefinition tracing() {
         setTrace(true);
@@ -317,6 +325,8 @@ public class RouteDefinition extends Pro
 
     /**
      * Disable handle fault for this route.
+     * 
+     * @return the builder
      */
     public RouteDefinition noHandleFault() {
         setHandleFault(false);
@@ -325,6 +335,8 @@ public class RouteDefinition extends Pro
 
     /**
      * Enable handle fault for this route.
+     * 
+     * @return the builder
      */
     public RouteDefinition handleFault() {
         setHandleFault(true);
@@ -333,6 +345,8 @@ public class RouteDefinition extends Pro
 
     /**
      * Disable delayer for this route.
+     * 
+     * @return the builder
      */
     public RouteDefinition noDelayer() {
         setDelayer(0L);
@@ -343,6 +357,7 @@ public class RouteDefinition extends Pro
      * Enable delayer for this route.
      *
      * @param delay delay in millis
+     * @return the builder
      */
     public RouteDefinition delayer(long delay) {
         setDelayer(delay);
@@ -362,6 +377,8 @@ public class RouteDefinition extends Pro
 
     /**
      * Disables this route from being auto started when Camel starts.
+     * 
+     * @return the builder
      */
     public RouteDefinition noAutoStartup() {
         setAutoStartup(Boolean.FALSE);
@@ -375,7 +392,7 @@ public class RouteDefinition extends Pro
      * Camel will stop routes in reverse order when its stopping.
      *
      * @param order the order represented as a number
-     * @return this builder
+     * @return the builder
      */
     public RouteDefinition startupOrder(int order) {
         setStartupOrder(order);
@@ -383,8 +400,11 @@ public class RouteDefinition extends Pro
     }
 
     /**
-     * Disables this route from being auto started when Camel starts.
-     */
+     * Configures a route policy for this route
+     *
+     * @param routePolicy the route policy
+     * @return the builder
+     */ 
     public RouteDefinition routePolicy(RoutePolicy routePolicy) {
         setRoutePolicy(routePolicy);
         return this;
@@ -394,6 +414,7 @@ public class RouteDefinition extends Pro
      * Configures a route policy for this route
      *
      * @param routePolicyRef reference to a {@link RoutePolicy} to lookup and use.
+     * @return the builder
      */
     public RouteDefinition routePolicyRef(String routePolicyRef) {
         setRoutePolicyRef(routePolicyRef);
@@ -404,6 +425,7 @@ public class RouteDefinition extends Pro
      * Configures a shutdown route option.
      *
      * @param shutdownRoute the option to use when shutting down this route
+     * @return the builder
      */
     public RouteDefinition shutdownRoute(ShutdownRoute shutdownRoute) {
         setShutdownRoute(shutdownRoute);
@@ -414,6 +436,7 @@ public class RouteDefinition extends Pro
      * Configures a shutdown running task option.
      *
      * @param shutdownRunningTask the option to use when shutting down and how to act upon running tasks.
+     * @return the builder
      */
     public RouteDefinition shutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
         setShutdownRunningTask(shutdownRunningTask);
@@ -663,14 +686,23 @@ public class RouteDefinition extends Pro
 
         // configure auto startup
         if (autoStartup != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Using AutoStartup " + isAutoStartup() + " on route: " + this);
+            }
             routeContext.setAutoStartup(isAutoStartup());
         }
 
         // configure shutdown
         if (shutdownRoute != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Using ShutdownRoute " + getShutdownRoute() + " on route: " + this);
+            }
             routeContext.setShutdownRoute(getShutdownRoute());
         }
         if (shutdownRunningTask != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Using ShutdownRunningTask " + getShutdownRunningTask() + " on route: " + this);
+            }
             routeContext.setShutdownRunningTask(getShutdownRunningTask());
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Fri Mar 12 06:57:08 2010
@@ -31,6 +31,7 @@ import org.apache.camel.processor.Splitt
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;split/&gt; element
@@ -39,7 +40,7 @@ import org.apache.camel.spi.RouteContext
  */
 @XmlRootElement(name = "split")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class SplitDefinition extends ExpressionNode implements ExecutorServiceAware<SplitDefinition> {
+public class SplitDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<SplitDefinition> {
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
     @XmlTransient
@@ -84,8 +85,15 @@ public class SplitDefinition extends Exp
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = routeContext.createProcessor(this);
+
         aggregationStrategy = createAggregationStrategy(routeContext);
-        executorService = createExecutorService(routeContext);
+
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        if (executorService == null) {
+            // fallback to create a new executor
+            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("Split");
+        }
+
         Expression exp = getExpression().createExpression(routeContext);
         return new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
                             isParallelProcessing(), executorService, isStreaming(), isStopOnException());
@@ -103,21 +111,7 @@ public class SplitDefinition extends Exp
         }
         return strategy;
     }        
-    
-    private ExecutorService createExecutorService(RouteContext routeContext) {
-        if (executorService == null && executorServiceRef != null) {
-            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
-            if (executorService == null) {
-                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
-            }
-        }
-        if (executorService == null) {
-            // fall back and use default
-            executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("Split");
-        }
-        return executorService;
-    }
-    
+
     // Fluent API
     // -------------------------------------------------------------------------
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri Mar 12 06:57:08 2010
@@ -31,6 +31,7 @@ import org.apache.camel.builder.xml.Time
 import org.apache.camel.processor.ThreadsProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;threads/&gt; element
@@ -39,7 +40,7 @@ import org.apache.camel.spi.RouteContext
  */
 @XmlRootElement(name = "threads")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ThreadsDefinition extends OutputDefinition<ProcessorDefinition> implements ExecutorServiceAware<ThreadsDefinition> {
+public class ThreadsDefinition extends OutputDefinition<ProcessorDefinition> implements ExecutorServiceAwareDefinition<ThreadsDefinition> {
 
     @XmlTransient
     private ExecutorService executorService;
@@ -61,14 +62,10 @@ public class ThreadsDefinition extends O
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
-        if (executorService == null && executorServiceRef != null) {
-            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
-            if (executorService == null) {
-                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
-            }
-        }
-
+        // prefer any explicit configured executor service
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
         if (executorService == null) {
+            // none was configured so create an executor based on the other parameters
             String name = getThreadName() != null ? getThreadName() : "Threads";
             if (poolSize == null || poolSize <= 0) {
                 // use the cached thread pool

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Fri Mar 12 06:57:08 2010
@@ -31,6 +31,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.processor.SendAsyncProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;to/&gt; element
@@ -39,7 +40,7 @@ import org.apache.camel.spi.RouteContext
  */
 @XmlRootElement(name = "to")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ToDefinition extends SendDefinition<ToDefinition> implements ExecutorServiceAware<ToDefinition> {
+public class ToDefinition extends SendDefinition<ToDefinition> implements ExecutorServiceAwareDefinition<ToDefinition> {
     @XmlTransient
     private final List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
     @XmlAttribute(required = false)
@@ -89,13 +90,9 @@ public class ToDefinition extends SendDe
         // this code below is only for creating when async is enabled
         // ----------------------------------------------------------
 
-        if (executorServiceRef != null) {
-            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
-            if (executorService == null) {
-                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
-            }
-        }
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
         if (executorService == null && poolSize != null) {
+            // create a new thread pool based on the other options
             executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
                                 .newThreadPool("ToAsync[" + getLabel() + "]", poolSize, poolSize);
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Fri Mar 12 06:57:08 2010
@@ -30,6 +30,7 @@ import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.WireTapProcessor;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;wireTap/&gt; element
@@ -38,7 +39,7 @@ import org.apache.camel.spi.RouteContext
  */
 @XmlRootElement(name = "wireTap")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class WireTapDefinition extends SendDefinition<WireTapDefinition> implements ExecutorServiceAware<ProcessorDefinition> {
+public class WireTapDefinition extends SendDefinition<WireTapDefinition> implements ExecutorServiceAwareDefinition<ProcessorDefinition> {
 
     @XmlTransient
     private Processor newExchangeProcessor;
@@ -75,12 +76,7 @@ public class WireTapDefinition extends S
             answer.setNewExchangeExpression(newExchangeExpression.createExpression(routeContext));
         }
 
-        if (executorServiceRef != null) {
-            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
-            if (executorService == null) {
-                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
-            }
-        }
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
         answer.setExecutorService(executorService);
 
         return answer;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Mar 12 06:57:08 2010
@@ -25,14 +25,23 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.camel.model.ExecutorServiceAwareDefinition;
+import org.apache.camel.spi.ExecutorServiceStrategy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
+
 /**
  * Helper for {@link java.util.concurrent.ExecutorService} to construct executors using a thread factory that
  * create thread names with Camel prefix.
+ * <p/>
+ * This helper should <b>NOT</b> be used by end users of Camel, as you should use
+ * {@link org.apache.camel.spi.ExecutorServiceStrategy} which you obtain from {@link org.apache.camel.CamelContext}
+ * to create thread pools.
+ * <p/>
+ * This helper should only be used internally in Camel.
  *
  * @version $Revision$
- * @deprecated replaced with {@link org.apache.camel.spi.ExecutorServiceStrategy}
  */
-@Deprecated
 public final class ExecutorServiceHelper {
 
     private static AtomicInteger threadCounter = new AtomicInteger();
@@ -160,4 +169,39 @@ public final class ExecutorServiceHelper
         return answer;
     }
 
+    /**
+     * Will lookup and get the configured {@link java.util.concurrent.ExecutorService} from the given definition.
+     * <p/>
+     * This method will look up the configured thread pool in the following order
+     * <ul>
+     *   <li>from the definition, if it has an explicitly configured executor service or executor service reference.</li>
+     *   <li>if none found, then <tt>null</tt> is returned.</li>
+     * </ul>
+     * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support
+     * configured executor services in the same coherent way.
+     *
+     * @param routeContext  the route context
+     * @param definition    the node definition which may leverage executor service.
+     * @return the configured executor service, or <tt>null</tt> if none was configured.
+     * @throws IllegalArgumentException is thrown if the referenced executor service was not found in the {@link org.apache.camel.spi.Registry}
+     */
+    public static ExecutorService getConfiguredExecutorService(RouteContext routeContext,
+                                                               ExecutorServiceAwareDefinition definition) throws IllegalArgumentException {
+        ExecutorServiceStrategy strategy = routeContext.getCamelContext().getExecutorServiceStrategy();
+        ObjectHelper.notNull(strategy, "ExecutorServiceStrategy", routeContext.getCamelContext());
+
+        // prefer to use explicit configured executor on the definition
+        if (definition.getExecutorService() != null) {
+            return definition.getExecutorService();
+        } else if (definition.getExecutorServiceRef() != null) {
+            ExecutorService answer = strategy.lookup(definition.getExecutorServiceRef());
+            if (answer == null) {
+                throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry.");
+            }
+            return answer;
+        }
+
+        return null;
+    }
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java Fri Mar 12 06:57:08 2010
@@ -33,9 +33,16 @@ public class SplitterWithCustomThreadPoo
 
     public void testSplitterWithCustomThreadPoolExecutor() throws Exception {
         ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) getSplitter().getExecutorService();
+        if (threadPoolExecutor == null) {
+            threadPoolExecutor = context.getRegistry().lookup(getSplitter().getExecutorServiceRef(), ThreadPoolExecutor.class);
+        }
         // this should be sufficient as core pool size is the only thing I changed from the default
-        assertTrue(threadPoolExecutor.getCorePoolSize() == customThreadPoolExecutor.getCorePoolSize());
-        assertTrue(threadPoolExecutor.getMaximumPoolSize() == customThreadPoolExecutor.getMaximumPoolSize());
+        assertTrue(threadPoolExecutor.getCorePoolSize() == getThreadPoolExecutor().getCorePoolSize());
+        assertTrue(threadPoolExecutor.getMaximumPoolSize() == getThreadPoolExecutor().getMaximumPoolSize());
+    }
+
+    protected ThreadPoolExecutor getThreadPoolExecutor() {
+        return customThreadPoolExecutor;
     }
     
     protected SplitDefinition getSplitter() {

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java Fri Mar 12 06:57:08 2010
@@ -16,11 +16,20 @@
  */
 package org.apache.camel.spring.processor;
 
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.processor.SplitterWithCustomThreadPoolExecutorTest;
+
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
 public class SpringSplitterWithCustomThreadPoolExecutorTest extends SplitterWithCustomThreadPoolExecutorTest {
+
+    @Override
+    protected ThreadPoolExecutor getThreadPoolExecutor() {
+        return context.getRegistry().lookup("myThreadPoolExecutor", ThreadPoolExecutor.class);
+    }
+
     protected CamelContext createCamelContext() throws Exception {
         return createSpringCamelContext(this, "org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml");
     }

Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml Fri Mar 12 06:57:08 2010
@@ -26,7 +26,7 @@
   <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
     <route>
       <from uri="direct:parallel-custom-pool"/>
-      <split executorServiceRef="threadPoolExecutor">
+      <split executorServiceRef="myThreadPoolExecutor">
         <xpath>/invoice/lineItems</xpath>
         <to uri="mock:result"/>
       </split>
@@ -35,7 +35,7 @@
 
   <!-- There's an easier way of specifying constructor args, just can't remember it
        at the moment... old Spring syntax will do for now! -->
-  <bean id="threadPoolExecutor" class="java.util.concurrent.ThreadPoolExecutor">
+  <bean id="myThreadPoolExecutor" class="java.util.concurrent.ThreadPoolExecutor">
     <constructor-arg index="0" value="8"/>
     <constructor-arg index="1" value="16"/>
     <constructor-arg index="2" value="0"/>