You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/12 07:57:09 UTC
svn commit: r922155 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/util/concurrent/
camel-core/src/test/java/org/apache/camel/processor/ component...
Author: davsclaus
Date: Fri Mar 12 06:57:08 2010
New Revision: 922155
URL: http://svn.apache.org/viewvc?rev=922155&view=rev
Log:
CAMEL-1588: EIPs using thread pools now use common code from helper to lookup configuration.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java (contents, props changed)
- copied, changed from r921818, camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java
Removed:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java?rev=922155&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java Fri Mar 12 06:57:08 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Is used for easy configuration of {@link ExecutorService}.
+ *
+ * @version $Revision$
+ */
+public interface ExecutorServiceAware {
+
+ /**
+ * Gets the executor service
+ *
+ * @return the executor
+ */
+ ExecutorService getExecutorService();
+
+ /**
+ * Sets the executor service to be used.
+ *
+ * @param executorService the executor
+ */
+ void setExecutorService(ExecutorService executorService);
+
+ /**
+ * Gets the reference to lookup in the {@link org.apache.camel.spi.Registry} for the executor service to be used.
+ *
+ * @return the reference, or <tt>null</tt> if the executor was set directly
+ */
+ String getExecutorServiceRef();
+
+ /**
+ * Sets a reference to lookup in the {@link org.apache.camel.spi.Registry} for the executor service to be used.
+ *
+ * @param executorServiceRef reference for the executor
+ */
+ void setExecutorServiceRef(String executorServiceRef);
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ExecutorServiceAware.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Fri Mar 12 06:57:08 2010
@@ -38,6 +38,7 @@ import org.apache.camel.processor.aggreg
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <aggregate/> element
@@ -46,7 +47,7 @@ import org.apache.camel.spi.RouteContext
*/
@XmlRootElement(name = "aggregate")
@XmlAccessorType(XmlAccessType.FIELD)
-public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAware<AggregateDefinition> {
+public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> {
@XmlElement(name = "correlationExpression", required = true)
private ExpressionSubElementDefinition correlationExpression;
@XmlElement(name = "completionPredicate", required = false)
@@ -150,8 +151,8 @@ public class AggregateDefinition extends
AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, correlation, strategy);
- ExecutorService executor = createExecutorService(routeContext);
- answer.setExecutorService(executor);
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ answer.setExecutorService(executorService);
if (isParallelProcessing() != null) {
answer.setParallelProcessing(isParallelProcessing());
}
@@ -220,16 +221,6 @@ public class AggregateDefinition extends
return repository;
}
- private ExecutorService createExecutorService(RouteContext routeContext) {
- if (executorService == null && executorServiceRef != null) {
- executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
- }
- }
- return executorService;
- }
-
public AggregationStrategy getAggregationStrategy() {
return aggregationStrategy;
}
Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java (from r921818, camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java&r1=921818&r2=922155&rev=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java Fri Mar 12 06:57:08 2010
@@ -18,12 +18,16 @@ package org.apache.camel.model;
import java.util.concurrent.ExecutorService;
+import org.apache.camel.ExecutorServiceAware;
+
/**
* Enables definitions to support concurrency using {@link java.util.concurrent.ExecutorService}
*
* @version $Revision$
+ * @see org.apache.camel.util.concurrent.ExecutorServiceHelper#getConfiguredExecutorService(org.apache.camel.spi.RouteContext,
+ * ExecutorServiceAwareDefinition)
*/
-public interface ExecutorServiceAware<Type extends ProcessorDefinition> {
+public interface ExecutorServiceAwareDefinition<Type extends ProcessorDefinition> extends ExecutorServiceAware {
/**
* Setting the executor service for executing
@@ -31,7 +35,7 @@ public interface ExecutorServiceAware<Ty
* @param executorService the executor service
* @return the builder
*/
- <Type> Type executorService(ExecutorService executorService);
+ Type executorService(ExecutorService executorService);
/**
* Setting the executor service for executing
@@ -40,14 +44,6 @@ public interface ExecutorServiceAware<Ty
* to lookup in the {@link org.apache.camel.spi.Registry}
* @return the builder
*/
- <Type> Type executorServiceRef(String executorServiceRef);
-
- ExecutorService getExecutorService();
-
- void setExecutorService(ExecutorService executorService);
-
- String getExecutorServiceRef();
-
- void setExecutorServiceRef(String executorServiceRef);
+ Type executorServiceRef(String executorServiceRef);
}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Fri Mar 12 06:57:08 2010
@@ -29,6 +29,7 @@ import org.apache.camel.processor.Multic
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <multicast/> element
@@ -37,7 +38,7 @@ import org.apache.camel.spi.RouteContext
*/
@XmlRootElement(name = "multicast")
@XmlAccessorType(XmlAccessType.FIELD)
-public class MulticastDefinition extends OutputDefinition<ProcessorDefinition> implements ExecutorServiceAware<MulticastDefinition> {
+public class MulticastDefinition extends OutputDefinition<ProcessorDefinition> implements ExecutorServiceAwareDefinition<MulticastDefinition> {
@XmlAttribute(required = false)
private Boolean parallelProcessing;
@XmlAttribute(required = false)
@@ -146,12 +147,7 @@ public class MulticastDefinition extends
// default to use latest aggregation strategy
aggregationStrategy = new UseLatestAggregationStrategy();
}
- if (executorServiceRef != null) {
- executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
- }
- }
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
return new MulticastProcessor(routeContext.getCamelContext(), list, aggregationStrategy, isParallelProcessing(),
executorService, isStreaming(), isStopOnException());
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Fri Mar 12 06:57:08 2010
@@ -34,6 +34,7 @@ import org.apache.camel.builder.Expressi
import org.apache.camel.processor.OnCompletionProcessor;
import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <onCompletion/> element
@@ -42,7 +43,7 @@ import org.apache.camel.spi.RouteContext
*/
@XmlRootElement(name = "onCompletion")
@XmlAccessorType(XmlAccessType.FIELD)
-public class OnCompletionDefinition extends ProcessorDefinition<ProcessorDefinition> implements ExecutorServiceAware<OnCompletionDefinition> {
+public class OnCompletionDefinition extends ProcessorDefinition<ProcessorDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> {
@XmlAttribute(required = false)
private Boolean onCompleteOnly = Boolean.FALSE;
@@ -82,6 +83,10 @@ public class OnCompletionDefinition exte
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
+ if (onCompleteOnly && onFailureOnly) {
+ throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
+ }
+
Processor childProcessor = createOutputsProcessor(routeContext);
// wrap the on completion route in a unit of work processor
@@ -92,19 +97,9 @@ public class OnCompletionDefinition exte
when = onWhen.getExpression().createPredicate(routeContext);
}
- if (onCompleteOnly && onFailureOnly) {
- throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
- }
-
- if (executorServiceRef != null) {
- executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
- }
- }
-
OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor,
onCompleteOnly, onFailureOnly, when);
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
answer.setExecutorService(executorService);
return answer;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Fri Mar 12 06:57:08 2010
@@ -32,6 +32,7 @@ import org.apache.camel.processor.Recipi
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <recipientList/> element
@@ -40,7 +41,7 @@ import org.apache.camel.spi.RouteContext
*/
@XmlRootElement(name = "recipientList")
@XmlAccessorType(XmlAccessType.FIELD)
-public class RecipientListDefinition extends ExpressionNode implements ExecutorServiceAware<RecipientListDefinition> {
+public class RecipientListDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<RecipientListDefinition> {
@XmlTransient
private AggregationStrategy aggregationStrategy;
@@ -97,7 +98,12 @@ public class RecipientListDefinition ext
}
answer.setAggregationStrategy(createAggregationStrategy(routeContext));
- answer.setExecutorService(createExecutorService(routeContext));
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ if (executorService == null) {
+ // fallback to create a new executor
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("RecipientList");
+ }
+ answer.setExecutorService(executorService);
return answer;
}
@@ -113,20 +119,6 @@ public class RecipientListDefinition ext
return aggregationStrategy;
}
- private ExecutorService createExecutorService(RouteContext routeContext) {
- if (executorService == null && executorServiceRef != null) {
- executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
- }
- }
- if (executorService == null) {
- // fall back and use default
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("RecipientList");
- }
- return executorService;
- }
-
@Override
@SuppressWarnings("unchecked")
public List<ProcessorDefinition> getOutputs() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java Fri Mar 12 06:57:08 2010
@@ -278,6 +278,8 @@ public class RouteDefinition extends Pro
/**
* Disable stream caching for this route.
+ *
+ * @return the builder
*/
public RouteDefinition noStreamCaching() {
setStreamCache(Boolean.FALSE);
@@ -287,6 +289,8 @@ public class RouteDefinition extends Pro
/**
* Enable stream caching for this route.
+ *
+ * @return the builder
*/
public RouteDefinition streamCaching() {
setStreamCache(Boolean.TRUE);
@@ -301,6 +305,8 @@ public class RouteDefinition extends Pro
/**
* Disable tracing for this route.
+ *
+ * @return the builder
*/
public RouteDefinition noTracing() {
setTrace(false);
@@ -309,6 +315,8 @@ public class RouteDefinition extends Pro
/**
* Enable tracing for this route.
+ *
+ * @return the builder
*/
public RouteDefinition tracing() {
setTrace(true);
@@ -317,6 +325,8 @@ public class RouteDefinition extends Pro
/**
* Disable handle fault for this route.
+ *
+ * @return the builder
*/
public RouteDefinition noHandleFault() {
setHandleFault(false);
@@ -325,6 +335,8 @@ public class RouteDefinition extends Pro
/**
* Enable handle fault for this route.
+ *
+ * @return the builder
*/
public RouteDefinition handleFault() {
setHandleFault(true);
@@ -333,6 +345,8 @@ public class RouteDefinition extends Pro
/**
* Disable delayer for this route.
+ *
+ * @return the builder
*/
public RouteDefinition noDelayer() {
setDelayer(0L);
@@ -343,6 +357,7 @@ public class RouteDefinition extends Pro
* Enable delayer for this route.
*
* @param delay delay in millis
+ * @return the builder
*/
public RouteDefinition delayer(long delay) {
setDelayer(delay);
@@ -362,6 +377,8 @@ public class RouteDefinition extends Pro
/**
* Disables this route from being auto started when Camel starts.
+ *
+ * @return the builder
*/
public RouteDefinition noAutoStartup() {
setAutoStartup(Boolean.FALSE);
@@ -375,7 +392,7 @@ public class RouteDefinition extends Pro
* Camel will stop routes in reverse order when its stopping.
*
* @param order the order represented as a number
- * @return this builder
+ * @return the builder
*/
public RouteDefinition startupOrder(int order) {
setStartupOrder(order);
@@ -383,8 +400,11 @@ public class RouteDefinition extends Pro
}
/**
- * Disables this route from being auto started when Camel starts.
- */
+ * Configures a route policy for this route
+ *
+ * @param routePolicy the route policy
+ * @return the builder
+ */
public RouteDefinition routePolicy(RoutePolicy routePolicy) {
setRoutePolicy(routePolicy);
return this;
@@ -394,6 +414,7 @@ public class RouteDefinition extends Pro
* Configures a route policy for this route
*
* @param routePolicyRef reference to a {@link RoutePolicy} to lookup and use.
+ * @return the builder
*/
public RouteDefinition routePolicyRef(String routePolicyRef) {
setRoutePolicyRef(routePolicyRef);
@@ -404,6 +425,7 @@ public class RouteDefinition extends Pro
* Configures a shutdown route option.
*
* @param shutdownRoute the option to use when shutting down this route
+ * @return the builder
*/
public RouteDefinition shutdownRoute(ShutdownRoute shutdownRoute) {
setShutdownRoute(shutdownRoute);
@@ -414,6 +436,7 @@ public class RouteDefinition extends Pro
* Configures a shutdown running task option.
*
* @param shutdownRunningTask the option to use when shutting down and how to act upon running tasks.
+ * @return the builder
*/
public RouteDefinition shutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
setShutdownRunningTask(shutdownRunningTask);
@@ -663,14 +686,23 @@ public class RouteDefinition extends Pro
// configure auto startup
if (autoStartup != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Using AutoStartup " + isAutoStartup() + " on route: " + this);
+ }
routeContext.setAutoStartup(isAutoStartup());
}
// configure shutdown
if (shutdownRoute != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Using ShutdownRoute " + getShutdownRoute() + " on route: " + this);
+ }
routeContext.setShutdownRoute(getShutdownRoute());
}
if (shutdownRunningTask != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Using ShutdownRunningTask " + getShutdownRunningTask() + " on route: " + this);
+ }
routeContext.setShutdownRunningTask(getShutdownRunningTask());
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Fri Mar 12 06:57:08 2010
@@ -31,6 +31,7 @@ import org.apache.camel.processor.Splitt
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <split/> element
@@ -39,7 +40,7 @@ import org.apache.camel.spi.RouteContext
*/
@XmlRootElement(name = "split")
@XmlAccessorType(XmlAccessType.FIELD)
-public class SplitDefinition extends ExpressionNode implements ExecutorServiceAware<SplitDefinition> {
+public class SplitDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<SplitDefinition> {
@XmlTransient
private AggregationStrategy aggregationStrategy;
@XmlTransient
@@ -84,8 +85,15 @@ public class SplitDefinition extends Exp
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
Processor childProcessor = routeContext.createProcessor(this);
+
aggregationStrategy = createAggregationStrategy(routeContext);
- executorService = createExecutorService(routeContext);
+
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
+ if (executorService == null) {
+ // fallback to create a new executor
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("Split");
+ }
+
Expression exp = getExpression().createExpression(routeContext);
return new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
isParallelProcessing(), executorService, isStreaming(), isStopOnException());
@@ -103,21 +111,7 @@ public class SplitDefinition extends Exp
}
return strategy;
}
-
- private ExecutorService createExecutorService(RouteContext routeContext) {
- if (executorService == null && executorServiceRef != null) {
- executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
- }
- }
- if (executorService == null) {
- // fall back and use default
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool("Split");
- }
- return executorService;
- }
-
+
// Fluent API
// -------------------------------------------------------------------------
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri Mar 12 06:57:08 2010
@@ -31,6 +31,7 @@ import org.apache.camel.builder.xml.Time
import org.apache.camel.processor.ThreadsProcessor;
import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <threads/> element
@@ -39,7 +40,7 @@ import org.apache.camel.spi.RouteContext
*/
@XmlRootElement(name = "threads")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ThreadsDefinition extends OutputDefinition<ProcessorDefinition> implements ExecutorServiceAware<ThreadsDefinition> {
+public class ThreadsDefinition extends OutputDefinition<ProcessorDefinition> implements ExecutorServiceAwareDefinition<ThreadsDefinition> {
@XmlTransient
private ExecutorService executorService;
@@ -61,14 +62,10 @@ public class ThreadsDefinition extends O
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
- if (executorService == null && executorServiceRef != null) {
- executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
- }
- }
-
+ // prefer any explicit configured executor service
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
if (executorService == null) {
+ // none was configured so create an executor based on the other parameters
String name = getThreadName() != null ? getThreadName() : "Threads";
if (poolSize == null || poolSize <= 0) {
// use the cached thread pool
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Fri Mar 12 06:57:08 2010
@@ -31,6 +31,7 @@ import org.apache.camel.Processor;
import org.apache.camel.processor.SendAsyncProcessor;
import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <to/> element
@@ -39,7 +40,7 @@ import org.apache.camel.spi.RouteContext
*/
@XmlRootElement(name = "to")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ToDefinition extends SendDefinition<ToDefinition> implements ExecutorServiceAware<ToDefinition> {
+public class ToDefinition extends SendDefinition<ToDefinition> implements ExecutorServiceAwareDefinition<ToDefinition> {
@XmlTransient
private final List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
@XmlAttribute(required = false)
@@ -89,13 +90,9 @@ public class ToDefinition extends SendDe
// this code below is only for creating when async is enabled
// ----------------------------------------------------------
- if (executorServiceRef != null) {
- executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
- }
- }
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
if (executorService == null && poolSize != null) {
+ // crete a new based on the other options
executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
.newThreadPool("ToAsync[" + getLabel() + "]", poolSize, poolSize);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Fri Mar 12 06:57:08 2010
@@ -30,6 +30,7 @@ import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.processor.WireTapProcessor;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <wireTap/> element
@@ -38,7 +39,7 @@ import org.apache.camel.spi.RouteContext
*/
@XmlRootElement(name = "wireTap")
@XmlAccessorType(XmlAccessType.FIELD)
-public class WireTapDefinition extends SendDefinition<WireTapDefinition> implements ExecutorServiceAware<ProcessorDefinition> {
+public class WireTapDefinition extends SendDefinition<WireTapDefinition> implements ExecutorServiceAwareDefinition<ProcessorDefinition> {
@XmlTransient
private Processor newExchangeProcessor;
@@ -75,12 +76,7 @@ public class WireTapDefinition extends S
answer.setNewExchangeExpression(newExchangeExpression.createExpression(routeContext));
}
- if (executorServiceRef != null) {
- executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
- }
- }
+ executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
answer.setExecutorService(executorService);
return answer;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Mar 12 06:57:08 2010
@@ -25,14 +25,23 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.camel.model.ExecutorServiceAwareDefinition;
+import org.apache.camel.spi.ExecutorServiceStrategy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
+
/**
* Helper for {@link java.util.concurrent.ExecutorService} to construct executors using a thread factory that
* create thread names with Camel prefix.
+ * <p/>
+ * This helper should <b>NOT</b> be used by end users of Camel, as you should use
+ * {@link org.apache.camel.spi.ExecutorServiceStrategy} which you obtain from {@link org.apache.camel.CamelContext}
+ * to create thread pools.
+ * <p/>
+ * This helper should only be used internally in Camel.
*
* @version $Revision$
- * @deprecated replaced with {@link org.apache.camel.spi.ExecutorServiceStrategy}
*/
-@Deprecated
public final class ExecutorServiceHelper {
private static AtomicInteger threadCounter = new AtomicInteger();
@@ -160,4 +169,39 @@ public final class ExecutorServiceHelper
return answer;
}
+ /**
+ * Will lookup and get the configured {@link java.util.concurrent.ExecutorService} from the given definition.
+ * <p/>
+ * This method will lookup for configured thread pool in the following order
+ * <ul>
+ * <li>from the definition if any explicit configured executor service.</li>
+ * <li>if none found, then <tt>null</tt> is returned.</li>
+ * </ul>
+ * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support
+ * configured executor services in the same coherent way.
+ *
+ * @param routeContext the rout context
+ * @param definition the node definition which may leverage executor service.
+ * @return the configured executor service, or <tt>null</tt> if none was configured.
+ * @throws IllegalArgumentException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found
+ */
+ public static ExecutorService getConfiguredExecutorService(RouteContext routeContext,
+ ExecutorServiceAwareDefinition definition) throws IllegalArgumentException {
+ ExecutorServiceStrategy strategy = routeContext.getCamelContext().getExecutorServiceStrategy();
+ ObjectHelper.notNull(strategy, "ExecutorServiceStrategy", routeContext.getCamelContext());
+
+ // prefer to use explicit configured executor on the definition
+ if (definition.getExecutorService() != null) {
+ return definition.getExecutorService();
+ } else if (definition.getExecutorServiceRef() != null) {
+ ExecutorService answer = strategy.lookup(definition.getExecutorServiceRef());
+ if (answer == null) {
+ throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry.");
+ }
+ return answer;
+ }
+
+ return null;
+ }
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java Fri Mar 12 06:57:08 2010
@@ -33,9 +33,16 @@ public class SplitterWithCustomThreadPoo
public void testSplitterWithCustomThreadPoolExecutor() throws Exception {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) getSplitter().getExecutorService();
+ if (threadPoolExecutor == null) {
+ threadPoolExecutor = context.getRegistry().lookup(getSplitter().getExecutorServiceRef(), ThreadPoolExecutor.class);
+ }
// this should be sufficient as core pool size is the only thing I changed from the default
- assertTrue(threadPoolExecutor.getCorePoolSize() == customThreadPoolExecutor.getCorePoolSize());
- assertTrue(threadPoolExecutor.getMaximumPoolSize() == customThreadPoolExecutor.getMaximumPoolSize());
+ assertTrue(threadPoolExecutor.getCorePoolSize() == getThreadPoolExecutor().getCorePoolSize());
+ assertTrue(threadPoolExecutor.getMaximumPoolSize() == getThreadPoolExecutor().getMaximumPoolSize());
+ }
+
+ protected ThreadPoolExecutor getThreadPoolExecutor() {
+ return customThreadPoolExecutor;
}
protected SplitDefinition getSplitter() {
Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java Fri Mar 12 06:57:08 2010
@@ -16,11 +16,20 @@
*/
package org.apache.camel.spring.processor;
+import java.util.concurrent.ThreadPoolExecutor;
+
import org.apache.camel.CamelContext;
import org.apache.camel.processor.SplitterWithCustomThreadPoolExecutorTest;
+
import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
public class SpringSplitterWithCustomThreadPoolExecutorTest extends SplitterWithCustomThreadPoolExecutorTest {
+
+ @Override
+ protected ThreadPoolExecutor getThreadPoolExecutor() {
+ return context.getRegistry().lookup("myThreadPoolExecutor", ThreadPoolExecutor.class);
+ }
+
protected CamelContext createCamelContext() throws Exception {
return createSpringCamelContext(this, "org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml");
}
Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml?rev=922155&r1=922154&r2=922155&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml Fri Mar 12 06:57:08 2010
@@ -26,7 +26,7 @@
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:parallel-custom-pool"/>
- <split executorServiceRef="threadPoolExecutor">
+ <split executorServiceRef="myThreadPoolExecutor">
<xpath>/invoice/lineItems</xpath>
<to uri="mock:result"/>
</split>
@@ -35,7 +35,7 @@
<!-- There's an easier way of specifying constructor args, just can't remember it
at the moment... old Spring syntax will do for now! -->
- <bean id="threadPoolExecutor" class="java.util.concurrent.ThreadPoolExecutor">
+ <bean id="myThreadPoolExecutor" class="java.util.concurrent.ThreadPoolExecutor">
<constructor-arg index="0" value="8"/>
<constructor-arg index="1" value="16"/>
<constructor-arg index="2" value="0"/>