You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/09/16 15:55:49 UTC

camel git commit: CAMEL-8587: Added unit test and made more docs how to do this.

Repository: camel
Updated Branches:
  refs/heads/master eeb09c827 -> e01fdb387


CAMEL-8587: Added unit test and made more docs how to do this.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e01fdb38
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e01fdb38
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e01fdb38

Branch: refs/heads/master
Commit: e01fdb387e8658c84214ea242599f963f51cbc4f
Parents: eeb09c8
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Sep 16 15:27:33 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Sep 16 15:27:33 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/model/MulticastDefinition.java |  6 +-
 .../camel/processor/MulticastProcessor.java     | 16 +++--
 ...stAggregationStrategyThrowExceptionTest.java | 69 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e01fdb38/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 2e7e76c..ea8cb9d 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -105,7 +105,9 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
 
     /**
      * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
-     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
+     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
+     * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
+     * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.
      */
     public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
         setAggregationStrategy(aggregationStrategy);
@@ -115,6 +117,8 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
     /**
      * Sets a reference to the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
      * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
+     * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
+     * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.
      */
     public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) {
         setStrategyRef(aggregationStrategyRef);

http://git-wip-us.apache.org/repos/asf/camel/blob/e01fdb38/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 2358f91..acc35b9 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -518,12 +518,18 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
 
         @Override
         public void run() {
-            if (parallelAggregate) {
-                doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
-            } else {
-                doAggregate(getAggregationStrategy(subExchange), result, subExchange);
+            try {
+                if (parallelAggregate) {
+                    doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
+                } else {
+                    doAggregate(getAggregationStrategy(subExchange), result, subExchange);
+                }
+            } catch (Throwable e) {
+                // wrap in exception to explain where it failed
+                subExchange.setException(new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e));
+            } finally {
+                aggregated.incrementAndGet();
             }
-            aggregated.incrementAndGet();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e01fdb38/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java
new file mode 100644
index 0000000..d98ee76
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * @version 
+ */
+public class MulticastAggregationStrategyThrowExceptionTest extends ContextTestSupport {
+
+    public void testThrowException() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                // must use share UoW if we want the error handler to react on exceptions
+                // from the aggregation strategy also.
+                from("direct:start").multicast(new MyAggregateBean()).shareUnitOfWork()
+                    .to("direct:a")
+                    .to("direct:b")
+                .end();
+
+                from("direct:a").to("mock:a");
+                from("direct:b").to("mock:b");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange != null) {
+                throw new IllegalArgumentException("Forced");
+            }
+            return newExchange;
+        }
+    }
+
+}