You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2008/12/30 20:36:03 UTC
svn commit: r730218 -
/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
Author: janstey
Date: Tue Dec 30 11:36:03 2008
New Revision: 730218
URL: http://svn.apache.org/viewvc?rev=730218&view=rev
Log:
CAMEL-1041 - Added ability to customize aggregation strategy for the Splitter in Spring DSL
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=730218&r1=730217&r2=730218&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Tue Dec 30 11:36:03 2008
@@ -51,6 +51,8 @@
@XmlTransient
private Executor executor;
@XmlAttribute(required = false)
+ private String strategyRef;
+ @XmlAttribute(required = false)
private String threadPoolExecutorRef;
@XmlAttribute(required = false)
private Boolean streaming = false;
@@ -79,13 +81,36 @@
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
Processor childProcessor = routeContext.createProcessor(this);
- if (aggregationStrategy == null) {
- aggregationStrategy = new UseLatestAggregationStrategy();
- }
+ aggregationStrategy = createAggregationStrategy(routeContext);
executor = createThreadPoolExecutor(routeContext);
return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
isParallelProcessing(), executor, streaming);
}
+
+
+ private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
+ AggregationStrategy strategy = getAggregationStrategy();
+ if (strategy == null && strategyRef != null) {
+ strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
+ }
+ if (strategy == null) {
+ // fallback to use latest
+ strategy = new UseLatestAggregationStrategy();
+ }
+ return strategy;
+ }
+
+ private Executor createThreadPoolExecutor(RouteContext routeContext) {
+ Executor executor = getExecutor();
+ if (executor == null && threadPoolExecutorRef != null) {
+ executor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class);
+ }
+ if (executor == null) {
+ // fall back and use default
+ executor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+ }
+ return executor;
+ }
// Fluent API
// -------------------------------------------------------------------------
@@ -181,20 +206,8 @@
public void setStreaming(boolean streaming) {
this.streaming = streaming;
- }
-
- private Executor createThreadPoolExecutor(RouteContext routeContext) {
- Executor executor = getExecutor();
- if (executor == null && threadPoolExecutorRef != null) {
- executor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class);
- }
- if (executor == null) {
- // fall back and use default
- executor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
- }
- return executor;
- }
-
+ }
+
public Executor getExecutor() {
return executor;
}