You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/07 05:14:32 UTC

[camel] branch master updated: CAMEL-13287: AggregationStrategy - Access original exchange in aggregate method for multicast, recipient list and splitter

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new c0bf3d4  CAMEL-13287: AggregationStrategy - Access original exchange in aggregate method for multicast, recipient list and splitter
c0bf3d4 is described below

commit c0bf3d4e73889905605b0ad3331013b2025cea36
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Aug 7 07:14:01 2019 +0200

    CAMEL-13287: AggregationStrategy - Access original exchange in aggregate method for multicast, recipient list and splitter
---
 .../java/org/apache/camel/AggregationStrategy.java | 15 ++++
 .../apache/camel/processor/MulticastProcessor.java | 29 ++++----
 .../ShareUnitOfWorkAggregationStrategy.java        | 10 +++
 .../src/main/docs/eips/multicast-eip.adoc          |  5 ++
 .../src/main/docs/eips/recipientList-eip.adoc      | 25 ++-----
 core/camel-core/src/main/docs/eips/split-eip.adoc  | 17 ++---
 .../camel/builder/AggregationStrategyClause.java   |  5 ++
 ...ticastAggregationStrategyInputExchangeTest.java | 82 ++++++++++++++++++++
 ...ntListAggregationStrategyInputExchangeTest.java | 80 ++++++++++++++++++++
 ...litterAggregationStrategyInputExchangeTest.java | 87 ++++++++++++++++++++++
 10 files changed, 311 insertions(+), 44 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java b/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java
index c018c17..6ae3f10 100644
--- a/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java
@@ -59,6 +59,21 @@ public interface AggregationStrategy {
     Exchange aggregate(Exchange oldExchange, Exchange newExchange);
 
     /**
+     * Aggregates an old and new exchange together to create a single combined exchange.
+     * <p/>
+     * Important: Only the Multicast, Recipient List and Splitter EIPs support this method with access to the input exchange. All other EIPs
+     * do not, and use the {@link #aggregate(Exchange, Exchange)} method instead.
+     *
+     * @param oldExchange    the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
+     * @param newExchange    the newest exchange (can be <tt>null</tt> if there was no data possible to acquire)
+     * @param inputExchange  the input exchange (input to the EIP)
+     * @return a combined composite of the two exchanges, favor returning the <tt>oldExchange</tt> whenever possible
+     */
+    default Exchange aggregate(Exchange oldExchange, Exchange newExchange, Exchange inputExchange) {
+        return aggregate(oldExchange, newExchange);
+    }
+
+    /**
      * Indicates if this aggregation strategy uses pre-completion mode.
      * @return <tt>true</tt> if this strategy uses pre-completion mode, or <tt>false</tt> otherwise.
      */
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 64b20c9..9172b76 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -65,10 +65,10 @@ import org.apache.camel.util.concurrent.AsyncCompletionService;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
 
-
 /**
  * Implements the Multicast pattern to send a message exchange to a number of
  * endpoints, each endpoint receiving a copy of the message exchange.
+ *
  * @see Pipeline
  */
 public class MulticastProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware {
@@ -281,7 +281,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
         @Override
         public String toString() {
-            return "Step[" + original.getExchangeId() + "," + MulticastProcessor.this + "]";
+            return "MulticastTask[" + original.getExchangeId() + "," + MulticastProcessor.this + "]";
         }
 
         @Override
@@ -370,7 +370,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
                 try {
                     Exchange exchange;
                     while (!done.get() && (exchange = completion.poll()) != null) {
-                        doAggregate(result, exchange);
+                        doAggregate(result, exchange, original);
                         if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) {
                             doDone(result.get(), true);
                         }
@@ -398,7 +398,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
                                     nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout);
                         }
                         if (exchange != null) {
-                            doAggregate(result, exchange);
+                            doAggregate(result, exchange, original);
                             nbAggregated.incrementAndGet();
                         }
                     }
@@ -544,14 +544,13 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
      *
      * @param result   the current result
      * @param exchange the exchange to be added to the result
-     * @see #doAggregateInternal(AggregationStrategy, AtomicReference, org.apache.camel.Exchange)
-     * @see #doAggregateSync(AggregationStrategy, AtomicReference, org.apache.camel.Exchange)
+     * @param inputExchange the input exchange that was sent as input to this EIP
      */
