You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2020/03/25 15:23:04 UTC

[camel] 06/11: Support for the splitter eip in lightweight mode

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7d70ad80c7dd81c872132e21e3f8dc581a10e74c
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Wed Mar 18 18:35:44 2020 +0100

    Support for the splitter eip in lightweight mode
---
 .../camel/impl/engine/SimpleCamelContext.java      |   2 +-
 .../apache/camel/processor/MulticastProcessor.java |  31 +-
 .../org/apache/camel/processor/RecipientList.java  |   2 +-
 .../camel/processor/RecipientListProcessor.java    |  12 +-
 .../java/org/apache/camel/processor/Splitter.java  |  16 +-
 .../impl/lw/RuntimeImmutableCamelContext.java      |   4 +-
 .../org/apache/camel/reifier/MulticastReifier.java |   2 +-
 .../org/apache/camel/reifier/SplitReifier.java     |   2 +-
 .../camel/reifier/language/ExpressionReifier.java  |   2 +
 .../test/java/org/apache/camel/TestSupport.java    |  10 +-
 .../apache/camel/builder/NotifyBuilderTest.java    |   2 +-
 .../camel/impl/lw/SplitterLightweightTest.java     | 320 +++++++++++++++++++++
 .../apache/camel/model/ChoiceDefinitionTest.java   |   2 +-
 13 files changed, 381 insertions(+), 26 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index 04f71cd..3f959c7 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -372,7 +372,7 @@ public class SimpleCamelContext extends AbstractCamelContext {
 
     @Override
     public AsyncProcessor createMulticast(Collection<Processor> processors, ExecutorService executor, boolean shutdownExecutorService) {
-        return new MulticastProcessor(getCamelContextReference(), processors, null, true, executor, shutdownExecutorService, false, false, 0, null, false, false);
+        return new MulticastProcessor(getCamelContextReference(), null, processors, null, true, executor, shutdownExecutorService, false, false, 0, null, false, false);
     }
 
     @Override
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 3f8bf40..1a3c9de 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -57,12 +57,14 @@ import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.KeyValueHolder;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.concurrent.AsyncCompletionService;
 import org.slf4j.Logger;
@@ -146,6 +148,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
     protected final Processor onPrepare;
     private final CamelContext camelContext;
+    private final Route route;
     private final ReactiveExecutor reactiveExecutor;
     private String id;
     private String routeId;
@@ -164,27 +167,28 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
     private final ConcurrentMap<ErrorHandlerKey, Processor> errorHandlers = new ConcurrentHashMap<>();
     private final boolean shareUnitOfWork;
 
-    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) {
-        this(camelContext, processors, null);
+    public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> processors) {
+        this(camelContext, route, processors, null);
     }
 
-    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
-        this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false, false);
+    public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
+        this(camelContext, route, processors, aggregationStrategy, false, null, false, false, false, 0, null, false, false);
     }
 
-    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
+    public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
                               ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare,
                               boolean shareUnitOfWork, boolean parallelAggregate) {
-        this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
+        this(camelContext, route, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
              shareUnitOfWork, parallelAggregate, false);
     }
     
-    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
+    public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                               boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming,
                               boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork,
                               boolean parallelAggregate, boolean stopOnAggregateException) {
         notNull(camelContext, "camelContext");
         this.camelContext = camelContext;
+        this.route = route;
         this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.processors = processors;
         this.aggregationStrategy = aggregationStrategy;
@@ -236,6 +240,16 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
     }
 
     @Override
+    protected void doInit() throws Exception {
+        if (route != null) {
+            Exchange exchange = new DefaultExchange(getCamelContext());
+            for (Processor processor : getProcessors()) {
+                createErrorHandler(route, exchange, processor);
+            }
+        }
+    }
+
+    @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         Iterable<ProcessorExchangePair> pairs;
         try {
@@ -710,6 +724,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
     protected Processor createErrorHandler(Route route, Exchange exchange, Processor processor) {
         Processor answer;
 
+        if (route != this.route && this.route != null) {
+            throw new UnsupportedOperationException("Is this really correct ?");
+        }
         boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);
 
         // do not wrap in error handler if we are inside a try block
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
index 43d58c6..2f3802c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -186,7 +186,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
             iter = ObjectHelper.createIterator(recipientList, delimiter);
         }
 
