You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/12/17 14:31:27 UTC
svn commit: r727377 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/
main/java/org/apache/camel/processor/aggregate/
test/java/org/apache/camel/processor/
Author: ningjiang
Date: Wed Dec 17 05:31:26 2008
New Revision: 727377
URL: http://svn.apache.org/viewvc?rev=727377&view=rev
Log:
CAMEL-159 added testcase for propagating exception in Splitter
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=727377&r1=727376&r2=727377&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Dec 17 05:31:26 2008
@@ -187,8 +187,11 @@
Processor producer = pair.getProcessor();
Exchange subExchange = pair.getExchange();
updateNewExchange(subExchange, i, pairs);
-
- producer.process(subExchange);
+ try {
+ producer.process(subExchange);
+ } catch (Exception exception) {
+ subExchange.setException(exception);
+ }
doAggregate(result, subExchange);
i++;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java?rev=727377&r1=727376&r2=727377&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java Wed Dec 17 05:31:26 2008
@@ -28,6 +28,13 @@
public class UseLatestAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ newExchange.setException(checkException(oldExchange, newExchange));
return newExchange;
}
+
+ protected Throwable checkException(Exchange oldExchange, Exchange newExchange) {
+ return newExchange.getException() != null
+ ? newExchange.getException()
+ : oldExchange.getException();
+ }
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java?rev=727377&r1=727376&r2=727377&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java Wed Dec 17 05:31:26 2008
@@ -20,6 +20,7 @@
import java.util.Set;
import java.util.TreeSet;
+import org.apache.camel.CamelException;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -179,14 +180,50 @@
}
}
+
+ public void testSplitterWithException() throws Exception {
+ MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+ resultEndpoint.expectedMessageCount(4);
+ resultEndpoint.expectedHeaderReceived("foo", "bar");
+
+ MockEndpoint failedEndpoint = getMockEndpoint("mock:failed");
+ failedEndpoint.expectedMessageCount(1);
+ failedEndpoint.expectedHeaderReceived("foo", "bar");
+
+ Exchange result = template.send("direct:exception", new Processor() {
+ public void process(Exchange exchange) {
+ Message in = exchange.getIn();
+ in.setBody("James,Guillaume,Hiram,Rob,Exception");
+ in.setHeader("foo", "bar");
+ }
+ });
+
+ assertTrue("The result exchange should have a camel exception", result.getException() instanceof CamelException);
+
+ assertMockEndpointsSatisfied();
+ }
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
+ errorHandler(deadLetterChannel("mock:failed").maximumRedeliveries(0));
from("direct:seqential").split(body().tokenize(","), new UseLatestAggregationStrategy()).to("mock:result");
- from("direct:parallel").split(body().tokenize(","), new MyAggregationStrategy(), true).to("mock:result");
+ from("direct:parallel").split(body().tokenize(","), new MyAggregationStrategy()).parallelProcessing(true).to("mock:result");
from("direct:streaming").split(body().tokenize(",")).streaming().to("mock:result");
from("direct:parallel-streaming").split(body().tokenize(","), new MyAggregationStrategy(), true).streaming().to("mock:result");
+ from("direct:exception")
+ .split(body().tokenize(","))
+ .aggregationStrategy(new MyAggregationStrategy())
+ .parallelProcessing(true).streaming()
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String string = exchange.getIn().getBody(String.class);
+ if ("Exception".equals(string)) {
+ throw new CamelException("Just want to throw exception here");
+ }
+
+ }
+ }).to("mock:result");
}
};
}