-    protected void doAggregate(AtomicReference<Exchange> result, Exchange exchange) {
+    protected void doAggregate(AtomicReference<Exchange> result, Exchange exchange, Exchange inputExchange) {
         if (parallelAggregate) {
-            doAggregateInternal(getAggregationStrategy(exchange), result, exchange);
+            doAggregateInternal(getAggregationStrategy(exchange), result, exchange, inputExchange);
         } else {
-            doAggregateSync(getAggregationStrategy(exchange), result, exchange);
+            doAggregateSync(getAggregationStrategy(exchange), result, exchange, inputExchange);
         }
     }
 
@@ -562,10 +561,10 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
      * @param strategy the aggregation strategy to use
      * @param result   the current result
      * @param exchange the exchange to be added to the result
-     * @see #doAggregateInternal(AggregationStrategy, AtomicReference, org.apache.camel.Exchange)
+     * @param inputExchange the input exchange that was sent as input to this EIP
      */
-    protected synchronized void doAggregateSync(AggregationStrategy strategy, AtomicReference<Exchange> result, Exchange exchange) {
-        doAggregateInternal(strategy, result, exchange);
+    private synchronized void doAggregateSync(AggregationStrategy strategy, AtomicReference<Exchange> result, Exchange exchange, Exchange inputExchange) {
+        doAggregateInternal(strategy, result, exchange, inputExchange);
     }
 
     /**
@@ -576,14 +575,14 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
      * @param strategy the aggregation strategy to use
      * @param result   the current result
      * @param exchange the exchange to be added to the result
-     * @see #doAggregateSync
+     * @param inputExchange the input exchange that was sent as input to this EIP
      */
-    protected void doAggregateInternal(AggregationStrategy strategy, AtomicReference<Exchange> result, Exchange exchange) {
+    private void doAggregateInternal(AggregationStrategy strategy, AtomicReference<Exchange> result, Exchange exchange, Exchange inputExchange) {
         if (strategy != null) {
             // prepare the exchanges for aggregation
             Exchange oldExchange = result.get();
             ExchangeHelper.prepareAggregation(oldExchange, exchange);
-            result.set(strategy.aggregate(oldExchange, exchange));
+            result.set(strategy.aggregate(oldExchange, exchange, inputExchange));
         }
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
index 5628ed5..b367070 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
@@ -94,6 +94,16 @@ public final class ShareUnitOfWorkAggregationStrategy extends ServiceSupport imp
         return answer;
     }
 
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange, Exchange inputExchange) {
+        // aggregate using the actual strategy first
+        Exchange answer = strategy.aggregate(oldExchange, newExchange, inputExchange);
+        // ensure any errors are propagated from the new exchange to the answer
+        propagateFailure(answer, newExchange);
+
+        return answer;
+    }
+
     protected void propagateFailure(Exchange answer, Exchange newExchange) {
         // if new exchange failed then propagate all the error related properties to the answer
         boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(newExchange);
diff --git a/core/camel-core/src/main/docs/eips/multicast-eip.adoc b/core/camel-core/src/main/docs/eips/multicast-eip.adoc
index d08b278..2d699e7 100644
--- a/core/camel-core/src/main/docs/eips/multicast-eip.adoc
+++ b/core/camel-core/src/main/docs/eips/multicast-eip.adoc
@@ -59,6 +59,11 @@ from("direct:start")
   .to("mock:result");
 ----
 
+NOTE: The Multicast, Recipient List, and Splitter EIPs have special support for using `AggregationStrategy` with
+access to the original input exchange. You may want to use this when you aggregate messages and
+there has been a failure in one of the messages, which you then want to enrich on the original
+input message and return as response; this is the `aggregate` method with three exchange parameters.
+
 == Stop processing in case of exception
 
 The mutlicast EIP will by default continue to process
diff --git a/core/camel-core/src/main/docs/eips/recipientList-eip.adoc b/core/camel-core/src/main/docs/eips/recipientList-eip.adoc
index 19203fe..da32e34 100644
--- a/core/camel-core/src/main/docs/eips/recipientList-eip.adoc
+++ b/core/camel-core/src/main/docs/eips/recipientList-eip.adoc
@@ -211,8 +211,12 @@ And in XML it is again an attribute on the recipient list tag.
 <bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
 ----
 
+NOTE: The Multicast, Recipient List, and Splitter EIPs have special support for using `AggregationStrategy` with
+access to the original input exchange. You may want to use this when you aggregate messages and
+there has been a failure in one of the messages, which you then want to enrich on the original
+input message and return as response; this is the `aggregate` method with three exchange parameters.
+
 == Knowing which endpoint when using custom AggregationStrategy
-*Available as of Camel 2.12*
 
 When using a custom `AggregationStrategy` then the `aggregate` method is always invoked in sequential order
 (also if parallel processing is enabled) of the endpoints the Recipient List is using.
@@ -315,23 +319,7 @@ from("direct:c").to("mock:C").setBody(constant("C"));
 This timeout feature is also supported by Splitter and both multicast and recipientList.
 ===
 
-By default if a timeout occurs the `AggregationStrategy` is not invoked. However you can implement a special version `TimeoutAwareAggregationStrategy`
-
-[source,java]
-----
-public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {
-
-    /**
-     * A timeout occurred
-     *
-     * @param oldExchange  the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
-     * @param index        the index
-     * @param total        the total
-     * @param timeout      the timeout value in millis
-     */
-    void timeout(Exchange oldExchange, int index, int total, long timeout);
-----
-
+By default if a timeout occurs the `AggregationStrategy` is not invoked. However you can implement the `timeout` method of the `AggregationStrategy`.
 This allows you to deal with the timeout in the `AggregationStrategy` if you really need to.
 
 [NOTE]
@@ -345,7 +333,6 @@ The remainders will be cancelled. Camel will also only invoke the `timeout` meth
 See details at the Multicast EIP
 
 == Using ExchangePattern in recipients
-*Available as of Camel 2.15*
 
 The recipient list will by default use the current Exchange Pattern. Though one can imagine use-cases where one wants to send
 a message to a recipient using a different exchange pattern. For example you may have a route that initiates as an `InOnly` route,
diff --git a/core/camel-core/src/main/docs/eips/split-eip.adoc b/core/camel-core/src/main/docs/eips/split-eip.adoc
index 6109aef..f2ea37b 100644
--- a/core/camel-core/src/main/docs/eips/split-eip.adoc
+++ b/core/camel-core/src/main/docs/eips/split-eip.adoc
@@ -136,6 +136,12 @@ There is a sample on this page (Split aggregate request/reply sample).
 Notice its the same strategy as the Aggregate EIP supports.
 This Splitter can be viewed as having a build in light weight Aggregate EIP.
 
+NOTE: The Multicast, Recipient List, and Splitter EIPs have special support for using `AggregationStrategy` with
+access to the original input exchange. You may want to use this when you aggregate messages and
+there has been a failure in one of the messages, which you then want to enrich on the original
+input message and return as response; this is the `aggregate` method with three exchange parameters.
+
+
 == Parallel execution of distinct parts
 
 If you want to execute all parts in parallel you can use the `parallelProcessing` option as show:
@@ -183,8 +189,6 @@ from("direct:streaming")
 
 There are two tokenizers that can be used to tokenize an XML payload. The first tokenizer uses the same principle as in the text tokenizer to scan the XML payload and extract a sequence of tokens.
 
-*Available as of Camel 2.9*
-
 If you have a big XML payload, from a file source, and want to split it in streaming mode, then you can use the Tokenizer language with start/end tokens to do this with low memory footprint.
 
 [NOTE]
@@ -258,14 +262,10 @@ from("file:inbox")
      .to("activemq:queue:order");
 ----
 
-Available as of Camel 2.13.1, you can set the above `inheritNamsepaceTagName` property to `*` to include the preceding context in each token (i.e., generating each token enclosed in its ancestor elements). It is noted that each token must share the same ancestor elements in this case.
-
+You can set the above `inheritNamespaceTagName` property to `*` to include the preceding context in each token (i.e., generating each token enclosed in its ancestor elements). Note that each token must share the same ancestor elements in this case.
 The above tokenizer works well on simple structures but has some inherent limitations in handling more complex XML structures.
 
-*Available as of Camel 2.14*
-
 The second tokenizer uses a StAX parser to overcome these limitations. This tokenizer recognizes XML namespaces and also handles simple and complex XML structures more naturally and efficiently.
-
 To split using this tokenizer at {urn:shop}order, we can write
 
 [source,java]
@@ -344,7 +344,6 @@ it results in invalid xml snippets after the split. For example the snippet coul
 ----
 
 == Splitting files by grouping N lines together
-*Available as of Camel 2.10*
 
 The Tokenizer language has a new option group that allows you to group N parts together, for example to split big files into chunks of 1000 lines.
 
@@ -610,12 +609,10 @@ And using XML DSL you specify it as follows:
 ----
 
 == Using onPrepare to execute custom logic when preparing messages
-*Available as of Camel 2.8*
 
 See details at Multicast EIP
 
 == Sharing unit of work
-*Available as of Camel 2.8*
 
 The Splitter will by default not share unit of work between the parent exchange and each split exchange.
 This means each sub exchange has its own individual unit of work.
diff --git a/core/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java b/core/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
index 480c6f2..82a8c68 100644
--- a/core/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
+++ b/core/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
@@ -37,6 +37,11 @@ public class AggregationStrategyClause<T> implements AggregationStrategy {
         return ObjectHelper.notNull(strategy, "AggregationStrategy").aggregate(oldExchange, newExchange);
     }
 
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange, Exchange inputExchange) {
+        return ObjectHelper.notNull(strategy, "AggregationStrategy").aggregate(oldExchange, newExchange, inputExchange);
+    }
+
     // *******************************
     // Exchange
     // *******************************
diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyInputExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyInputExchangeTest.java
new file mode 100644
index 0000000..979a72d
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyInputExchangeTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class MulticastAggregationStrategyInputExchangeTest extends ContextTestSupport {
+
+    @Test
+    public void testInputExchange() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+
+        Exchange out = template.request("direct:start", p -> p.getMessage().setBody("Hello World"));
+        assertNotNull(out);
+        assertEquals("Hello World", out.getMessage().getBody());
+        assertEquals("Forced", out.getMessage().getHeader("FailedDue"));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").multicast(new MyAggregateBean())
+                    .to("direct:a")
+                    .to("direct:b")
+                .end();
+
+                from("direct:a")
+                        .setHeader("foo", constant("123"))
+                        .transform(constant("A"))
+                        .to("mock:a");
+                from("direct:b")
+                        .setHeader("bar", constant("456"))
+                        .transform(constant("B"))
+                        .throwException(new IllegalArgumentException("Forced"))
+                        .to("mock:b");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // NOT in use
+            return null;
+        }
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange, Exchange inputExchange) {
+            if (newExchange.isFailed()) {
+                inputExchange.getMessage().setHeader("FailedDue", newExchange.getException().getMessage());
+                return inputExchange;
+            }
+            // dont care so much about merging in this unit test
+            return newExchange;
+        }
+    }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/RecipientListAggregationStrategyInputExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/RecipientListAggregationStrategyInputExchangeTest.java
new file mode 100644
index 0000000..5386ab7
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/issues/RecipientListAggregationStrategyInputExchangeTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class RecipientListAggregationStrategyInputExchangeTest extends ContextTestSupport {
+
+    @Test
+    public void testInputExchange() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+
+        Exchange out = template.request("direct:start", p -> p.getMessage().setBody("Hello World"));
+        assertNotNull(out);
+        assertEquals("Hello World", out.getMessage().getBody());
+        assertEquals("Forced", out.getMessage().getHeader("FailedDue"));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .recipientList(constant("direct:a,direct:b")).aggregationStrategy(new MyAggregateBean());
+
+                from("direct:a")
+                        .setHeader("foo", constant("123"))
+                        .transform(constant("A"))
+                        .to("mock:a");
+                from("direct:b")
+                        .setHeader("bar", constant("456"))
+                        .transform(constant("B"))
+                        .throwException(new IllegalArgumentException("Forced"))
+                        .to("mock:b");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // NOT in use
+            return null;
+        }
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange, Exchange inputExchange) {
+            if (newExchange.isFailed()) {
+                inputExchange.getMessage().setHeader("FailedDue", newExchange.getException().getMessage());
+                return inputExchange;
+            }
+            // dont care so much about merging in this unit test
+            return newExchange;
+        }
+    }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterAggregationStrategyInputExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterAggregationStrategyInputExchangeTest.java
new file mode 100644
index 0000000..f6831ea
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterAggregationStrategyInputExchangeTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class SplitterAggregationStrategyInputExchangeTest extends ContextTestSupport {
+
+    @Test
+    public void testInputExchange() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+
+        Exchange out = template.request("direct:start", p -> p.getMessage().setBody("A,B"));
+        assertNotNull(out);
+        assertEquals("A,B", out.getMessage().getBody());
+        assertEquals("Forced", out.getMessage().getHeader("FailedDue"));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .split(body(), new MyAggregateBean())
+                        .choice()
+                            .when(body().contains("A"))
+                                .to("direct:a")
+                            .otherwise()
+                                .to("direct:b")
+                        .end()
+                    .end();
+
+                from("direct:a")
+                        .setHeader("foo", constant("123"))
+                        .transform(constant("A"))
+                        .to("mock:a");
+                from("direct:b")
+                        .setHeader("bar", constant("456"))
+                        .transform(constant("B"))
+                        .throwException(new IllegalArgumentException("Forced"))
+                        .to("mock:b");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // NOT in use
+            return null;
+        }
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange, Exchange inputExchange) {
+            if (newExchange.isFailed()) {
+                inputExchange.getMessage().setHeader("FailedDue", newExchange.getException().getMessage());
+                return inputExchange;
+            }
+            // dont care so much about merging in this unit test
+            return newExchange;
+        }
+    }
+
+}