-        RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
+        RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), null, producerCache, iter, getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
                 isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
                 isStopOnAggregateException());
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index c78f13d..8114541 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -153,22 +153,22 @@ public class RecipientListProcessor extends MulticastProcessor {
 
     // TODO: camel-bean @RecipientList cacheSize
 
-    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<?> iter) {
-        super(camelContext, null);
+    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter) {
+        super(camelContext, route, null);
         this.producerCache = producerCache;
         this.iter = iter;
     }
 
-    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<?> iter, AggregationStrategy aggregationStrategy) {
-        super(camelContext, null, aggregationStrategy);
+    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter, AggregationStrategy aggregationStrategy) {
+        super(camelContext, route, null, aggregationStrategy);
         this.producerCache = producerCache;
         this.iter = iter;
     }
 
-    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<?> iter, AggregationStrategy aggregationStrategy,
+    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter, AggregationStrategy aggregationStrategy,
                                   boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException,
                                   long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) {
-        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
+        super(camelContext, route, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
               shareUnitOfWork, parallelAggregate, stopOnAggregateException);
         this.producerCache = producerCache;
         this.iter = iter;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
index f3463db..3618c71 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
@@ -55,17 +55,17 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
 
     private final Expression expression;
 
-    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
+    public Splitter(CamelContext camelContext, Route route, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
                     ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare,
                     boolean useSubUnitOfWork, boolean parallelAggregate) {
-        this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout,
-             onPrepare, useSubUnitOfWork, false, false);
+        this(camelContext, route, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout,
+             onPrepare, useSubUnitOfWork, parallelAggregate, false);
     }
 
-    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
+    public Splitter(CamelContext camelContext, Route route, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
                     ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare,
                     boolean useSubUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) {
-        super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException,
+        super(camelContext, route, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException,
               timeout, onPrepare, useSubUnitOfWork, parallelAggregate, stopOnAggregateException);
         this.expression = expression;
         notNull(expression, "expression");
@@ -78,6 +78,12 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
     }
 
     @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+        expression.init(getCamelContext());
+    }
+
+    @Override
     public boolean process(Exchange exchange, final AsyncCallback callback) {
         AggregationStrategy strategy = getAggregationStrategy();
 
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/RuntimeImmutableCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/RuntimeImmutableCamelContext.java
index e7ed47d..3de7132 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/RuntimeImmutableCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/RuntimeImmutableCamelContext.java
@@ -176,6 +176,7 @@ public class RuntimeImmutableCamelContext implements ExtendedCamelContext, Catal
     private final List<Route> routes;
     private final boolean messageHistory;
     private final boolean allowUseOriginalMessage;
+    private final boolean logExhaustedMessageBody;
     private final String version;
     private Date startDate;
 
@@ -218,6 +219,7 @@ public class RuntimeImmutableCamelContext implements ExtendedCamelContext, Catal
         useMDCLogging = context.isUseMDCLogging();
         messageHistory = context.isMessageHistory();
         allowUseOriginalMessage = context.isAllowUseOriginalMessage();
+        logExhaustedMessageBody = context.isLogExhaustedMessageBody();
         version = context.getVersion();
     }
 
@@ -350,7 +352,7 @@ public class RuntimeImmutableCamelContext implements ExtendedCamelContext, Catal
 
     @Override
     public Boolean isLogExhaustedMessageBody() {
-        throw new UnsupportedOperationException();
+        return logExhaustedMessageBody;
     }
 
     @Override
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java
index 50b0d23..8d7a4d0 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java
@@ -73,7 +73,7 @@ public class MulticastReifier extends ProcessorReifier<MulticastDefinition> {
             definition.setOnPrepare(mandatoryLookup(definition.getOnPrepareRef(), Processor.class));
         }
 
