You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/03/17 10:38:42 UTC

svn commit: r637802 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/

Author: ningjiang
Date: Mon Mar 17 02:38:36 2008
New Revision: 637802

URL: http://svn.apache.org/viewvc?rev=637802&view=rev
Log:
CAMEL-389 Parallel processing route

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java?rev=637802&r1=637801&r2=637802&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java Mon Mar 17 02:38:36 2008
@@ -17,6 +17,7 @@
 package org.apache.camel.model;
 
 import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -35,16 +36,19 @@
 @XmlRootElement(name = "multicast")
 @XmlAccessorType(XmlAccessType.FIELD)
 public class MulticastType extends OutputType<ProcessorType> {
+    private boolean parallelProcessing;
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
-    
+    @XmlTransient
+    private ThreadPoolExecutor threadPoolExecutor;
+
     @Override
     public String toString() {
         return "Multicast[" + getOutputs() + "]";
     }
 
     @Override
-    public Processor createProcessor(RouteContext routeContext) throws Exception {        
+    public Processor createProcessor(RouteContext routeContext) throws Exception {
         return createOutputsProcessor(routeContext);
     }
 
@@ -52,14 +56,31 @@
         if (aggregationStrategy == null) {
             aggregationStrategy = new UseLatestAggregationStrategy();
         }
-        return new MulticastProcessor(list, aggregationStrategy);
+        return new MulticastProcessor(list, aggregationStrategy, parallelProcessing, threadPoolExecutor);
     }
-    
+
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }
 
     public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
+    }
+
+    public boolean isParallelProcessing() {
+        return parallelProcessing;
+    }
+
+    public void setParallelProcessing(boolean parallelProcessing) {
+        this.parallelProcessing = parallelProcessing;
+    }
+
+    public ThreadPoolExecutor getThreadPoolExecutor() {
+        return threadPoolExecutor;
+    }
+
+    public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
+        this.threadPoolExecutor = executor;
+
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=637802&r1=637801&r2=637802&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Mon Mar 17 02:38:36 2008
@@ -164,11 +164,28 @@
         addOutput(answer);
         return answer;
     }
-    
+
     /**
      * Multicasts messages to all its child outputs; so that each processor and
      * destination gets a copy of the original message to avoid the processors
-     * interfering with each other.     
+     * interfering with each other.
+     * @param aggregationStrategy the strategy used to aggregate responses for
+     *          every part
+     * @param pralleProcessing if is true camel will fork thread to call the endpoint producer
+     * @return the multicast type
+     */
+    public MulticastType multicast(AggregationStrategy aggregationStrategy, boolean paralleProcessing) {
+        MulticastType answer = new MulticastType();
+        addOutput(answer);
+        answer.setAggregationStrategy(aggregationStrategy);
+        answer.setParallelProcessing(true);
+        return answer;
+    }
+
+    /**
+     * Multicasts messages to all its child outputs; so that each processor and
+     * destination gets a copy of the original message to avoid the processors
+     * interfering with each other.
      * @param aggregationStrategy the strategy used to aggregate responses for
      *          every part
      * @return the multicast type
@@ -308,13 +325,13 @@
     public FilterType filter(String language, String expression) {
         return filter(new LanguageExpression(language, expression));
     }
-    
+
     public LoadBalanceType loadBalance() {
         LoadBalanceType answer = new LoadBalanceType();
         addOutput(answer);
         return answer;
-    }    
-    
+    }
+
 
     /**
      * Creates a choice of one or more predicates with an otherwise clause
@@ -374,7 +391,7 @@
      * pattern where an expression is evaluated to iterate through each of the
      * parts of a message and then each part is then send to some endpoint.
      * This splitter responds with the latest message returned from destination
-     * endpoint. 
+     * endpoint.
      *
      * @param receipients the expression on which to split
      * @return the builder
@@ -391,8 +408,8 @@
      * pattern where an expression is evaluated to iterate through each of the
      * parts of a message and then each part is then send to some endpoint.
      * This splitter responds with the latest message returned from destination
-     * endpoint. 
-     * 
+     * endpoint.
+     *
      * @return the expression clause for the expression on which to split
      */
     public ExpressionClause<SplitterType> splitter() {
@@ -435,7 +452,7 @@
         answer.setAggregationStrategy(aggregationStrategy);
         return ExpressionClause.createAndSetExpression(answer);
     }
