You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/02/29 13:18:56 UTC

svn commit: r1295073 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ test/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/aggregator/

Author: davsclaus
Date: Wed Feb 29 12:18:55 2012
New Revision: 1295073

URL: http://svn.apache.org/viewvc?rev=1295073&view=rev
Log:
CAMEL-5053: Fixed some EIPs not working with custom thread pool profile.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastThreadPoolProfileTest.java   (contents, props changed)
      - copied, changed from r1294979, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateUnknownExecutorServiceRefTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Feb 29 12:18:55 2012
@@ -159,16 +159,11 @@ public class AggregateDefinition extends
         Expression correlation = getExpression().createExpression(routeContext);
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
 
-        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this);
-        if (executorService == null) {
+        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, isParallelProcessing());
+        if (executorService == null && !isParallelProcessing()) {
             // executor service is mandatory for the Aggregator
-            ExecutorServiceManager executorServiceManager = routeContext.getCamelContext().getExecutorServiceManager();
-            if (isParallelProcessing()) {
-                executorService = executorServiceManager.newDefaultThreadPool(this, "Aggregator");
-            } else {
-                // we do not run in parallel mode, but use a synchronous executor, so we run in current thread
-                executorService = new SynchronousExecutorService();
-            }
+            // we do not run in parallel mode, but use a synchronous executor, so we run in current thread
+            executorService = new SynchronousExecutorService();
         }
        
         if (timeoutCheckerExecutorServiceRef != null && timeoutCheckerExecutorService == null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java Wed Feb 29 12:18:55 2012
@@ -78,13 +78,7 @@ public class DelayDefinition extends Exp
         Processor childProcessor = this.createChildProcessor(routeContext, false);
         Expression delay = createAbsoluteTimeDelayExpression(routeContext);
 
-        ScheduledExecutorService scheduled = null;
-        if (getAsyncDelayed() != null && getAsyncDelayed()) {
-            scheduled = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", this);
-            if (scheduled == null) {
-                scheduled = routeContext.getCamelContext().getExecutorServiceManager().newDefaultScheduledThreadPool(this, "Delay");
-            }
-        }
+        ScheduledExecutorService scheduled = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", this, isAsyncDelayed());
 
         Delayer answer = new Delayer(childProcessor, delay, scheduled);
         if (getAsyncDelayed() != null) {
@@ -165,6 +159,10 @@ public class DelayDefinition extends Exp
         this.asyncDelayed = asyncDelayed;
     }
 
+    public boolean isAsyncDelayed() {
+        return asyncDelayed != null && asyncDelayed;
+    }
+
     public Boolean getCallerRunsWhenRejected() {
         return callerRunsWhenRejected;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Wed Feb 29 12:18:55 2012
@@ -30,7 +30,6 @@ import org.apache.camel.processor.Multic
 import org.apache.camel.processor.SubUnitOfWorkProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
-import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CamelContextHelper;
 
@@ -217,11 +216,8 @@ public class MulticastDefinition extends
             aggregationStrategy = new UseLatestAggregationStrategy();
         }
 
-        ExecutorServiceManager executorServiceManager = routeContext.getCamelContext().getExecutorServiceManager();
-        if (isParallelProcessing() && executorService == null) {
-            String ref = this.executorServiceRef != null ? this.executorServiceRef : "Delay";
-            executorService = executorServiceManager.newDefaultThreadPool(this, ref);
-        }
+        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing());
+
         long timeout = getTimeout() != null ? getTimeout() : 0;
         if (timeout > 0 && !isParallelProcessing()) {
             throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Wed Feb 29 12:18:55 2012
@@ -36,7 +36,6 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.OnCompletionProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
-import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -130,9 +129,8 @@ public class OnCompletionDefinition exte
             when = onWhen.getExpression().createPredicate(routeContext);
         }
 
-        String ref = this.executorServiceRef != null ? this.executorServiceRef : "OnCompletion";
-        ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
-        executorService = manager.newDefaultThreadPool(this, ref);
+        // executor service is mandatory for on completion
+        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true);
 
         // should be false by default
         boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java Wed Feb 29 12:18:55 2012
@@ -223,11 +223,13 @@ public final class ProcessorDefinitionHe
      * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
      *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
      * @param definition     the node definition which may leverage executor service.
