You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2008/11/05 17:29:14 UTC

svn commit: r711602 - in /activemq/camel/branches/camel-1.x: ./ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-spring/src/test/java/org/apache/camel/spring/processor/ components/ca...

Author: janstey
Date: Wed Nov  5 08:29:14 2008
New Revision: 711602

URL: http://svn.apache.org/viewvc?rev=711602&view=rev
Log:
Merged revisions 711599 via svnmerge from 
https://svn.apache.org/repos/asf/activemq/camel/trunk

........
  r711599 | janstey | 2008-11-05 12:46:56 -0330 (Wed, 05 Nov 2008) | 1 line
  
  CAMEL-1040 - added configurable thread pool parameter to splitter
........

Added:
    activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
      - copied unchanged from r711599, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
    activemq/camel/branches/camel-1.x/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java
      - copied unchanged from r711599, activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java
    activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml
      - copied unchanged from r711599, activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml
Modified:
    activemq/camel/branches/camel-1.x/   (props changed)
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/SplitterType.java

Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Wed Nov  5 08:29:14 2008
@@ -1 +1 @@
-/activemq/camel/trunk:1-708421,708553-709447,709449-709612,709614-709634,709636-710013,711200,711206,711219-711220,711523,711531
+/activemq/camel/trunk:1-708421,708553-709447,709449-709612,709614-709634,709636-710013,711200,711206,711219-711220,711523,711531,711599

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=711602&r1=711601&r2=711602&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Wed Nov  5 08:29:14 2008
@@ -448,11 +448,11 @@
      * This splitter responds with the latest message returned from destination
      * endpoint.
      *
-     * @param receipients the expression on which to split
+     * @param recipients the expression on which to split
      * @return the builder
      */