-    
+
     /**
      * Creates the <a
      * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
@@ -1151,11 +1168,11 @@
     public ProcessorType<? extends ProcessorType> getParent() {
     	return parent;
     }
-    
+
     public void setParent(ProcessorType<? extends ProcessorType> parent) {
     	this.parent = parent;
     }
-    
+
     @XmlTransient
     public ErrorHandlerBuilder getErrorHandlerBuilder() {
         if (errorHandlerBuilder == null) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=637802&r1=637801&r2=637802&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Mar 17 02:38:36 2008
@@ -20,11 +20,21 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.processor.ThreadProcessor.ProcessCall;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -32,22 +42,38 @@
 /**
  * Implements the Multicast pattern to send a message exchange to a number of
  * endpoints, each endpoint receiving a copy of the message exchange.
- * 
+ *
  * @see Pipeline
  * @version $Revision$
  */
 public class MulticastProcessor extends ServiceSupport implements Processor {
     private Collection<Processor> processors;
     private AggregationStrategy aggregationStrategy;
-    
+    private boolean isParallelProcessing;
+    private ThreadPoolExecutor executor;
+    private final AtomicBoolean shutdown = new AtomicBoolean(true);
+
     public MulticastProcessor(Collection<Processor> processors) {
         this(processors, null);
     }
 
     public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
+        this(processors, null, false, null);
+    }
+
+    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor) {
         this.processors = processors;
         this.aggregationStrategy = aggregationStrategy;
-        notNull(processors, "processors");        
+        this.isParallelProcessing = parallelProcessing;
+        if (isParallelProcessing) {
+            if (executor != null) {
+                this.executor = executor;
+            } else {// setup default Executor
+                this.executor = new ThreadPoolExecutor(1, processors.size(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(processors.size()));
+            }
+
+        }
+        notNull(processors, "processors");
     }
 
     /**
@@ -67,18 +93,74 @@
         return "Multicast" + getProcessors();
     }
 
+    class ProcessCall implements Runnable {
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+        private final Processor processor;
+
+        public ProcessCall(Exchange exchange, Processor processor, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.processor = processor;
+        }
+
+        public void run() {
+            if( shutdown.get() ) {
+                exchange.setException(new RejectedExecutionException());
+                callback.done(false);
+            } else {
+                try {
+                    processor.process(exchange);
+                } catch (Exception ex) {
+                    exchange.setException(ex);
+                }
+                callback.done(false);
+            }
+        }
+    }
+
     public void process(Exchange exchange) throws Exception {
         Exchange result = null;
-        for (Processor producer : processors) {
-            Exchange copy = copyExchangeStrategy(producer, exchange);
-            producer.process(copy);
+        // Parallel Processing the producer
+        if (isParallelProcessing) {
+            Exchange[] exchanges = new Exchange[processors.size()];
+            final CountDownLatch completedExchanges = new CountDownLatch(exchanges.length);
+            int i = 0;
+            for (Processor producer : processors) {
+                exchanges[i] = copyExchangeStrategy(producer, exchange);
+                ProcessCall call = new ProcessCall(exchanges[i], producer, new AsyncCallback(){
+                    public void done(boolean doneSynchronously) {
+                        completedExchanges.countDown();
+                    }
+
+                });
+                executor.execute(call);
+                i++;
+            }
+            completedExchanges.await();
             if (aggregationStrategy != null) {
-                if (result == null) {
-                    result = copy;
-                } else {
-                    result = aggregationStrategy.aggregate(result, copy);                    
+                for (Exchange resultExchange: exchanges) {
+                    if (result == null) {
+                        result = resultExchange;
+                    } else {
+                        result = aggregationStrategy.aggregate(result, resultExchange);
+                    }
+                }
+            }
+
+        } else {
+            // we call the producer one by one sequentially
+            for (Processor producer : processors) {
+                Exchange copy = copyExchangeStrategy(producer, exchange);
+                producer.process(copy);
+                if (aggregationStrategy != null) {
+                    if (result == null) {
+                        result = copy;
+                    } else {
+                        result = aggregationStrategy.aggregate(result, copy);
+                    }
                 }
-            }    
+            }
         }
         if (result != null) {
             ExchangeHelper.copyResults(exchange, result);
@@ -86,10 +168,25 @@
     }
 
     protected void doStop() throws Exception {
+        shutdown.set(true);
+        if (executor != null) {
+            executor.shutdown();
+            executor.awaitTermination(0, TimeUnit.SECONDS);
+        }
         ServiceHelper.stopServices(processors);
     }
 
     protected void doStart() throws Exception {
+        shutdown.set(false);
+        if (executor != null) {
+            executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+                public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
+                    ProcessCall call = (ProcessCall)runnable;
+                    call.exchange.setException(new RejectedExecutionException());
+                    call.callback.done(false);
+                }
+            });
+        }
         ServiceHelper.startServices(processors);
     }
 
@@ -103,7 +200,7 @@
     /**
      * Strategy method to copy the exchange before sending to another endpoint.
      * Derived classes such as the {@link Pipeline} will not clone the exchange
-     * 
+     *
      * @param processor the processor that will send the exchange
      * @param exchange
      * @return the current exchange if no copying is required such as for a

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java?rev=637802&r1=637801&r2=637802&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java Mon Mar 17 02:38:36 2008
@@ -34,7 +34,7 @@
 
 /**
  * A processor that forces async processing of the exchange using a thread pool.
- * 
+ *
  * @version $Revision$
  */
 public class ThreadProcessor implements AsyncProcessor, Service {
@@ -49,7 +49,7 @@
     private long keepAliveTime;
     private int maxSize = 1;
     private int coreSize = 1;
-    private final AtomicBoolean shutdown = new AtomicBoolean(true);;
+    private final AtomicBoolean shutdown = new AtomicBoolean(true);
 
     class ProcessCall implements Runnable {
         private final Exchange exchange;

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java?rev=637802&r1=637801&r2=637802&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java Mon Mar 17 02:38:36 2008
@@ -30,17 +30,32 @@
     protected Endpoint<Exchange> startEndpoint;
     protected MockEndpoint result;
 
+    public void testMulticastReceivesItsOwnExchangeParallelly() throws Exception {
+        sendingAMessageUsingMulticastReceivesItsOwnExchange(true);        
+    }
+    
+    public void testMulticastReceivesItsOwnExchangeSequentially() throws Exception {        
+        sendingAMessageUsingMulticastReceivesItsOwnExchange(false);
+    }
 
-    public void testSendingAMessageUsingMulticastReceivesItsOwnExchange() throws Exception {
+    private void sendingAMessageUsingMulticastReceivesItsOwnExchange(boolean isParallel) throws Exception {
         result.expectedBodiesReceived("inputx+inputy+inputz");
+        String url;
+        if (isParallel) {
+            url = "direct:parallel";
+        } else {
+            url = "direct:sequential";
+        }
 
-        Exchange result = template.send("direct:a", new Processor() {
+        Exchange result = template.send(url, new Processor() {
             public void process(Exchange exchange) {
                 Message in = exchange.getIn();
                 in.setBody("input");
                 in.setHeader("foo", "bar");
             }
         });
+
+
         assertNotNull("We should get result here", result);
         assertEquals("Can't get the right result", "inputx+inputy+inputz", result.getOut().getBody(String.class));
 
@@ -117,7 +132,9 @@
 
         return new RouteBuilder() {
             public void configure() {
-                from("direct:a").multicast(new BodyOutAggregatingStrategy()).to("direct:x", "direct:y", "direct:z");
+                from("direct:parallel").multicast(new BodyOutAggregatingStrategy(), true).to("direct:x", "direct:y", "direct:z");
+
+                from("direct:sequential").multicast(new BodyOutAggregatingStrategy()).to("direct:x", "direct:y", "direct:z");
 
                 from("direct:x").process(new AppendingProcessor("x")).to("direct:aggregater");
                 from("direct:y").process(new AppendingProcessor("y")).to("direct:aggregater");