You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/09/16 15:55:49 UTC
camel git commit: CAMEL-8587: Added unit test and made more docs how
to do this.
Repository: camel
Updated Branches:
refs/heads/master eeb09c827 -> e01fdb387
CAMEL-8587: Added unit test and made more docs how to do this.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e01fdb38
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e01fdb38
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e01fdb38
Branch: refs/heads/master
Commit: e01fdb387e8658c84214ea242599f963f51cbc4f
Parents: eeb09c8
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Sep 16 15:27:33 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Sep 16 15:27:33 2015 +0200
----------------------------------------------------------------------
.../apache/camel/model/MulticastDefinition.java | 6 +-
.../camel/processor/MulticastProcessor.java | 16 +++--
...stAggregationStrategyThrowExceptionTest.java | 69 ++++++++++++++++++++
3 files changed, 85 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e01fdb38/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 2e7e76c..ea8cb9d 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -105,7 +105,9 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
/**
* Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
- * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
+ * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
+ * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
+ * is not handled by the error handler. To let the error handler react to such exceptions, enable the shareUnitOfWork option.
*/
public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
setAggregationStrategy(aggregationStrategy);
@@ -115,6 +117,8 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
/**
* Sets a reference to the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
* By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
+ * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
+ * is not handled by the error handler. To let the error handler react to such exceptions, enable the shareUnitOfWork option.
*/
public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) {
setStrategyRef(aggregationStrategyRef);
http://git-wip-us.apache.org/repos/asf/camel/blob/e01fdb38/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 2358f91..acc35b9 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -518,12 +518,18 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
@Override
public void run() {
- if (parallelAggregate) {
- doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
- } else {
- doAggregate(getAggregationStrategy(subExchange), result, subExchange);
+ try {
+ if (parallelAggregate) {
+ doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
+ } else {
+ doAggregate(getAggregationStrategy(subExchange), result, subExchange);
+ }
+ } catch (Throwable e) {
+ // wrap in exception to explain where it failed
+ subExchange.setException(new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e));
+ } finally {
+ aggregated.incrementAndGet();
}
- aggregated.incrementAndGet();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e01fdb38/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java
new file mode 100644
index 0000000..d98ee76
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
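+ * Tests that an exception thrown from the AggregationStrategy of a multicast is sent to the dead letter channel when shareUnitOfWork is enabled.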
+ */
+public class MulticastAggregationStrategyThrowExceptionTest extends ContextTestSupport {
+
+ public void testThrowException() throws Exception {
+ getMockEndpoint("mock:a").expectedMessageCount(1);
+ getMockEndpoint("mock:b").expectedMessageCount(1);
+ getMockEndpoint("mock:dead").expectedMessageCount(1);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead"));
+
+ // shareUnitOfWork must be enabled if we want the error handler to also react to
+ // exceptions thrown from the aggregation strategy.
+ from("direct:start").multicast(new MyAggregateBean()).shareUnitOfWork()
+ .to("direct:a")
+ .to("direct:b")
+ .end();
+
+ from("direct:a").to("mock:a");
+ from("direct:b").to("mock:b");
+ }
+ };
+ }
+
+ public static class MyAggregateBean implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange != null) {
+ throw new IllegalArgumentException("Forced");
+ }
+ return newExchange;
+ }
+ }
+
+}