You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/03/17 10:38:42 UTC
svn commit: r637802 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/
Author: ningjiang
Date: Mon Mar 17 02:38:36 2008
New Revision: 637802
URL: http://svn.apache.org/viewvc?rev=637802&view=rev
Log:
CAMEL-389 Parallel processing route
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java?rev=637802&r1=637801&r2=637802&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java Mon Mar 17 02:38:36 2008
@@ -17,6 +17,7 @@
package org.apache.camel.model;
import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -35,16 +36,19 @@
@XmlRootElement(name = "multicast")
@XmlAccessorType(XmlAccessType.FIELD)
public class MulticastType extends OutputType<ProcessorType> {
+ private boolean parallelProcessing;
@XmlTransient
private AggregationStrategy aggregationStrategy;
-
+ @XmlTransient
+ private ThreadPoolExecutor threadPoolExecutor;
+
@Override
public String toString() {
return "Multicast[" + getOutputs() + "]";
}
@Override
- public Processor createProcessor(RouteContext routeContext) throws Exception {
+ public Processor createProcessor(RouteContext routeContext) throws Exception {
return createOutputsProcessor(routeContext);
}
@@ -52,14 +56,31 @@
if (aggregationStrategy == null) {
aggregationStrategy = new UseLatestAggregationStrategy();
}
- return new MulticastProcessor(list, aggregationStrategy);
+ return new MulticastProcessor(list, aggregationStrategy, parallelProcessing, threadPoolExecutor);
}
-
+
public AggregationStrategy getAggregationStrategy() {
return aggregationStrategy;
}
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
+ }
+
+ public boolean isParallelProcessing() {
+ return parallelProcessing;
+ }
+
+ public void setParallelProcessing(boolean parallelProcessing) {
+ this.parallelProcessing = parallelProcessing;
+ }
+
+ public ThreadPoolExecutor getThreadPoolExecutor() {
+ return threadPoolExecutor;
+ }
+
+ public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
+ this.threadPoolExecutor = executor;
+
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=637802&r1=637801&r2=637802&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Mon Mar 17 02:38:36 2008
@@ -164,11 +164,28 @@
addOutput(answer);
return answer;
}
-
+
/**
* Multicasts messages to all its child outputs; so that each processor and
* destination gets a copy of the original message to avoid the processors
- * interfering with each other.
+ * interfering with each other.
+ * @param aggregationStrategy the strategy used to aggregate responses for
+ * every part
+ * @param pralleProcessing if is true camel will fork thread to call the endpoint producer
+ * @return the multicast type
+ */
+ public MulticastType multicast(AggregationStrategy aggregationStrategy, boolean paralleProcessing) {
+ MulticastType answer = new MulticastType();
+ addOutput(answer);
+ answer.setAggregationStrategy(aggregationStrategy);
+ answer.setParallelProcessing(true);
+ return answer;
+ }
+
+ /**
+ * Multicasts messages to all its child outputs; so that each processor and
+ * destination gets a copy of the original message to avoid the processors
+ * interfering with each other.
* @param aggregationStrategy the strategy used to aggregate responses for
* every part
* @return the multicast type
@@ -308,13 +325,13 @@
public FilterType filter(String language, String expression) {
return filter(new LanguageExpression(language, expression));
}
-
+
public LoadBalanceType loadBalance() {
LoadBalanceType answer = new LoadBalanceType();
addOutput(answer);
return answer;
- }
-
+ }
+
/**
* Creates a choice of one or more predicates with an otherwise clause
@@ -374,7 +391,7 @@
* pattern where an expression is evaluated to iterate through each of the
* parts of a message and then each part is then send to some endpoint.
* This splitter responds with the latest message returned from destination
- * endpoint.
+ * endpoint.
*
* @param receipients the expression on which to split
* @return the builder
@@ -391,8 +408,8 @@
* pattern where an expression is evaluated to iterate through each of the
* parts of a message and then each part is then send to some endpoint.
* This splitter responds with the latest message returned from destination
- * endpoint.
- *
+ * endpoint.
+ *
* @return the expression clause for the expression on which to split
*/
public ExpressionClause<SplitterType> splitter() {
@@ -435,7 +452,7 @@
answer.setAggregationStrategy(aggregationStrategy);
return ExpressionClause.createAndSetExpression(answer);
}
-
+
/**
* Creates the <a
* href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
@@ -1151,11 +1168,11 @@
public ProcessorType<? extends ProcessorType> getParent() {
return parent;
}
-
+
public void setParent(ProcessorType<? extends ProcessorType> parent) {
this.parent = parent;
}
-
+
@XmlTransient
public ErrorHandlerBuilder getErrorHandlerBuilder() {
if (errorHandlerBuilder == null) {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=637802&r1=637801&r2=637802&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Mar 17 02:38:36 2008
@@ -20,11 +20,21 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.processor.ThreadProcessor.ProcessCall;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ServiceHelper;
@@ -32,22 +42,38 @@
/**
* Implements the Multicast pattern to send a message exchange to a number of
* endpoints, each endpoint receiving a copy of the message exchange.
- *
+ *
* @see Pipeline
* @version $Revision$
*/
public class MulticastProcessor extends ServiceSupport implements Processor {
private Collection<Processor> processors;
private AggregationStrategy aggregationStrategy;
-
+ private boolean isParallelProcessing;
+ private ThreadPoolExecutor executor;
+ private final AtomicBoolean shutdown = new AtomicBoolean(true);
+
public MulticastProcessor(Collection<Processor> processors) {
this(processors, null);
}
public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
+ this(processors, null, false, null);
+ }
+
+ public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor) {
this.processors = processors;
this.aggregationStrategy = aggregationStrategy;
- notNull(processors, "processors");
+ this.isParallelProcessing = parallelProcessing;
+ if (isParallelProcessing) {
+ if (executor != null) {
+ this.executor = executor;
+ } else {// setup default Executor
+ this.executor = new ThreadPoolExecutor(1, processors.size(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(processors.size()));
+ }
+
+ }
+ notNull(processors, "processors");
}
/**
@@ -67,18 +93,74 @@
return "Multicast" + getProcessors();
}
+ class ProcessCall implements Runnable {
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+ private final Processor processor;
+
+ public ProcessCall(Exchange exchange, Processor processor, AsyncCallback callback) {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.processor = processor;
+ }
+
+ public void run() {
+ if( shutdown.get() ) {
+ exchange.setException(new RejectedExecutionException());
+ callback.done(false);
+ } else {
+ try {
+ processor.process(exchange);
+ } catch (Exception ex) {
+ exchange.setException(ex);
+ }
+ callback.done(false);
+ }
+ }
+ }
+
public void process(Exchange exchange) throws Exception {
Exchange result = null;
- for (Processor producer : processors) {
- Exchange copy = copyExchangeStrategy(producer, exchange);
- producer.process(copy);
+ // Parallel Processing the producer
+ if (isParallelProcessing) {
+ Exchange[] exchanges = new Exchange[processors.size()];
+ final CountDownLatch completedExchanges = new CountDownLatch(exchanges.length);
+ int i = 0;
+ for (Processor producer : processors) {
+ exchanges[i] = copyExchangeStrategy(producer, exchange);
+ ProcessCall call = new ProcessCall(exchanges[i], producer, new AsyncCallback(){
+ public void done(boolean doneSynchronously) {
+ completedExchanges.countDown();
+ }
+
+ });
+ executor.execute(call);
+ i++;
+ }
+ completedExchanges.await();
if (aggregationStrategy != null) {
- if (result == null) {
- result = copy;
- } else {
- result = aggregationStrategy.aggregate(result, copy);
+ for (Exchange resultExchange: exchanges) {
+ if (result == null) {
+ result = resultExchange;
+ } else {
+ result = aggregationStrategy.aggregate(result, resultExchange);
+ }
+ }
+ }
+
+ } else {
+ // we call the producer one by one sequentially
+ for (Processor producer : processors) {
+ Exchange copy = copyExchangeStrategy(producer, exchange);
+ producer.process(copy);
+ if (aggregationStrategy != null) {
+ if (result == null) {
+ result = copy;
+ } else {
+ result = aggregationStrategy.aggregate(result, copy);
+ }
}
- }
+ }
}
if (result != null) {
ExchangeHelper.copyResults(exchange, result);
@@ -86,10 +168,25 @@
}
protected void doStop() throws Exception {
+ shutdown.set(true);
+ if (executor != null) {
+ executor.shutdown();
+ executor.awaitTermination(0, TimeUnit.SECONDS);
+ }
ServiceHelper.stopServices(processors);
}
protected void doStart() throws Exception {
+ shutdown.set(false);
+ if (executor != null) {
+ executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
+ ProcessCall call = (ProcessCall)runnable;
+ call.exchange.setException(new RejectedExecutionException());
+ call.callback.done(false);
+ }
+ });
+ }
ServiceHelper.startServices(processors);
}
@@ -103,7 +200,7 @@
/**
* Strategy method to copy the exchange before sending to another endpoint.
* Derived classes such as the {@link Pipeline} will not clone the exchange
- *
+ *
* @param processor the processor that will send the exchange
* @param exchange
* @return the current exchange if no copying is required such as for a
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java?rev=637802&r1=637801&r2=637802&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java Mon Mar 17 02:38:36 2008
@@ -34,7 +34,7 @@
/**
* A processor that forces async processing of the exchange using a thread pool.
- *
+ *
* @version $Revision$
*/
public class ThreadProcessor implements AsyncProcessor, Service {
@@ -49,7 +49,7 @@
private long keepAliveTime;
private int maxSize = 1;
private int coreSize = 1;
- private final AtomicBoolean shutdown = new AtomicBoolean(true);;
+ private final AtomicBoolean shutdown = new AtomicBoolean(true);
class ProcessCall implements Runnable {
private final Exchange exchange;
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java?rev=637802&r1=637801&r2=637802&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java Mon Mar 17 02:38:36 2008
@@ -30,17 +30,32 @@
protected Endpoint<Exchange> startEndpoint;
protected MockEndpoint result;
+ public void testMulticastReceivesItsOwnExchangeParallelly() throws Exception {
+ sendingAMessageUsingMulticastReceivesItsOwnExchange(true);
+ }
+
+ public void testMulticastReceivesItsOwnExchangeSequentially() throws Exception {
+ sendingAMessageUsingMulticastReceivesItsOwnExchange(false);
+ }
- public void testSendingAMessageUsingMulticastReceivesItsOwnExchange() throws Exception {
+ private void sendingAMessageUsingMulticastReceivesItsOwnExchange(boolean isParallel) throws Exception {
result.expectedBodiesReceived("inputx+inputy+inputz");
+ String url;
+ if (isParallel) {
+ url = "direct:parallel";
+ } else {
+ url = "direct:sequential";
+ }
- Exchange result = template.send("direct:a", new Processor() {
+ Exchange result = template.send(url, new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setBody("input");
in.setHeader("foo", "bar");
}
});
+
+
assertNotNull("We should get result here", result);
assertEquals("Can't get the right result", "inputx+inputy+inputz", result.getOut().getBody(String.class));
@@ -117,7 +132,9 @@
return new RouteBuilder() {
public void configure() {
- from("direct:a").multicast(new BodyOutAggregatingStrategy()).to("direct:x", "direct:y", "direct:z");
+ from("direct:parallel").multicast(new BodyOutAggregatingStrategy(), true).to("direct:x", "direct:y", "direct:z");
+
+ from("direct:sequential").multicast(new BodyOutAggregatingStrategy()).to("direct:x", "direct:y", "direct:z");
from("direct:x").process(new AppendingProcessor("x")).to("direct:aggregater");
from("direct:y").process(new AppendingProcessor("y")).to("direct:aggregater");