-    public SplitterType splitter(Expression receipients) {
-        SplitterType answer = new SplitterType(receipients);
+    public SplitterType splitter(Expression recipients) {
+        SplitterType answer = new SplitterType(recipients);
         addOutput(answer);
         return answer;
     }
@@ -516,12 +516,12 @@
      * This splitter responds with the latest message returned from destination
      * endpoint.
      *
-     * @param receipients the expression on which to split
+     * @param recipients the expression on which to split
      * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
      * @return the builder
      */
-    public SplitterType splitter(Expression receipients, boolean parallelProcessing) {
-        SplitterType answer = new SplitterType(receipients);
+    public SplitterType splitter(Expression recipients, boolean parallelProcessing) {
+        SplitterType answer = new SplitterType(recipients);
         addOutput(answer);
         answer.setParallelProcessing(parallelProcessing);
         return answer;
@@ -535,6 +535,27 @@
      * This splitter responds with the latest message returned from destination
      * endpoint.
      *
+     * @param recipients the expression on which to split
+     * @param parallelProcessing if <tt>true</tt>, Camel will fork a thread to call the endpoint producer
+     * @param threadPoolExecutor the {@link ThreadPoolExecutor} to use instead of the default one
+     * @return the builder
+     */
+    public SplitterType splitter(Expression recipients, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+        SplitterType answer = new SplitterType(recipients);
+        addOutput(answer);
+        answer.setParallelProcessing(parallelProcessing);
+        answer.setThreadPoolExecutor(threadPoolExecutor);
+        return answer;
+    }    
+    
+    /**
+     * Creates the <a
+     * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+     * pattern where an expression is evaluated to iterate through each of the
+     * parts of a message and then each part is sent to some endpoint.
+     * This splitter responds with the latest message returned from destination
+     * endpoint.
+     *
      * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
      * @return the expression clause for the expression on which to split
      */
@@ -550,6 +571,26 @@
      * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
      * pattern where an expression is evaluated to iterate through each of the
      * parts of a message and then each part is then send to some endpoint.
+     * This splitter responds with the latest message returned from destination
+     * endpoint.
+     *
+     * @param parallelProcessing if <tt>true</tt>, Camel will fork a thread to call the endpoint producer
+     * @param threadPoolExecutor the {@link ThreadPoolExecutor} to use instead of the default one
+     * @return the expression clause for the expression on which to split
+     */
+    public ExpressionClause<SplitterType> splitter(boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+        SplitterType answer = new SplitterType();
+        addOutput(answer);
+        answer.setParallelProcessing(parallelProcessing);
+        answer.setThreadPoolExecutor(threadPoolExecutor);
+        return ExpressionClause.createAndSetExpression(answer);
+    }    
+    
+    /**
+     * Creates the <a
+     * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+     * pattern where an expression is evaluated to iterate through each of the
+     * parts of a message and then each part is sent to some endpoint.
      * Answer from the splitter is produced using given {@link AggregationStrategy}
      * @param partsExpression the expression on which to split
      * @param aggregationStrategy the strategy used to aggregate responses for
@@ -572,6 +613,29 @@
      * pattern where an expression is evaluated to iterate through each of the
      * parts of a message and then each part is then send to some endpoint.
      * Answer from the splitter is produced using given {@link AggregationStrategy}
+     * @param partsExpression the expression on which to split
+     * @param aggregationStrategy the strategy used to aggregate responses for
+     *          every part
+     * @param parallelProcessing if <tt>true</tt>, Camel will fork a thread to call the endpoint producer
+     * @param threadPoolExecutor the {@link ThreadPoolExecutor} to use instead of the default one
+     * @return the builder
+     */
+    public SplitterType splitter(Expression partsExpression,
+            AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+        SplitterType answer = new SplitterType(partsExpression);
+        addOutput(answer);
+        answer.setAggregationStrategy(aggregationStrategy);
+        answer.setParallelProcessing(parallelProcessing);
+        answer.setThreadPoolExecutor(threadPoolExecutor);        
+        return answer;
+    }    
+    
+    /**
+     * Creates the <a
+     * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+     * pattern where an expression is evaluated to iterate through each of the
+     * parts of a message and then each part is sent to some endpoint.
+     * Answer from the splitter is produced using given {@link AggregationStrategy}
      * @param aggregationStrategy the strategy used to aggregate responses for
      *          every part
      * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
@@ -585,7 +649,27 @@
         return ExpressionClause.createAndSetExpression(answer);
     }
 
-
+    /**
+     * Creates the <a
+     * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+     * pattern where an expression is evaluated to iterate through each of the
+     * parts of a message and then each part is sent to some endpoint.
+     * Answer from the splitter is produced using given {@link AggregationStrategy}
+     * @param aggregationStrategy the strategy used to aggregate responses for
+     *          every part
+     * @param parallelProcessing if <tt>true</tt>, Camel will fork a thread to call the endpoint producer
+     * @param threadPoolExecutor the {@link ThreadPoolExecutor} to use instead of the default one
+     * @return the expression clause for the expression on which to split
+     */
+    public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+        SplitterType answer = new SplitterType();
+        addOutput(answer);
+        answer.setAggregationStrategy(aggregationStrategy);
+        answer.setParallelProcessing(parallelProcessing);
+        answer.setThreadPoolExecutor(threadPoolExecutor);           
+        return ExpressionClause.createAndSetExpression(answer);
+    }   
+    
     /**
      * Creates the <a
      * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=711602&r1=711601&r2=711602&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/SplitterType.java (original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Wed Nov  5 08:29:14 2008
@@ -49,6 +49,8 @@
     @XmlTransient
     private ThreadPoolExecutor threadPoolExecutor;
     @XmlAttribute(required = false)
+    private String threadPoolExecutorRef;
+    @XmlAttribute(required = false)
     private Boolean streaming = false;
     
     public SplitterType() {
@@ -78,9 +80,7 @@
         if (aggregationStrategy == null) {
             aggregationStrategy = new UseLatestAggregationStrategy();
         }
-        if (threadPoolExecutor == null) {
-            threadPoolExecutor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
-        }
+        threadPoolExecutor = createThreadPoolExecutor(routeContext);
         return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
                 isParallelProcessing(), threadPoolExecutor, streaming);
     }
@@ -125,6 +125,18 @@
         return this;
     }
 
+    private ThreadPoolExecutor createThreadPoolExecutor(RouteContext routeContext) {
+        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
+        if (threadPoolExecutor == null && threadPoolExecutorRef != null) {
+            threadPoolExecutor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class);
+        }
+        if (threadPoolExecutor == null) {
+            // fall back and use default
+            threadPoolExecutor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
+        }
+        return threadPoolExecutor;
+    }    
+   
     public ThreadPoolExecutor getThreadPoolExecutor() {
         return threadPoolExecutor;
     }