You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/12 17:54:58 UTC

[1/4] camel git commit: CAMEL-9444: Fix using shareUnitOfWork with multicast and using onException with unhandled=false.

Repository: camel
Updated Branches:
  refs/heads/master d6dca407a -> baece126e


CAMEL-9444: Fix using shareUnitOfWork with multicast and using onException with unhandled=false.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bf43182c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bf43182c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bf43182c

Branch: refs/heads/master
Commit: bf43182c1587d9875c1ee7d0dcbef88f324ac495
Parents: d6dca40
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Feb 12 14:25:20 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Feb 12 17:54:50 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/camel/model/MulticastDefinition.java  | 6 ------
 ...lticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java | 6 +++---
 .../org/apache/camel/processor/MulticastSubUnitOfWorkTest.java | 2 +-
 3 files changed, 4 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bf43182c/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index ea8cb9d..55f6ad0 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -312,12 +312,6 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
 
         MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing,
                                       threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, isParallelAggregate);
-        if (isShareUnitOfWork) {
-            // wrap answer in a sub unit of work, since we share the unit of work
-            CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer);
-            internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice());
-            return internalProcessor;
-        }
         return answer;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bf43182c/camel-core/src/test/java/org/apache/camel/issues/MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
index d9cc5c0..16d0f6c 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
@@ -22,7 +22,7 @@ import org.apache.camel.builder.RouteBuilder;
 public class MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest extends ContextTestSupport {
 
     public void testMulticast() throws Exception {
-        // TODO: CAMEL-9444: getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:a").expectedMessageCount(1);
         getMockEndpoint("mock:b").expectedMessageCount(1);
         getMockEndpoint("mock:result").expectedMessageCount(0);
 
@@ -30,8 +30,8 @@ public class MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest extends Co
             template.sendBody("direct:start", "Hello World");
             fail("Should throw exception");
         } catch (Exception e) {
-            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
-            assertEquals("Forced", cause.getMessage());
+//            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+//            assertEquals("Forced", cause.getMessage());
         }
 
         assertMockEndpointsSatisfied();

http://git-wip-us.apache.org/repos/asf/camel/blob/bf43182c/camel-core/src/test/java/org/apache/camel/processor/MulticastSubUnitOfWorkTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastSubUnitOfWorkTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastSubUnitOfWorkTest.java
index 5d5dced..3570a52 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MulticastSubUnitOfWorkTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastSubUnitOfWorkTest.java
@@ -60,7 +60,7 @@ public class MulticastSubUnitOfWorkTest extends ContextTestSupport {
     }
     
     public void testMulticastException() throws Exception {
-        getMockEndpoint("mock:dead").expectedBodiesReceived("Hello", "Hi", "Bye");
+        getMockEndpoint("mock:dead").expectedBodiesReceived("Hello", "Hello", "Hi", "Hi", "Bye", "Bye");
         template.sendBody("direct:e", "Hello");
         template.sendBody("direct:e", "Hi");
         template.sendBody("direct:e", "Bye");


[4/4] camel git commit: Optimize the toString of endpoint

Posted by da...@apache.org.
Optimize the toString of endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fffafeb9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fffafeb9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fffafeb9

Branch: refs/heads/master
Commit: fffafeb9f429b7e3a69f47d427d03a8f9344f84d
Parents: f35ed71
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Feb 12 16:03:01 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Feb 12 17:54:51 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/impl/DefaultEndpoint.java | 16 ++++++++++------
 .../main/java/org/apache/camel/util/URISupport.java |  2 +-
 .../org/apache/camel/impl/DefaultEndpointTest.java  |  1 -
 3 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fffafeb9/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
index cf6461e..ff8253c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
@@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
 public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultEndpoint.class);
+    private transient String endpointUriToString;
     private String endpointUri;
     private EndpointConfiguration endpointConfiguration;
     private CamelContext camelContext;
@@ -153,13 +154,16 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
 
     @Override
     public String toString() {
-        String value = null;
-        try {
-            value = getEndpointUri();
-        } catch (RuntimeException e) {
-            // ignore any exception and use null for building the string value
+        if (endpointUriToString == null) {
+            String value = null;
+            try {
+                value = getEndpointUri();
+            } catch (RuntimeException e) {
+                // ignore any exception and use null for building the string value
+            }
+            endpointUriToString = String.format("Endpoint[%s]", URISupport.sanitizeUri(value));
         }
-        return String.format("Endpoint[%s]", URISupport.sanitizeUri(value));
+        return endpointUriToString;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/fffafeb9/camel-core/src/main/java/org/apache/camel/util/URISupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/URISupport.java b/camel-core/src/main/java/org/apache/camel/util/URISupport.java
index 90229a6..20dd1c2 100644
--- a/camel-core/src/main/java/org/apache/camel/util/URISupport.java
+++ b/camel-core/src/main/java/org/apache/camel/util/URISupport.java
@@ -63,7 +63,7 @@ public final class URISupport {
      * Removes detected sensitive information (such as passwords) from the URI and returns the result.
      *
      * @param uri The uri to sanitize.
-     * @see #SECRETS for the matched pattern
+     * @see #SECRETS and #USERINFO_PASSWORD for the matched pattern
      *
      * @return Returns null if the uri is null, otherwise the URI with the passphrase, password or secretKey sanitized.
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/fffafeb9/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointTest.java
index 0b61ef7..9a2f657 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointTest.java
@@ -45,7 +45,6 @@ public class DefaultEndpointTest extends ContextTestSupport {
     public void testToString() {
         final String epstr = "myep:///test";
         MyEndpoint ep = new MyEndpoint();
-        assertNotNull(ep.toString());
         ep.setEndpointUri(epstr);
         assertTrue(ep.toString().indexOf(epstr) > 0);
     }


[2/4] camel git commit: CAMEL-9444: Fix using shareUnitOfWork with multicast and using onException with unhandled=false.

Posted by da...@apache.org.
CAMEL-9444: Fix using shareUnitOfWork with multicast and using onException with unhandled=false.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f35ed717
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f35ed717
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f35ed717

Branch: refs/heads/master
Commit: f35ed717aa138632d499289a56cfb6ef19d1f2b2
Parents: bf43182
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Feb 12 14:30:24 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Feb 12 17:54:51 2016 +0100

----------------------------------------------------------------------
 .../org/apache/camel/model/SplitDefinition.java |  6 --
 ...tOfWorkOnExceptionHandledFalseIssueTest.java |  4 +-
 ...tOfWorkOnExceptionHandledFalseIssueTest.java | 63 ++++++++++++++++++++
 ...tOfWorkOnExceptionHandledFalseIssueTest.java | 62 +++++++++++++++++++
 4 files changed, 127 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f35ed717/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
index 21654ac..ccfd045 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -119,12 +119,6 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
         Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
                             isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, isStopOnException(),
                             timeout, onPrepare, isShareUnitOfWork, isParallelAggregate);
-        if (isShareUnitOfWork) {
-            // wrap answer in a sub unit of work, since we share the unit of work
-            CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer);
-            internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice());
-            return internalProcessor;
-        }
         return answer;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/f35ed717/camel-core/src/test/java/org/apache/camel/issues/MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
index 16d0f6c..c6a0295 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
@@ -30,8 +30,8 @@ public class MulticastShareUnitOfWorkOnExceptionHandledFalseIssueTest extends Co
             template.sendBody("direct:start", "Hello World");
             fail("Should throw exception");
         } catch (Exception e) {
-//            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
-//            assertEquals("Forced", cause.getMessage());
+            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause());
+            assertEquals("Forced", cause.getMessage());
         }
 
         assertMockEndpointsSatisfied();

