You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/20 15:54:12 UTC

svn commit: r925619 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/ja...

Author: davsclaus
Date: Sat Mar 20 14:54:11 2010
New Revision: 925619

URL: http://svn.apache.org/viewvc?rev=925619&view=rev
Log:
CAMEL-1588: Thread pool lookup now falls back to the thread pool profiles when the ref is not found in the Registry but a profile id matches the ref name.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java
      - copied, changed from r925107, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.java
      - copied, changed from r925107, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml
      - copied, changed from r925107, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Sat Mar 20 14:54:11 2010
@@ -43,7 +43,6 @@ import org.apache.camel.spi.PackageScanC
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.ServicePool;
 import org.apache.camel.spi.ShutdownStrategy;
-import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.spi.TypeConverterRegistry;
 
 /**
@@ -461,7 +460,9 @@ public interface CamelContext extends Se
     List<String> getLanguageNames();
 
     /**
-     * Creates a new ProducerTemplate.
+     * Creates a new {@link ProducerTemplate} which is <b>not</b> started.
+     * <p/>
+     * You <b>must</b> start the template before it is used.
      * <p/>
      * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
      * Why does Camel use too many threads with ProducerTemplate?</a>
@@ -471,7 +472,9 @@ public interface CamelContext extends Se
     ProducerTemplate createProducerTemplate();
 
     /**
-     * Creates a new ConsumerTemplate.
+     * Creates a new {@link ConsumerTemplate} which is <b>not</b> started.
+     * <p/>
+     * You <b>must</b> start the template before it is used.
      * <p/>
      * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
      * Why does Camel use too many threads with ProducerTemplate?</a> as it also applies for ConsumerTemplate.

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java Sat Mar 20 14:54:11 2010
@@ -70,17 +70,6 @@ public final class ThreadPoolBuilder {
     }
 
     /**
-     * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}.
-     *
-     * @param source             the source object, usually it should be <tt>this</tt> passed in as parameter
-     * @param executorServiceRef reference to lookup
-     * @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt> if not found
-     */
-    public ExecutorService lookup(Object source, String executorServiceRef) {
-        return camelContext.getExecutorServiceStrategy().lookup(source, executorServiceRef);
-    }
-
-    /**
      * Builds the new thread pool
      *
      * @param name name which is appended to the thread name

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Sat Mar 20 14:54:11 2010
@@ -134,11 +134,24 @@ public class DefaultExecutorServiceStrat
         this.threadNamePattern = threadNamePattern;
     }
 
-    public ExecutorService lookup(Object source, String executorServiceRef) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Looking up ExecutorService with ref: " + executorServiceRef);
+    public ExecutorService lookup(Object source, String name, String executorServiceRef) {
+        ExecutorService answer = camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
+        if (answer != null && LOG.isDebugEnabled()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Looking up ExecutorService with ref: " + executorServiceRef + " and found it from Registry: " + answer);
+            }
         }
-        return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
+
+        if (answer == null) {
+            // try to see if we got a thread pool profile with that id
+            answer = newThreadPool(source, name, executorServiceRef);
+            if (answer != null && LOG.isDebugEnabled()) {
+                LOG.debug("Looking up ExecutorService with ref: " + executorServiceRef
+                        + " and found a matching ThreadPoolProfile to create the ExecutorService: " + answer);
+            }
+        }
+
+        return answer;
     }
 
     public ExecutorService newDefaultThreadPool(Object source, String name) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Sat Mar 20 14:54:11 2010
@@ -505,6 +505,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 
@@ -515,6 +518,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 
@@ -526,6 +532,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 
@@ -537,6 +546,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 
@@ -549,6 +561,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 
@@ -560,6 +575,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 
@@ -570,6 +588,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 
@@ -580,6 +601,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 
@@ -592,6 +616,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
    
@@ -620,6 +647,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 
@@ -641,6 +671,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 
@@ -662,6 +695,9 @@ public class DefaultProducerTemplate ext
             }
         };
 
+        if (executor == null) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
         return executor.submit(task);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Sat Mar 20 14:54:11 2010
@@ -150,7 +150,7 @@ public class AggregateDefinition extends
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
 
         // executor service is mandatory for the Aggregator
-        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "Aggregator", this);
         if (executorService == null) {
             if (isParallelProcessing()) {
                 // we are running in parallel so create a cached thread pool which grows/shrinks automatic

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java Sat Mar 20 14:54:11 2010
@@ -24,7 +24,7 @@ import org.apache.camel.ExecutorServiceA
  * Enables definitions to support concurrency using {@link java.util.concurrent.ExecutorService}
  *
  * @version $Revision$
- * @see org.apache.camel.util.concurrent.ExecutorServiceHelper#getConfiguredExecutorService(org.apache.camel.spi.RouteContext,
+ * @see org.apache.camel.util.concurrent.ExecutorServiceHelper#getConfiguredExecutorService(org.apache.camel.spi.RouteContext, String name,
  *                                                                                          ExecutorServiceAwareDefinition)
  */
 public interface ExecutorServiceAwareDefinition<Type extends ProcessorDefinition> extends ExecutorServiceAware {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Sat Mar 20 14:54:11 2010
@@ -148,7 +148,7 @@ public class MulticastDefinition extends
             aggregationStrategy = new UseLatestAggregationStrategy();
         }
 