-        MulticastProcessor answer = new MulticastProcessor(camelContext, list, strategy, isParallelProcessing, threadPool, shutdownThreadPool, isStreaming,
+        MulticastProcessor answer = new MulticastProcessor(camelContext, route, list, strategy, isParallelProcessing, threadPool, shutdownThreadPool, isStreaming,
                                                            isStopOnException, timeout, definition.getOnPrepare(), isShareUnitOfWork, isParallelAggregate,
                                                            isStopOnAggregateException);
         return answer;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java
index a34a2e8..2191831 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java
@@ -59,7 +59,7 @@ public class SplitReifier extends ExpressionReifier<SplitDefinition> {
 
         Expression exp = createExpression(definition.getExpression());
 
-        Splitter answer = new Splitter(camelContext, exp, childProcessor, definition.getAggregationStrategy(), isParallelProcessing, threadPool,
+        Splitter answer = new Splitter(camelContext, route, exp, childProcessor, definition.getAggregationStrategy(), isParallelProcessing, threadPool,
                                        shutdownThreadPool, isStreaming, isStopOnException, timeout, definition.getOnPrepare(), isShareUnitOfWork, isParallelAggregate,
                                        isStopOnAggregateException);
         return answer;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/language/ExpressionReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/language/ExpressionReifier.java
index 8831b6e..e0d420b 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/language/ExpressionReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/language/ExpressionReifier.java
@@ -136,6 +136,7 @@ public class ExpressionReifier<T extends ExpressionDefinition> extends AbstractR
         if (expression instanceof CamelContextAware) {
             ((CamelContextAware) expression).setCamelContext(camelContext);
         }
+        expression.init(camelContext);
         return expression;
     }
 
@@ -171,6 +172,7 @@ public class ExpressionReifier<T extends ExpressionDefinition> extends AbstractR
         if (predicate instanceof CamelContextAware) {
             ((CamelContextAware) predicate).setCamelContext(camelContext);
         }
+        predicate.init(camelContext);
         return predicate;
     }
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/TestSupport.java b/core/camel-core/src/test/java/org/apache/camel/TestSupport.java
index f1eab1c..19a1bf0 100644
--- a/core/camel-core/src/test/java/org/apache/camel/TestSupport.java
+++ b/core/camel-core/src/test/java/org/apache/camel/TestSupport.java
@@ -100,7 +100,7 @@ public abstract class TestSupport extends Assert {
      * Returns a predicate and value builder for the inbound body on an exchange
      */
     public static ValueBuilder body() {
-        return Builder.body();
+        return init(Builder.body());
     }
 
     /**
@@ -570,4 +570,12 @@ public abstract class TestSupport extends Assert {
             }
         }
     }
+
+    private static ValueBuilder init(ValueBuilder builder) {
+        Expression exp = builder.getExpression();
+        if (exp != null) {
+            exp.init(new DefaultCamelContext());
+        }
+        return builder;
+    }
 }
diff --git a/core/camel-core/src/test/java/org/apache/camel/builder/NotifyBuilderTest.java b/core/camel-core/src/test/java/org/apache/camel/builder/NotifyBuilderTest.java
index 858e6b4..0922883 100644
--- a/core/camel-core/src/test/java/org/apache/camel/builder/NotifyBuilderTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/builder/NotifyBuilderTest.java
@@ -355,7 +355,7 @@ public class NotifyBuilderTest extends ContextTestSupport {
     public void testFilterWhenExchangeDone() throws Exception {
         NotifyBuilder notify = new NotifyBuilder(context).filter(body().contains("World")).whenDone(3).create();
 
-        assertEquals("filter(simple{${body}} contains World).whenDone(3)", notify.toString());
+        assertEquals("filter(${body} contains World).whenDone(3)", notify.toString());
 
         assertEquals(false, notify.matches());
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/lw/SplitterLightweightTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/lw/SplitterLightweightTest.java
new file mode 100644
index 0000000..d458707
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/lw/SplitterLightweightTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.lw;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.camel.CamelException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.MyAggregationStrategy;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class SplitterLightweightTest extends ContextTestSupport {
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        setUseImmutableContext(true);
+        super.setUp();
+    }
+
+    @Test
+    public void testSendingAMessageUsingMulticastReceivesItsOwnExchange() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedBodiesReceived("James", "Guillaume", "Hiram", "Rob");
+
+        // InOnly
+        template.send("direct:seqential", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+
+        Set<String> ids = new HashSet<>();
+        Set<String> ids2 = new HashSet<>();
+
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        for (int i = 0; i < 4; i++) {
+            Exchange exchange = list.get(i);
+            Message in = exchange.getIn();
+            ids.add(in.getMessageId());
+            ids2.add(exchange.getExchangeId());
+            assertNotNull("The in message should not be null.", in);
+            assertProperty(exchange, Exchange.SPLIT_INDEX, i);
+            assertProperty(exchange, Exchange.SPLIT_SIZE, 4);
+        }
+
+        assertEquals("The sub messages should have unique message ids", 4, ids.size());
+        assertEquals("The sub messages should have unique exchange ids", 4, ids2.size());
+    }
+
+    @Test
+    public void testSplitterWithAggregationStrategy() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedBodiesReceived("James", "Guillaume", "Hiram", "Rob", "Roman");
+
+        Exchange result = template.request("direct:seqential", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob,Roman");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        Message out = result.getMessage();
+        assertEquals("Roman", out.getBody());
+        assertMessageHeader(out, "foo", "bar");
+        assertProperty(result, Exchange.SPLIT_INDEX, 4);
+    }
+
+    @Test
+    public void testEmptyBody() {
+        Exchange result = template.request("direct:seqential", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader("foo", "bar");
+            }
+        });
+
+        assertFalse("Should not have out", result.hasOut());
+    }
+
+    @Test
+    public void testSendingAMessageUsingMulticastReceivesItsOwnExchangeParallel() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+
+        resultEndpoint.expectsNoDuplicates(body());
+        resultEndpoint.expectedMessageCount(4);
+
+        // InOnly
+        template.send("direct:parallel", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        Set<Integer> numbersFound = new TreeSet<>();
+        final String[] names = {"James", "Guillaume", "Hiram", "Rob"};
+
+        for (int i = 0; i < 4; i++) {
+            Exchange exchange = list.get(i);
+            Message in = exchange.getIn();
+            Integer splitCounter = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
+            numbersFound.add(splitCounter);
+            assertEquals(names[splitCounter], in.getBody());
+            assertProperty(exchange, Exchange.SPLIT_SIZE, 4);
+        }
+
+        assertEquals(4, numbersFound.size());
+    }
+
+    @Test
+    public void testSplitterWithAggregationStrategyParallel() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(5);
+
+        Exchange result = template.request("direct:parallel", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob,Roman");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        Message out = result.getMessage();
+
+        assertMessageHeader(out, "foo", "bar");
+        assertEquals((Integer)5, result.getProperty("aggregated", Integer.class));
+    }
+
+    @Test
+    public void testSplitterWithAggregationStrategyParallelStreaming() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(5);
+        resultEndpoint.expectedBodiesReceivedInAnyOrder("James", "Guillaume", "Hiram", "Rob", "Roman");
+
+        Exchange result = template.request("direct:parallel-streaming", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob,Roman");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        Message out = result.getMessage();
+
+        assertMessageHeader(out, "foo", "bar");
+        assertEquals((Integer)5, result.getProperty("aggregated", Integer.class));
+    }
+
+    @Test
+    public void testSplitterParallelAggregate() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(5);
+        resultEndpoint.expectedBodiesReceivedInAnyOrder("James", "Guillaume", "Hiram", "Rob", "Roman");
+
+        Exchange result = template.request("direct:parallelAggregate", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob,Roman");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        Message out = result.getMessage();
+
+        assertMessageHeader(out, "foo", "bar");
+        // we aggregate in parallel and therefore it's not thread-safe when
+        // setting values
+    }
+
+    @Test
+    public void testSplitterWithStreamingAndFileBody() throws Exception {
+        URL url = this.getClass().getResource("/org/apache/camel/processor/simple.txt");
+        assertNotNull("We should find this simple file here.", url);
+        File file = new File(url.getFile());
+        sendToSplitterWithStreaming(file);
+    }
+
+    @Test
+    public void testSplitterWithStreamingAndStringBody() throws Exception {
+        sendToSplitterWithStreaming("James,Guillaume,Hiram,Rob,Roman");
+    }
+
+    public void sendToSplitterWithStreaming(final Object body) throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(5);
+        resultEndpoint.expectedHeaderReceived("foo", "bar");
+
+        template.request("direct:streaming", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody(body);
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+
+        // check that the exchange properties with the split details are correct
+        int size = resultEndpoint.getReceivedExchanges().size();
+        for (int i = 0; i < size; i++) {
+            Exchange exchange = resultEndpoint.getReceivedExchanges().get(i);
+            assertEquals(i, exchange.getProperty(Exchange.SPLIT_INDEX));
+            if (i < (size - 1)) {
+                assertEquals(Boolean.FALSE, exchange.getProperty(Exchange.SPLIT_COMPLETE));
+                // this property is not set when streaming is used, except on
+                // the last exchange
+                assertNull(exchange.getProperty(Exchange.SPLIT_SIZE));
+            } else {
+                assertEquals(Boolean.TRUE, exchange.getProperty(Exchange.SPLIT_COMPLETE));
+                // when we are complete the size is set
+                assertEquals(size, exchange.getProperty(Exchange.SPLIT_SIZE));
+            }
+        }
+    }
+
+    @Test
+    public void testSplitterWithException() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(4);
+        resultEndpoint.expectedHeaderReceived("foo", "bar");
+
+        MockEndpoint failedEndpoint = getMockEndpoint("mock:failed");
+        failedEndpoint.expectedMessageCount(1);
+        failedEndpoint.expectedHeaderReceived("foo", "bar");
+
+        Exchange result = template.request("direct:exception", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob,Exception");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertTrue("The result exchange should have a camel exception", result.getException() instanceof CamelException);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testSplitterWithIterable() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(4);
+        resultEndpoint.expectedBodiesReceived("A", "B", "C", "D");
+        final List<String> data = Arrays.asList("A", "B", "C", "D");
+        Iterable<String> itb = new Iterable<String>() {
+            public Iterator<String> iterator() {
+                return data.iterator();
+            }
+        };
+        sendBody("direct:simple", itb);
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                onException(CamelException.class).to("mock:failed");
+
+                from("direct:seqential").split(body().tokenize(","), new UseLatestAggregationStrategy()).to("mock:result");
+                from("direct:parallel").split(body().tokenize(","), new MyAggregationStrategy()).parallelProcessing().to("mock:result");
+                from("direct:parallelAggregate").split(body().tokenize(","), new MyAggregationStrategy()).parallelProcessing().parallelAggregate().to("mock:result");
+                from("direct:streaming").split(body().tokenize(",")).streaming().to("mock:result");
+                from("direct:parallel-streaming").split(body().tokenize(","), new MyAggregationStrategy()).parallelProcessing().streaming().to("mock:result");
+                from("direct:exception").split(body().tokenize(",")).aggregationStrategy(new MyAggregationStrategy()).parallelProcessing().process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String string = exchange.getIn().getBody(String.class);
+                        if ("Exception".equals(string)) {
+                            throw new CamelException("Just want to throw exception here");
+                        }
+
+                    }
+                }).to("mock:result");
+                from("direct:simple").split(body()).to("mock:result");
+            }
+        };
+    }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/model/ChoiceDefinitionTest.java b/core/camel-core/src/test/java/org/apache/camel/model/ChoiceDefinitionTest.java
index 8df50e4..c5d738b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/model/ChoiceDefinitionTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/model/ChoiceDefinitionTest.java
@@ -39,7 +39,7 @@ public class ChoiceDefinitionTest extends TestSupport {
         assertEquals(when1, choice.getOutputs().get(0));
         assertEquals(when2, choice.getOutputs().get(1));
         assertEquals(other, choice.getOutputs().get(2));
-        assertEquals("choice[when[{simple{${body}} contains Camel}],when[{simple{${body}} contains Donkey}],otherwise[]]", choice.getLabel());
+        assertEquals("choice[when[{${body} contains Camel}],when[{${body} contains Donkey}],otherwise[]]", choice.getLabel());
     }
 
     @Test