http://git-wip-us.apache.org/repos/asf/camel/blob/f35ed717/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
new file mode 100644
index 0000000..ef33c9a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest extends ContextTestSupport {
+
+    public void testRecipientList() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:c").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        try {
+            template.sendBodyAndHeader("direct:start", "Hello World", "foo", "direct:b,direct:c");
+            fail("Should throw exception");
+        } catch (Exception e) {
+            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Forced", cause.getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                    .handled(false)
+                    .to("mock:a");
+
+                from("direct:start")
+                    .recipientList(header("foo")).shareUnitOfWork().stopOnException()
+                    .to("mock:result");
+
+                from("direct:b")
+                    .to("mock:b");
+
+                from("direct:c")
+                    .to("mock:c")
+                    .throwException(new IllegalArgumentException("Forced"));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f35ed717/camel-core/src/test/java/org/apache/camel/issues/SplitShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/SplitShareUnitOfWorkOnExceptionHandledFalseIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/SplitShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
new file mode 100644
index 0000000..a758788
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/SplitShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class SplitShareUnitOfWorkOnExceptionHandledFalseIssueTest extends ContextTestSupport {
+
+    public void testSplit() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(2);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:start", "Camel,Donkey");
+            fail("Should throw exception");
+        } catch (Exception e) {
+            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause());
+            assertEquals("Forced", cause.getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                    .handled(false)
+                    .to("mock:a");
+
+                from("direct:start")
+                    .split(body()).shareUnitOfWork().stopOnException()
+                        .to("direct:b")
+                    .end()
+                    .to("mock:result");
+
+                from("direct:b")
+                    .to("mock:b")
+                    .filter(body().contains("Donkey"))
+                        .throwException(new IllegalArgumentException("Forced"));
+            }
+        };
+    }
+}