-        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "Multicast", this);
         if (isParallelProcessing() && executorService == null) {
             // we are running in parallel so create a cached thread pool which grows/shrinks automatic
             executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Multicast");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Sat Mar 20 14:54:11 2010
@@ -97,7 +97,7 @@ public class OnCompletionDefinition exte
             when = onWhen.getExpression().createPredicate(routeContext);
         }
 
-        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this);
         if (executorService == null) {
             executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "OnCompletion");
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Sat Mar 20 14:54:11 2010
@@ -95,7 +95,7 @@ public class RecipientListDefinition ext
             answer.setStopOnException(isStopOnException());
         }
 
-        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "RecipientList", this);
         if (isParallelProcessing() && executorService == null) {
             // we are running in parallel so create a cached thread pool which grows/shrinks automatic
             executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "RecipientList");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Sat Mar 20 14:54:11 2010
@@ -88,7 +88,7 @@ public class SplitDefinition extends Exp
 
         aggregationStrategy = createAggregationStrategy(routeContext);
 
-        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "Split", this);
         if (isParallelProcessing() && executorService == null) {
             // we are running in parallel so create a cached thread pool which grows/shrinks automatic
             executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Split");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Sat Mar 20 14:54:11 2010
@@ -70,11 +70,13 @@ public class ThreadsDefinition extends O
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
+        // The threads name
+        String name = getThreadName() != null ? getThreadName() : "Threads";
+
         // prefer any explicit configured executor service
-        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, name, this);
         if (executorService == null) {
             // none was configured so create an executor based on the other parameters
-            String name = getThreadName() != null ? getThreadName() : "Threads";
             if (poolSize == null || poolSize <= 0) {
                 // use the cached thread pool
                 executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, name);
@@ -92,8 +94,7 @@ public class ThreadsDefinition extends O
                 }
 
                 executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
-                                        .newThreadPool(this, name, poolSize, max, keepAlive, tu,
-                                                       maxQueue, rejected, true);
+                                        .newThreadPool(this, name, poolSize, max, keepAlive, tu, maxQueue, rejected, true);
             }
         }
         Processor childProcessor = routeContext.createProcessor(this);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Sat Mar 20 14:54:11 2010
@@ -103,7 +103,7 @@ public class ToDefinition extends SendDe
 
         SendAsyncProcessor async = new SendAsyncProcessor(endpoint, getPattern(), uow);
 
-        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "ToAsync", this);
         if (executorService != null) {
             async.setExecutorService(executorService);
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Sat Mar 20 14:54:11 2010
@@ -67,7 +67,7 @@ public class WireTapDefinition extends S
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Endpoint endpoint = resolveEndpoint(routeContext);
 
-        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+        executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "WireTap", this);
         if (executorService == null) {
             executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "WireTap");
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Sat Mar 20 14:54:11 2010
@@ -100,13 +100,15 @@ public interface ExecutorServiceStrategy
     void setThreadNamePattern(String pattern) throws IllegalArgumentException;
 
     /**
-     * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}.
+     * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}
+     * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.
      *
      * @param source               the source object, usually it should be <tt>this</tt> passed in as parameter
+     * @param name                 name which is appended to the thread name
      * @param executorServiceRef   reference to lookup
      * @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt> if not found
      */
