You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2008/12/30 20:36:03 UTC

svn commit: r730218 - /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java

Author: janstey
Date: Tue Dec 30 11:36:03 2008
New Revision: 730218

URL: http://svn.apache.org/viewvc?rev=730218&view=rev
Log:
CAMEL-1041 - Added ability to customize aggregation strategy for the Splitter in Spring DSL

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=730218&r1=730217&r2=730218&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Tue Dec 30 11:36:03 2008
@@ -51,6 +51,8 @@
     @XmlTransient
     private Executor executor;
     @XmlAttribute(required = false)
+    private String strategyRef;
+    @XmlAttribute(required = false)
     private String threadPoolExecutorRef;
     @XmlAttribute(required = false)
     private Boolean streaming = false;
@@ -79,13 +81,36 @@
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = routeContext.createProcessor(this);
-        if (aggregationStrategy == null) {
-            aggregationStrategy = new UseLatestAggregationStrategy();
-        }
+        aggregationStrategy = createAggregationStrategy(routeContext);
         executor = createThreadPoolExecutor(routeContext);
         return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
                 isParallelProcessing(), executor, streaming);
     }
+
+    
+    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
+        AggregationStrategy strategy = getAggregationStrategy();
+        if (strategy == null && strategyRef != null) {
+            strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
+        }
+        if (strategy == null) {
+            // fallback to use latest
+            strategy = new UseLatestAggregationStrategy();
+        }
+        return strategy;
+    }        
+    
+    private Executor createThreadPoolExecutor(RouteContext routeContext) {
+        Executor executor = getExecutor();
+        if (executor == null && threadPoolExecutorRef != null) {
+            executor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class);
+        }
+        if (executor == null) {
+            // fall back and use default
+            executor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+        }
+        return executor;
+    }         
     
     // Fluent API
     // -------------------------------------------------------------------------
@@ -181,20 +206,8 @@
 
     public void setStreaming(boolean streaming) {
         this.streaming = streaming;
-    }
-
-    private Executor createThreadPoolExecutor(RouteContext routeContext) {
-        Executor executor = getExecutor();
-        if (executor == null && threadPoolExecutorRef != null) {
-            executor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class);
-        }
-        if (executor == null) {
-            // fall back and use default
-            executor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-        }
-        return executor;
-    }    
-   
+    }  
+    
     public Executor getExecutor() {
         return executor;
     }