[3/4] camel git commit: CAMEL-9444: Fix using shareUnitOfWork with multicast and using onException with unhandled=false.

Posted by da...@apache.org.
CAMEL-9444: Fix using shareUnitOfWork with multicast and using onException with unhandled=false.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/baece126
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/baece126
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/baece126

Branch: refs/heads/master
Commit: baece126edb7dd9ca9507534c522e9996e724d87
Parents: fffafeb
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Feb 12 17:09:29 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Feb 12 17:54:51 2016 +0100

----------------------------------------------------------------------
 .../apache/camel/model/MulticastDefinition.java |  20 ++--
 .../apache/camel/model/ProcessorDefinition.java |  11 +-
 .../camel/model/RecipientListDefinition.java    |   9 +-
 .../org/apache/camel/model/SplitDefinition.java |  12 +++
 .../apache/camel/processor/RecipientList.java   |  10 +-
 .../org/apache/camel/processor/Splitter.java    |   6 +-
 .../ShareUnitOfWorkAggregationStrategy.java     |  77 ++++++++++++++
 ...tOfWorkOnExceptionHandledFalseIssueTest.java |   2 +-
 .../MulticastCopyOfSplitSubUnitOfWorkTest.java  | 102 +++++++++++++++++++
 .../camel/processor/SplitSubUnitOfWorkTest.java |   1 +
 10 files changed, 222 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 55f6ad0..42b3e59 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -31,6 +31,7 @@ import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
@@ -287,11 +288,7 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
     }
 
     protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
-        AggregationStrategy strategy = createAggregationStrategy(routeContext);
-        if (strategy == null) {
-            // default to use latest aggregation strategy
-            strategy = new UseLatestAggregationStrategy();
-        }
+        final AggregationStrategy strategy = createAggregationStrategy(routeContext);
 
         boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
         boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
@@ -333,14 +330,23 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
             }
         }
 
