You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/12/17 14:31:27 UTC

svn commit: r727377 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/aggregate/ test/java/org/apache/camel/processor/

Author: ningjiang
Date: Wed Dec 17 05:31:26 2008
New Revision: 727377

URL: http://svn.apache.org/viewvc?rev=727377&view=rev
Log:
CAMEL-159 added testcase for propagating exception in Splitter

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=727377&r1=727376&r2=727377&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Dec 17 05:31:26 2008
@@ -187,8 +187,11 @@
                 Processor producer = pair.getProcessor();
                 Exchange subExchange = pair.getExchange();
                 updateNewExchange(subExchange, i, pairs);
-
-                producer.process(subExchange);
+                try {
+                    producer.process(subExchange);
+                } catch (Exception exception) {
+                    subExchange.setException(exception);
+                }
                 doAggregate(result, subExchange);
                 i++;
             }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java?rev=727377&r1=727376&r2=727377&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java Wed Dec 17 05:31:26 2008
@@ -28,6 +28,13 @@
 public class UseLatestAggregationStrategy implements AggregationStrategy {
 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        newExchange.setException(checkException(oldExchange, newExchange));
         return newExchange;
     }
+    
+    protected Throwable checkException(Exchange oldExchange, Exchange newExchange) {
+        return newExchange.getException() != null
+                ? newExchange.getException()
+                : oldExchange.getException();
+    }
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java?rev=727377&r1=727376&r2=727377&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java Wed Dec 17 05:31:26 2008
@@ -20,6 +20,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.camel.CamelException;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -179,14 +180,50 @@
         }
 
     }
+    
+    public void testSplitterWithException() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(4);
+        resultEndpoint.expectedHeaderReceived("foo", "bar");
+        
+        MockEndpoint failedEndpoint = getMockEndpoint("mock:failed");
+        failedEndpoint.expectedMessageCount(1);
+        failedEndpoint.expectedHeaderReceived("foo", "bar");
+        
+        Exchange result = template.send("direct:exception", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob,Exception");
+                in.setHeader("foo", "bar");
+            }
+        });
+        
+        assertTrue("The result exchange should have a camel exception", result.getException() instanceof CamelException);
+
+        assertMockEndpointsSatisfied();
+    }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
+                errorHandler(deadLetterChannel("mock:failed").maximumRedeliveries(0));
                 from("direct:seqential").split(body().tokenize(","), new UseLatestAggregationStrategy()).to("mock:result");
-                from("direct:parallel").split(body().tokenize(","), new MyAggregationStrategy(), true).to("mock:result");
+                from("direct:parallel").split(body().tokenize(","), new MyAggregationStrategy()).parallelProcessing(true).to("mock:result");
                 from("direct:streaming").split(body().tokenize(",")).streaming().to("mock:result");
                 from("direct:parallel-streaming").split(body().tokenize(","), new MyAggregationStrategy(), true).streaming().to("mock:result");
+                from("direct:exception")
+                    .split(body().tokenize(","))
+                    .aggregationStrategy(new MyAggregationStrategy())
+                    .parallelProcessing(true).streaming()
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            String string = exchange.getIn().getBody(String.class);
+                            if ("Exception".equals(string)) {
+                                throw new CamelException("Just want to throw exception here");
+                            }
+                        
+                        }                    
+                    }).to("mock:result");
             }
         };
     }