+     * @param useDefault     whether to fall back to a default thread pool if none was explicitly configured
      * @return the configured executor service, or <tt>null</tt> if none was configured.
-     * @throws NoSuchBeanException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found
+     * @throws IllegalArgumentException is thrown if the referenced executor service was found neither in the {@link org.apache.camel.spi.Registry} nor as a thread pool profile
      */
     public static ExecutorService getConfiguredExecutorService(RouteContext routeContext, String name,
-                                                               ExecutorServiceAwareDefinition<?> definition) throws NoSuchBeanException {
+                                                               ExecutorServiceAwareDefinition<?> definition,
+                                                               boolean useDefault) throws IllegalArgumentException {
         ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
         ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
 
@@ -242,9 +244,11 @@ public final class ProcessorDefinitionHe
                 answer = manager.newThreadPool(definition, name, definition.getExecutorServiceRef());
             }
             if (answer == null) {
-                throw new NoSuchBeanException(definition.getExecutorServiceRef(), "ExecutorService");
+                throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile.");
             }
             return answer;
+        } else if (useDefault) {
+            return manager.newDefaultThreadPool(definition, name);
         }
 
         return null;
@@ -267,12 +271,14 @@ public final class ProcessorDefinitionHe
      * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
      *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
      * @param definition     the node definition which may leverage executor service.
+     * @param useDefault     whether to fall back to a default thread pool if none was explicitly configured
      * @return the configured executor service, or <tt>null</tt> if none was configured.
-     * @throws IllegalArgumentException is thrown if the found instance is not a ScheduledExecutorService type.
-     * @throws NoSuchBeanException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found
+     * @throws IllegalArgumentException is thrown if the found instance is not a ScheduledExecutorService type,
+     * or if the referenced executor service was found neither in the {@link org.apache.camel.spi.Registry} nor as a thread pool profile
      */
     public static ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext, String name,
-                                                               ExecutorServiceAwareDefinition<?> definition) throws IllegalArgumentException, NoSuchBeanException {
+                                                               ExecutorServiceAwareDefinition<?> definition,
+                                                               boolean useDefault) throws IllegalArgumentException {
         ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
         ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
 
@@ -293,9 +299,11 @@ public final class ProcessorDefinitionHe
                 }
             }
             if (answer == null) {
-                throw new NoSuchBeanException(definition.getExecutorServiceRef(), "ScheduledExecutorService");
+                throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile.");
             }
             return answer;
+        } else if (useDefault) {
+            return manager.newDefaultScheduledThreadPool(definition, name);
         }
 
         return null;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Wed Feb 29 12:18:55 2012
@@ -34,7 +34,6 @@ import org.apache.camel.processor.Pipeli
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
-import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CamelContextHelper;
 
@@ -128,11 +127,8 @@ public class RecipientListDefinition<Typ
         if (getTimeout() != null) {
             answer.setTimeout(getTimeout());
         }
-        if (isParallelProcessing() && executorService == null) {
-            String ref = this.executorServiceRef != null ? this.executorServiceRef : "RecipientList";
-            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
-            executorService = manager.newDefaultThreadPool(this, ref);
-        }
+
+        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing());
         answer.setExecutorService(executorService);
         long timeout = getTimeout() != null ? getTimeout() : 0;
         if (timeout > 0 && !isParallelProcessing()) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Wed Feb 29 12:18:55 2012
@@ -30,7 +30,6 @@ import org.apache.camel.model.language.E
 import org.apache.camel.processor.Splitter;
 import org.apache.camel.processor.SubUnitOfWorkProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CamelContextHelper;
 
@@ -95,11 +94,7 @@ public class SplitDefinition extends Exp
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = this.createChildProcessor(routeContext, true);
         aggregationStrategy = createAggregationStrategy(routeContext);
-        if (isParallelProcessing() && executorService == null) {
-            String ref = this.executorServiceRef != null ? this.executorServiceRef : "Split";
-            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
-            executorService = manager.newDefaultThreadPool(this, ref);
-        }
+        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing());
 
         long timeout = getTimeout() != null ? getTimeout() : 0;
         if (timeout > 0 && !isParallelProcessing()) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Wed Feb 29 12:18:55 2012
@@ -80,7 +80,7 @@ public class ThreadsDefinition extends O
         // the threads name
         String name = getThreadName() != null ? getThreadName() : "Threads";
         // prefer any explicit configured executor service