-    ExecutorService lookup(Object source, String executorServiceRef);
+    ExecutorService lookup(Object source, String name, String executorServiceRef);
 
     /**
      * Creates a new thread pool using the default thread pool profile.

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Sat Mar 20 14:54:11 2010
@@ -222,18 +222,22 @@ public final class ExecutorServiceHelper
      * <p/>
      * This method will lookup for configured thread pool in the following order
      * <ul>
-     * <li>from the definition if any explicit configured executor service.</li>
-     * <li>if none found, then <tt>null</tt> is returned.</li>
+     *   <li>from the definition if any explicit configured executor service.</li>
+     *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
+     *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
+     *   <li>if none found, then <tt>null</tt> is returned.</li>
      * </ul>
      * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support
      * configured executor services in the same coherent way.
      *
-     * @param routeContext the rout context
-     * @param definition   the node definition which may leverage executor service.
+     * @param routeContext   the route context
+     * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
+     *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
+     * @param definition     the node definition which may leverage executor service.
      * @return the configured executor service, or <tt>null</tt> if none was configured.
      * @throws IllegalArgumentException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found
      */
-    public static ExecutorService getConfiguredExecutorService(RouteContext routeContext,
+    public static ExecutorService getConfiguredExecutorService(RouteContext routeContext, String name,
                                                                ExecutorServiceAwareDefinition definition) throws IllegalArgumentException {
         ExecutorServiceStrategy strategy = routeContext.getCamelContext().getExecutorServiceStrategy();
         ObjectHelper.notNull(strategy, "ExecutorServiceStrategy", routeContext.getCamelContext());
@@ -242,7 +246,7 @@ public final class ExecutorServiceHelper
         if (definition.getExecutorService() != null) {
             return definition.getExecutorService();
         } else if (definition.getExecutorServiceRef() != null) {
-            ExecutorService answer = strategy.lookup(definition, definition.getExecutorServiceRef());
+            ExecutorService answer = strategy.lookup(definition, name, definition.getExecutorServiceRef());
             if (answer == null) {
                 throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry.");
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java Sat Mar 20 14:54:11 2010
@@ -37,17 +37,6 @@ public class ThreadPoolBuilderTest exten
         return jndi;
     }
 
-    public void testThreadPoolBuilderLookup() throws Exception {
-        ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
-        ExecutorService executor = builder.lookup(this, "someonesPool");
-        assertNotNull(executor);
-
-        assertEquals(false, executor.isShutdown());
-        context.stop();
-        // you need to manage this pool yourself
-        assertEquals(false, executor.isShutdown());
-    }
-
     public void testThreadPoolBuilderDefault() throws Exception {
         ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
         ExecutorService executor = builder.build(this, "myPool");

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java?rev=925619&r1=925618&r2=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java Sat Mar 20 14:54:11 2010
@@ -267,4 +267,33 @@ public class DefaultExecutorServiceStrat
         assertTrue(tp.isShutdown());
     }
 
+    public void testLookupThreadPoolProfile() throws Exception {
+        ExecutorService pool = context.getExecutorServiceStrategy().lookup(this, "Cool", "fooProfile");
+        // does not exist yet
+        assertNull(pool);
+
+        assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("fooProfile"));
+
+        ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("fooProfile");
+        foo.setKeepAliveTime(20L);
+        foo.setMaxPoolSize(40);
+        foo.setPoolSize(5);
+        foo.setMaxQueueSize(2000);
+
+        context.getExecutorServiceStrategy().registerThreadPoolProfile(foo);
+
+        pool = context.getExecutorServiceStrategy().lookup(this, "Cool", "fooProfile");
+        assertNotNull(pool);
+
+        ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool);
+        assertEquals(20, tp.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals(40, tp.getMaximumPoolSize());
+        assertEquals(5, tp.getCorePoolSize());
+        assertFalse(tp.isShutdown());
+
+        context.stop();
+
+        assertTrue(tp.isShutdown());
+    }
+
 }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java (from r925107, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java&r1=925107&r2=925619&rev=925619&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateThreadPoolProfileTest.java Sat Mar 20 14:54:11 2010
@@ -16,28 +16,24 @@
  */
 package org.apache.camel.processor.aggregator;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ThreadPoolRejectedPolicy;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.ThreadPoolProfileSupport;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.spi.ThreadPoolProfile;
 
 /**
  * @version $Revision$
  */
-public class AggregateExpressionTimeoutTest extends ContextTestSupport {
+public class AggregateThreadPoolProfileTest extends ContextTestSupport {
 
-    public void testAggregateExpressionTimeout() throws Exception {
+    public void testAggregateThreadPoolProfile() throws Exception {
         getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
 
-        Map headers = new HashMap();
-        headers.put("id", 123);
-        headers.put("timeout", 2000);
-
-        template.sendBodyAndHeaders("direct:start", "A", headers);
-        template.sendBodyAndHeaders("direct:start", "B", headers);
-        template.sendBodyAndHeaders("direct:start", "C", headers);
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
 
         assertMockEndpointsSatisfied();
     }
@@ -47,15 +43,19 @@ public class AggregateExpressionTimeoutT
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // START SNIPPET: e1
+                // create and register thread pool profile
+                ThreadPoolProfile profile = new ThreadPoolProfileSupport("myProfile");
+                profile.setPoolSize(2);
+                profile.setMaxPoolSize(8);
+                profile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
+                context.getExecutorServiceStrategy().registerThreadPoolProfile(profile);
+
                 from("direct:start")
-                    // aggregate all exchanges correlated by the id header.
-                    // Aggregate them using the BodyInAggregatingStrategy strategy which
-                    // and the timeout header contains the timeout in millis of inactivity them timeout and complete the aggregation
-                    // and send it to mock:aggregated
-                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionTimeout(header("timeout"))
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        // use our custom thread pool profile
+                        .completionSize(3).executorServiceRef("myProfile")
+                        .to("log:foo")
                         .to("mock:aggregated");
-                // END SNIPPET: e1
             }
         };
     }

Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.java (from r925107, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java&r1=925107&r2=925619&rev=925619&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.java Sat Mar 20 14:54:11 2010
@@ -17,17 +17,17 @@
 package org.apache.camel.spring.processor.aggregator;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.processor.aggregator.AggregateExpressionSizeFallbackTest;
+import org.apache.camel.processor.aggregator.AggregateThreadPoolProfileTest;
 
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
 /**
  * @version $Revision$
  */
-public class SpringAggregateExpressionSizeFallbackTest extends AggregateExpressionSizeFallbackTest {
+public class SpringAggregateThreadPoolProfileTest extends AggregateThreadPoolProfileTest {
 
     protected CamelContext createCamelContext() throws Exception {
-        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml");
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml");
     }
 
 }
\ No newline at end of file

Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml (from r925107, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml&r1=925107&r2=925619&rev=925619&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateThreadPoolProfileTest.xml Sat Mar 20 14:54:11 2010
@@ -24,21 +24,24 @@
 
     <!-- START SNIPPET: e1 -->
     <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+
+        <!-- define a thread pool profile -->
+        <threadPoolProfile id="myProfile" poolSize="2" maxQueueSize="8" rejectedPolicy="Abort"/>
+
         <route>
             <from uri="direct:start"/>
-            <aggregate strategyRef="aggregatorStrategy" completionSize="3">
+            <!-- use the custom thread pool profile in this aggregate EIP, by referring to it -->
+            <aggregate strategyRef="aggregatorStrategy" completionSize="3" executorServiceRef="myProfile">
                 <correlationExpression>
                     <simple>header.id</simple>
-                </correlationExpression>
-                <completionSize>
-                    <header>mySize</header>
-                </completionSize>
+                </correlationExpression>      
+                <to uri="log:foo"/>
                 <to uri="mock:aggregated"/>
             </aggregate>
         </route>
     </camelContext>
+    <!-- END SNIPPET: e1 -->
 
     <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
-    <!-- END SNIPPET: e1 -->
 
 </beans>