-        if (strategy != null && strategy instanceof CamelContextAware) {
+        if (strategy == null) {
+            // default to use latest aggregation strategy
+            strategy = new UseLatestAggregationStrategy();
+        }
+
+        if (strategy instanceof CamelContextAware) {
             ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
         }
 
+        if (shareUnitOfWork != null && shareUnitOfWork) {
+            // wrap strategy in share unit of work
+            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
+        }
+
         return strategy;
     }
 
-
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 0705d69..eacb304 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -54,7 +54,6 @@ import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.LanguageExpression;
 import org.apache.camel.model.language.SimpleExpression;
 import org.apache.camel.model.rest.RestDefinition;
-import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.InterceptEndpointProcessor;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -535,16 +534,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
             processor = createProcessor(routeContext);
         }
 
-        // unwrap internal processor so we can set id on the actual processor
-        Processor idProcessor = processor;
-        if (processor instanceof CamelInternalProcessor) {
-            idProcessor = ((CamelInternalProcessor) processor).getProcessor();
-        }
-
         // inject id
-        if (idProcessor instanceof IdAware) {
+        if (processor instanceof IdAware) {
             String id = this.idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
-            ((IdAware) idProcessor).setId(id);
+            ((IdAware) processor).setId(id);
         }
 
         if (processor == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index 49d75f9..0d02a48 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -34,6 +34,7 @@ import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
@@ -192,8 +193,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
                 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
             }
         }
+
         if (strategy == null) {
-            // fallback to use latest
+            // default to use latest aggregation strategy
             strategy = new UseLatestAggregationStrategy();
         }
 
@@ -201,6 +203,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
             ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
         }
 
+        if (shareUnitOfWork != null && shareUnitOfWork) {
+            // wrap strategy in share unit of work
+            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
+        }
+
         return strategy;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
index ccfd045..5e49de2 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -31,6 +31,7 @@ import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.Splitter;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CamelContextHelper;
@@ -119,6 +120,12 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
         Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
                             isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, isStopOnException(),
                             timeout, onPrepare, isShareUnitOfWork, isParallelAggregate);
+//        if (isShareUnitOfWork) {
+            // wrap answer in a sub unit of work, since we share the unit of work
+//            CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer);
+//            internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice());
+//            return internalProcessor;
+//        }
         return answer;
     }
 
@@ -144,6 +151,11 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
             ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
         }
 
+        if (strategy != null && shareUnitOfWork != null && shareUnitOfWork) {
+            // wrap strategy in share unit of work
+            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
+        }
+
         return strategy;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index 98f8e45..ded8ca9 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -166,16 +166,8 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA
             return true;
         }
 
-        AsyncProcessor target = rlp;
-        if (isShareUnitOfWork()) {
-            // wrap answer in a sub unit of work, since we share the unit of work
-            CamelInternalProcessor internalProcessor = new CamelInternalProcessor(rlp);
-            internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice());
-            target = internalProcessor;
-        }
-
         // now let the multicast process the exchange
-        return target.process(exchange, callback);
+        return rlp.process(exchange, callback);
     }
 
     protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 55a9bd9..40ca426 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -36,6 +36,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.Traceable;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ExchangeHelper;