-        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this);
+        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false);
         // if no explicit then create from the options
         if (executorService == null) {
             ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java Wed Feb 29 12:18:55 2012
@@ -83,13 +83,7 @@ public class ThrottleDefinition extends 
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = this.createChildProcessor(routeContext, true);
 
-        ScheduledExecutorService scheduled = null;
-        if (getAsyncDelayed() != null && getAsyncDelayed()) {
-            scheduled = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this);
-            if (scheduled == null) {
-                scheduled = routeContext.getCamelContext().getExecutorServiceManager().newDefaultScheduledThreadPool(this, "Throttle");
-            }
-        }
+        ScheduledExecutorService scheduled = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed());
 
         // should be default 1000 millis
         long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
@@ -195,6 +189,10 @@ public class ThrottleDefinition extends 
         this.asyncDelayed = asyncDelayed;
     }
 
+    public boolean isAsyncDelayed() {
+        return asyncDelayed != null && asyncDelayed;
+    }
+
     public Boolean getCallerRunsWhenRejected() {
         return callerRunsWhenRejected;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Wed Feb 29 12:18:55 2012
@@ -33,7 +33,6 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.WireTapProcessor;
-import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CamelContextHelper;
 
@@ -83,9 +82,9 @@ public class WireTapDefinition<Type exte
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Endpoint endpoint = resolveEndpoint(routeContext);
 
-        String ref = this.executorServiceRef != null ? this.executorServiceRef : "WireTap";
-        ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
-        executorService = manager.newDefaultThreadPool(this, ref);
+        // executor service is mandatory for wire tap
+        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", this, true);
+
         WireTapProcessor answer = new WireTapProcessor(endpoint, getPattern(), executorService);
 
         answer.setCopy(isCopy());

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastThreadPoolProfileTest.java (from r1294979, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastThreadPoolProfileTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastThreadPoolProfileTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java&r1=1294979&r2=1295073&rev=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastThreadPoolProfileTest.java Wed Feb 29 12:18:55 2012
@@ -16,49 +16,26 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.ThreadPoolProfile;
 
 /**
  * @version 
  */
-public class MulticastParallelTest extends ContextTestSupport {
-
-    public void testSingleMulticastParallel() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("AB");
-
-        template.sendBody("direct:start", "Hello");
-
-        assertMockEndpointsSatisfied();
-    }
-
-    public void testMulticastParallel() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(20);
-        mock.whenAnyExchangeReceived(new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                // they should all be AB even though A is slower than B
-                assertEquals("AB", exchange.getIn().getBody(String.class));
-            }
-        });
-
-        for (int i = 0; i < 20; i++) {
-            template.sendBody("direct:start", "Hello");
-        }
-
-        assertMockEndpointsSatisfied();
-    }
+public class MulticastThreadPoolProfileTest extends MulticastParallelTest {
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                // register thread pool profile
+                ThreadPoolProfile profile = new ThreadPoolProfileBuilder("myProfile").poolSize(5).maxPoolSize(10).maxQueueSize(20).build();
+                context.getExecutorServiceManager().registerThreadPoolProfile(profile);
+
                 from("direct:start")
                     .multicast(new AggregationStrategy() {
                             public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
@@ -71,7 +48,8 @@ public class MulticastParallelTest exten
                                 return oldExchange;
                             }
                         })
-                        .parallelProcessing().to("direct:a", "direct:b")
+                        // and refer to the profile here
+                        .parallelProcessing().executorServiceRef("myProfile").to("direct:a", "direct:b")
                     // use end to indicate end of multicast route
                     .end()
                     .to("mock:result");

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastThreadPoolProfileTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastThreadPoolProfileTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateUnknownExecutorServiceRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateUnknownExecutorServiceRefTest.java?rev=1295073&r1=1295072&r2=1295073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateUnknownExecutorServiceRefTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateUnknownExecutorServiceRefTest.java Wed Feb 29 12:18:55 2012
@@ -48,8 +48,8 @@ public class AggregateUnknownExecutorSer
             context.start();
             fail("Should have thrown exception");
         } catch (FailedToCreateRouteException e) {
-            NoSuchBeanException cause = assertIsInstanceOf(NoSuchBeanException.class, e.getCause());
-            assertEquals("myUnknownProfile", cause.getName());
+            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertTrue(cause.getMessage().contains("myUnknownProfile"));
         }
     }