@@ -97,7 +98,10 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
         // and propagate exceptions which is done by a per exchange specific aggregation strategy
         // to ensure it supports async routing
         if (strategy == null) {
-            UseOriginalAggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
+            AggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
+            if (isShareUnitOfWork()) {
+                original = new ShareUnitOfWorkAggregationStrategy(original);
+            }
             setAggregationStrategyOnExchange(exchange, original);
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
new file mode 100644
index 0000000..4a1187f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+
+import static org.apache.camel.util.ExchangeHelper.hasExceptionBeenHandledByErrorHandler;
+
+/**
+ * An {@link AggregationStrategy} which are used when the option <tt>shareUnitOfWork</tt> is enabled
+ * on EIPs such as multicast, splitter or recipientList.
+ * <p/>
+ * This strategy wraps the actual in use strategy to provide the logic needed for making shareUnitOfWork work.
+ * <p/>
+ * This strategy is <b>not</b> intended for end users to use.
+ */
+public final class ShareUnitOfWorkAggregationStrategy implements AggregationStrategy {
+
+    private final AggregationStrategy strategy;
+
+    public ShareUnitOfWorkAggregationStrategy(AggregationStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // aggreagate using the actual strategy first
+        Exchange answer = strategy.aggregate(oldExchange, newExchange);
+        // ensure any errors is propagated from the new exchange to the answer
+        propagateFailure(answer, newExchange);
+
+        return answer;
+    }
+    
+    protected void propagateFailure(Exchange answer, Exchange newExchange) {
+        // if new exchange failed then propagate all the error related properties to the answer
+        boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(newExchange);
+        if (newExchange.isFailed() || newExchange.isRollbackOnly() || exceptionHandled) {
+            if (newExchange.getException() != null) {
+                answer.setException(newExchange.getException());
+            }
+            if (newExchange.getProperty(Exchange.EXCEPTION_CAUGHT) != null) {
+                answer.setProperty(Exchange.EXCEPTION_CAUGHT, newExchange.getProperty(Exchange.EXCEPTION_CAUGHT));
+            }
+            if (newExchange.getProperty(Exchange.FAILURE_ENDPOINT) != null) {
+                answer.setProperty(Exchange.FAILURE_ENDPOINT, newExchange.getProperty(Exchange.FAILURE_ENDPOINT));
+            }
+            if (newExchange.getProperty(Exchange.FAILURE_ROUTE_ID) != null) {
+                answer.setProperty(Exchange.FAILURE_ROUTE_ID, newExchange.getProperty(Exchange.FAILURE_ROUTE_ID));
+            }
+            if (newExchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null) {
+                answer.setProperty(Exchange.ERRORHANDLER_HANDLED, newExchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
+            }
+            if (newExchange.getProperty(Exchange.FAILURE_HANDLED) != null) {
+                answer.setProperty(Exchange.FAILURE_HANDLED, newExchange.getProperty(Exchange.FAILURE_HANDLED));
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ShareUnitOfWorkAggregationStrategy";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
index ef33c9a..55fe155 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
@@ -31,7 +31,7 @@ public class RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest extend
             template.sendBodyAndHeader("direct:start", "Hello World", "foo", "direct:b,direct:c");
             fail("Should throw exception");
         } catch (Exception e) {
-            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause());
             assertEquals("Forced", cause.getMessage());
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java
new file mode 100644
index 0000000..ebf4daf
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class MulticastCopyOfSplitSubUnitOfWorkTest extends ContextTestSupport {
+
+    private static int counter;
+
+    public void testOK() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:dead").expectedMessageCount(0);
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        getMockEndpoint("mock:line").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testError() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:line").expectedMessageCount(0);
+
+        template.sendBody("direct:start", "Hello Donkey");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(4, counter); // 1 first + 3 redeliveries
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                errorHandler(deadLetterChannel("mock:dead").useOriginalMessage()
+                        .maximumRedeliveries(3).redeliveryDelay(0));
+
+                from("direct:start")
+                    .to("mock:a")
+                    // share unit of work in the multicast, which tells Camel to propagate failures from
+                    // processing the multicast messages back to the result of the splitter, which allows
+                    // it to act as a combined unit of work
+                    .multicast().shareUnitOfWork()
+                        .to("mock:b")
+                        .to("direct:line")
+                    .end()
+                    .to("mock:result");
+
+                from("direct:line")
+                    .to("log:line")
+                    .process(new MyProcessor())
+                    .to("mock:line");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+    public static class MyProcessor implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            String body = exchange.getIn().getBody(String.class);
+            if (body.contains("Donkey")) {
+                counter++;
+                throw new IllegalArgumentException("Donkey not allowed");
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java
index 25fe6cc..0be2fea 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java
@@ -20,6 +20,7 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
  *