You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/08/12 16:39:59 UTC

git commit: CAMEL-5286: More flexible onCompletion allow to configure mode before/after consumer. And whether to use async/sync with thread pool or not.

Repository: camel
Updated Branches:
  refs/heads/master c475fb661 -> f4d9d3c33


CAMEL-5286: More flexible onCompletion allow to configure mode before/after consumer. And whether to use async/sync with thread pool or not.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f4d9d3c3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f4d9d3c3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f4d9d3c3

Branch: refs/heads/master
Commit: f4d9d3c3397dc91aaccc980c6afc753d3df28e09
Parents: c475fb6
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Aug 12 16:39:48 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 12 16:39:48 2014 +0200

----------------------------------------------------------------------
 .../camel/model/OnCompletionDefinition.java     |  71 +++++++++-
 .../apache/camel/model/OnCompletionMode.java    |  28 ++++
 .../camel/processor/OnCompletionProcessor.java  | 135 +++++++++++++++----
 .../resources/org/apache/camel/model/jaxb.index |   1 +
 .../camel/processor/OnCompletionAsyncTest.java  |  10 +-
 .../camel/processor/OnCompletionModeTest.java   |  69 ++++++++++
 .../OnCompletionParallelProcessingTest.java     |  49 +++++++
 .../OnCompletionUseOriginalBodyTest.java        |   2 +-
 ...ompletionAndInterceptAndOnExceptionTest.java |   4 +-
 .../component/jms/JmsOnCompletionTest.java      |   2 +-
 .../scala/dsl/SOnCompletionDefinition.scala     |   8 +-
 .../camel/scala/dsl/SOnCompletionModeTest.scala |  48 +++++++
 .../dsl/SOnCompletionOnCompleteOnlyTest.scala   |   2 +-
 .../dsl/SOnCompletionOnFailureOnlyTest.scala    |   2 +-
 .../processor/SpringOnCompletionModeTest.java   |  33 +++++
 .../processor/SpringOnCompletionModeTest.xml    |  59 ++++++++
 .../SpringOnCompletionUseOriginalBodyTest.xml   |   2 +-
 17 files changed, 480 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
index 001c0fe..1e2bb7e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
@@ -46,12 +46,16 @@ import org.apache.camel.spi.RouteContext;
 @XmlAccessorType(XmlAccessType.FIELD)
 public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> {
     @XmlAttribute
+    private OnCompletionMode mode;
+    @XmlAttribute
     private Boolean onCompleteOnly;
     @XmlAttribute
     private Boolean onFailureOnly;
     @XmlElement(name = "onWhen")
     private WhenDefinition onWhen;
     @XmlAttribute
+    private Boolean parallelProcessing;
+    @XmlAttribute
     private String executorServiceRef;
     @XmlAttribute(name = "useOriginalMessage")
     private Boolean useOriginalMessagePolicy;
@@ -137,14 +141,16 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
             when = onWhen.getExpression().createPredicate(routeContext);
         }
 
-        // executor service is mandatory for on completion
-        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
-        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true);
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
+        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, isParallelProcessing());
+
+        // should be after consumer by default
+        boolean afterConsumer = mode == null || mode == OnCompletionMode.AfterConsumer;
 
         // should be false by default
         boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false;
         OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal,
-                threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original);
+                threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original, afterConsumer);
         return answer;
     }
 
@@ -173,6 +179,32 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
     }
 
     /**
+     * Sets the mode to be after route is done (default due backwards compatible).
+     * <p/>
+     * This executes the on completion work <i>after</i> the route consumer have written response
+     * back to the callee (if its InOut mode).
+     *
+     * @return the builder
+     */
+    public OnCompletionDefinition modeAfterConsumer() {
+        setMode(OnCompletionMode.AfterConsumer);
+        return this;
+    }
+
+    /**
+     * Sets the mode to be before consumer is done.
+     * <p/>
+     * This allows the on completion work to execute <i>before</i> the route consumer, writes any response
+     * back to the callee (if its InOut mode).
+     *
+     * @return the builder
+     */
+    public OnCompletionDefinition modeBeforeConsumer() {
+        setMode(OnCompletionMode.BeforeConsumer);
+        return this;
+    }
+
+    /**
      * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors).
      *
      * @return the builder
@@ -239,6 +271,16 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
         return this;
     }
 
+    /**
+     * Doing the on completion work in parallel
+     *
+     * @return the builder
+     */
+    public OnCompletionDefinition parallelProcessing() {
+        setParallelProcessing(true);
+        return this;
+    }
+
     public List<ProcessorDefinition<?>> getOutputs() {
         return outputs;
     }
@@ -251,6 +293,14 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
         return true;
     }
 
+    public OnCompletionMode getMode() {
+        return mode;
+    }
+
+    public void setMode(OnCompletionMode mode) {
+        this.mode = mode;
+    }
+
     public Boolean getOnCompleteOnly() {
         return onCompleteOnly;
     }
@@ -307,4 +357,17 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
         this.useOriginalMessagePolicy = useOriginalMessagePolicy;
     }
 
+    public Boolean getParallelProcessing() {
+        return parallelProcessing;
+    }
+
+    public void setParallelProcessing(Boolean parallelProcessing) {
+        this.parallelProcessing = parallelProcessing;
+    }
+
+    public boolean isParallelProcessing() {
+        return parallelProcessing != null && parallelProcessing;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java b/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java
new file mode 100644
index 0000000..1d6a800
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlEnum;
+import javax.xml.bind.annotation.XmlType;
+
+@XmlType
+@XmlEnum(String.class)
+public enum OnCompletionMode {
+
+    AfterConsumer, BeforeConsumer
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index 3604a5c..35b0a52 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -28,6 +28,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Ordered;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.Route;
 import org.apache.camel.Traceable;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.support.SynchronizationAdapter;
@@ -53,9 +54,10 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
     private final boolean onFailureOnly;
     private final Predicate onWhen;
     private final boolean useOriginalBody;
+    private final boolean afterConsumer;
 
     public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService, boolean shutdownExecutorService,
-                                 boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody) {
+                                 boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody, boolean afterConsumer) {
         notNull(camelContext, "camelContext");
         notNull(processor, "processor");
         this.camelContext = camelContext;
@@ -66,6 +68,7 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
         this.onFailureOnly = onFailureOnly;
         this.onWhen = onWhen;
         this.useOriginalBody = useOriginalBody;
+        this.afterConsumer = afterConsumer;
     }
 
     @Override
@@ -97,13 +100,22 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (processor != null) {
             // register callback
-            exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronization());
+            if (afterConsumer) {
+                exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationAfterConsumer());
+            } else {
+                exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationBeforeConsumer());
+            }
         }
 
         callback.done(true);
         return true;
     }
 
+    protected boolean isCreateCopy() {
+        // we need to create a correlated copy if we run in parallel mode
+        return executorService != null;
+    }
+
     /**
      * Processes the exchange by the processors
      *
@@ -127,17 +139,22 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
     protected Exchange prepareExchange(Exchange exchange) {
         Exchange answer;
 
-        // for asynchronous routing we must use a copy as we dont want it
-        // to cause side effects of the original exchange
-        // (the original thread will run in parallel)
-        answer = ExchangeHelper.createCorrelatedCopy(exchange, false);
-        if (answer.hasOut()) {
-            // move OUT to IN (pipes and filters)
-            answer.setIn(answer.getOut());
-            answer.setOut(null);
+        if (isCreateCopy()) {
+            // for asynchronous routing we must use a copy as we dont want it
+            // to cause side effects of the original exchange
+            // (the original thread will run in parallel)
+            answer = ExchangeHelper.createCorrelatedCopy(exchange, false);
+            if (answer.hasOut()) {
+                // move OUT to IN (pipes and filters)
+                answer.setIn(answer.getOut());
+                answer.setOut(null);
+            }
+            // set MEP to InOnly as this wire tap is a fire and forget
+            answer.setPattern(ExchangePattern.InOnly);
+        } else {
+            // use the exchange as-is
+            answer = exchange;
         }
-        // set MEP to InOnly as this wire tap is a fire and forget
-        answer.setPattern(ExchangePattern.InOnly);
 
         if (useOriginalBody) {
             LOG.trace("Using the original IN message instead of current");
@@ -152,7 +169,7 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
         return answer;
     }
 
-    private final class OnCompletionSynchronization extends SynchronizationAdapter implements Ordered {
+    private final class OnCompletionSynchronizationAfterConsumer extends SynchronizationAdapter implements Ordered {
 
         public int getOrder() {
             // we want to be last
@@ -173,13 +190,19 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
             // must use a copy as we dont want it to cause side effects of the original exchange
             final Exchange copy = prepareExchange(exchange);
 
-            executorService.submit(new Callable<Exchange>() {
-                public Exchange call() throws Exception {
-                    LOG.debug("Processing onComplete: {}", copy);
-                    doProcess(processor, copy);
-                    return copy;
-                }
-            });
+            if (executorService != null) {
+                executorService.submit(new Callable<Exchange>() {
+                    public Exchange call() throws Exception {
+                        LOG.debug("Processing onComplete: {}", copy);
+                        doProcess(processor, copy);
+                        return copy;
+                    }
+                });
+            } else {
+                // run without thread-pool
+                LOG.debug("Processing onComplete: {}", copy);
+                doProcess(processor, copy);
+            }
         }
 
         public void onFailure(final Exchange exchange) {
@@ -192,19 +215,31 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
                 return;
             }
 
+
             // must use a copy as we dont want it to cause side effects of the original exchange
             final Exchange copy = prepareExchange(exchange);
+            final Exception original = copy.getException();
             // must remove exception otherwise onFailure routing will fail as well
             // the caused exception is stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange
             copy.setException(null);
 
-            executorService.submit(new Callable<Exchange>() {
-                public Exchange call() throws Exception {
-                    LOG.debug("Processing onFailure: {}", copy);
-                    doProcess(processor, copy);
-                    return null;
-                }
-            });
+            if (executorService != null) {
+                executorService.submit(new Callable<Exchange>() {
+                    public Exchange call() throws Exception {
+                        LOG.debug("Processing onFailure: {}", copy);
+                        doProcess(processor, copy);
+                        // restore exception after processing
+                        copy.setException(original);
+                        return null;
+                    }
+                });
+            } else {
+                // run without thread-pool
+                LOG.debug("Processing onFailure: {}", copy);
+                doProcess(processor, copy);
+                // restore exception after processing
+                copy.setException(original);
+            }
         }
 
         @Override
@@ -219,6 +254,52 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
         }
     }
 
+    private final class OnCompletionSynchronizationBeforeConsumer extends SynchronizationAdapter implements Ordered {
+
+        public int getOrder() {
+            // we want to be last
+            return Ordered.LOWEST;
+        }
+
+        @Override
+        public void onAfterRoute(Route route, Exchange exchange) {
+            if (exchange.isFailed() && onCompleteOnly) {
+                return;
+            }
+
+            if (!exchange.isFailed() && onFailureOnly) {
+                return;
+            }
+
+            if (onWhen != null && !onWhen.matches(exchange)) {
+                // predicate did not match so do not route the onComplete
+                return;
+            }
+
+            // must use a copy as we dont want it to cause side effects of the original exchange
+            final Exchange copy = prepareExchange(exchange);
+
+            if (executorService != null) {
+                executorService.submit(new Callable<Exchange>() {
+                    public Exchange call() throws Exception {
+                        LOG.debug("Processing onAfterRoute: {}", copy);
+                        doProcess(processor, copy);
+                        return copy;
+                    }
+                });
+            } else {
+                // run without thread-pool
+                LOG.debug("Processing onAfterRoute: {}", copy);
+                doProcess(processor, copy);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "onAfterRoute";
+        }
+    }
+
     @Override
     public String toString() {
         return "OnCompletionProcessor[" + processor + "]";

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
----------------------------------------------------------------------
diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
index f0d43ae..ea0d2b9 100644
--- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
+++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
@@ -43,6 +43,7 @@ LoopDefinition
 MarshalDefinition
 MulticastDefinition
 OnCompletionDefinition
+OnCompletionMode
 OnExceptionDefinition
 OptimisticLockRetryPolicyDefinition
 OptionalIdentifiedDefinition

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java
index 5c4ef2d..386e402 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java
@@ -37,7 +37,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                onCompletion()
+                onCompletion().parallelProcessing()
                     .to("mock:before")
                     .delay(1000)
                     .setBody(simple("OnComplete:${body}"))
@@ -67,7 +67,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                onCompletion()
+                onCompletion().parallelProcessing()
                     .to("mock:before")
                     .delay(1000)
                     .setBody(simple("OnComplete:${body}"))
@@ -101,7 +101,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                onCompletion().useOriginalBody()
+                onCompletion().useOriginalBody().parallelProcessing()
                     .to("mock:before")
                     .delay(1000)
                     .setBody(simple("OnComplete:${body}"))
@@ -131,7 +131,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                onCompletion().useOriginalBody()
+                onCompletion().useOriginalBody().parallelProcessing()
                     .to("mock:before")
                     .delay(1000)
                     .setBody(simple("OnComplete:${body}"))
@@ -166,7 +166,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                onCompletion()
+                onCompletion().parallelProcessing()
                     .to("mock:before")
                     .delay(1000)
                     .setBody(simple("OnComplete:${body}"))

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java
new file mode 100644
index 0000000..e958ef2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class OnCompletionModeTest extends ContextTestSupport {
+
+    public void testOnCompletionScopeBefore() throws Exception {
+        getMockEndpoint("mock:input").expectedBodiesReceived("Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("I was here Hello Camel");
+
+        String out = template.requestBody("seda:foo", "Camel", String.class);
+        assertEquals("I was here Hello Camel", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testOnCompletionScopeAfter() throws Exception {
+        getMockEndpoint("mock:input").expectedBodiesReceived("World");
+        getMockEndpoint("mock:after").expectedBodiesReceived("I was here Hello World");
+
+        String out = template.requestBody("seda:bar", "World", String.class);
+        assertEquals("Hello World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo")
+                    // we do not want parallel as we want to change the message before the consumer writes the response
+                    .onCompletion().modeBeforeConsumer()
+                        .transform(body().prepend("I was here "))
+                        .to("mock:after")
+                    .end()
+                    .to("mock:input")
+                    .transform(body().prepend("Hello ")).to("log:foo");
+
+                from("seda:bar")
+                    // need to use parallel to make copy so we do not do side-effects
+                    .onCompletion().modeAfterConsumer().parallelProcessing()
+                        .transform(body().prepend("I was here "))
+                        .to("mock:after")
+                    .end()
+                    .to("mock:input")
+                    .transform(body().prepend("Hello ")).to("log:bar");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java
new file mode 100644
index 0000000..da99971
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class OnCompletionParallelProcessingTest extends ContextTestSupport {
+
+    public void testOnCompletionParallel() throws Exception {
+        getMockEndpoint("mock:input").expectedBodiesReceived("World");
+        getMockEndpoint("mock:after").expectedBodiesReceived("I was here Hello World");
+
+        String out = template.requestBody("seda:bar", "World", String.class);
+        assertEquals("Hello World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:bar")
+                    .onCompletion().parallelProcessing()
+                        .transform(body().prepend("I was here "))
+                        .to("mock:after")
+                    .end()
+                    .to("mock:input")
+                    .transform(body().prepend("Hello ")).to("log:bar");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java
index ade4fb5..dc6d31f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java
@@ -46,7 +46,7 @@ public class OnCompletionUseOriginalBodyTest extends ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                onCompletion().useOriginalBody()
+                onCompletion().useOriginalBody().parallelProcessing()
                     .to("mock:before")
                     .delay(1000)
                     .setBody(simple("OnComplete:${body}"))

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java
index 30eb66d..67a3b45 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java
@@ -69,10 +69,10 @@ public class JmsOnCompletionAndInterceptAndOnExceptionTest extends CamelTestSupp
             public void configure() throws Exception {
                 intercept().to("mock:intercept");
 
-                // define a global on completion that is invoked when the exchage is complete
+                // define a global on completion that is invoked when the exchange is complete
                 onCompletion().to("log:global").to("mock:sync");
 
-                // define an on excpetion
+                // define an on exception
                 onException(Exception.class).to("mock:exception");
 
                 from("activemq:queue:start")

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java
index 171be60..3d51861 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java
@@ -73,7 +73,7 @@ public class JmsOnCompletionTest extends CamelTestSupport {
                         .to("mock:sync")
                     // must use end to denote the end of the onCompletion route
                     .end()
-                    // here the original route contiues
+                    // here the original route continues
                     .process(new MyProcessor())
                     .to("mock:result");
                 // END SNIPPET: e1

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala b/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala
index 94659f4..9277819 100644
--- a/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala
+++ b/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala
@@ -32,8 +32,12 @@ case class SOnCompletionDefinition(override val target : OnCompletionDefinition)
   def onFailureOnly = wrap(target.onFailureOnly)
   def onCompleteOnly = wrap(target.onCompleteOnly)
   
-  def useOriginalBody = wrap(target.useOriginalBody())
-  
+  def useOriginalBody = wrap(target.useOriginalBody)
+
+  def modeBeforeConsumer = wrap(target.modeBeforeConsumer)
+  def modeAfterConsumer = wrap(target.modeAfterConsumer)
+
+  def parallelProcessing = wrap(target.parallelProcessing)
   def executorService(executorService: ExecutorService) = wrap(target.setExecutorService(executorService))
   def executorServiceRef(ref: String) = wrap(target.setExecutorServiceRef(ref))
 

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala
new file mode 100644
index 0000000..8e981bb
--- /dev/null
+++ b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.scala.dsl
+
+import org.apache.camel.scala.dsl.builder.{RouteBuilderSupport, RouteBuilder}
+import org.apache.camel.processor.OnCompletionModeTest
+
+class SOnCompletionModeTest extends OnCompletionModeTest with RouteBuilderSupport {
+
+  override def createRouteBuilder = new RouteBuilder {
+
+    "seda:foo" ==> {
+      onCompletion.modeBeforeConsumer {
+        transform(simple("I was here ${body}"))
+        to("mock:after")
+      }
+      to("mock:input")
+      transform(simple("Hello ${body}"))
+      to("log:foo")
+    }
+
+    "seda:bar" ==> {
+      onCompletion.modeAfterConsumer {
+        transform(simple("I was here ${body}"))
+        to("mock:after")
+      }
+      to("mock:input")
+      transform(simple("Hello ${body}"))
+      to("log:bar")
+    }
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala
index 837c997..354c34d 100644
--- a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala
+++ b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala
@@ -26,7 +26,7 @@ class SOnCompletionOnCompleteOnlyTest extends OnCompletionOnCompleteOnlyTest wit
   override def createRouteBuilder = new RouteBuilder {
 
     "direct:start" ==> {
-      onCompletion(completeOnly) {
+      onCompletion(completeOnly).parallelProcessing {
         to("mock:sync")
       }
       process(new MyProcessor())

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala
index e4ce4cb..f645d54 100644
--- a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala
+++ b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala
@@ -26,7 +26,7 @@ class SOnCompletionOnFailureOnlyTest extends OnCompletionOnFailureOnlyTest with
   override def createRouteBuilder = new RouteBuilder {
 
     "direct:start" ==> {
-      onCompletion(failureOnly) {
+      onCompletion(failureOnly).parallelProcessing {
         to("mock:sync")
       }
       process(new MyProcessor())

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java
new file mode 100644
index 0000000..eee48ee
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.OnCompletionModeTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version 
+ */
+public class SpringOnCompletionModeTest extends OnCompletionModeTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml");
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml
new file mode 100644
index 0000000..2bc1211
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+
+    <route>
+      <from uri="seda:foo"/>
+      <onCompletion mode="BeforeConsumer">
+        <transform>
+          <simple>I was here ${body}</simple>
+        </transform>
+        <to uri="mock:after"/>
+      </onCompletion>
+      <to uri="mock:input"/>
+      <transform>
+        <simple>Hello ${body}</simple>
+      </transform>
+      <to uri="log:foo"/>
+    </route>
+
+    <route>
+      <from uri="seda:bar"/>
+      <onCompletion mode="AfterConsumer" parallelProcessing="true">
+        <transform>
+          <simple>I was here ${body}</simple>
+        </transform>
+        <to uri="mock:after"/>
+      </onCompletion>
+      <to uri="mock:input"/>
+      <transform>
+        <simple>Hello ${body}</simple>
+      </transform>
+      <to uri="log:bar"/>
+    </route>
+
+  </camelContext>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml
index f125623..83b3a42 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml
@@ -25,7 +25,7 @@
     <bean id="myProcessor" class="org.apache.camel.processor.OnCompletionUseOriginalBodyTest$MyProcessor"/>
 
     <camelContext xmlns="http://camel.apache.org/schema/spring">
-        <onCompletion useOriginalMessage="true">
+        <onCompletion useOriginalMessage="true" parallelProcessing="true">
             <to uri="mock:before"/>
             <delay>
                 <constant>1000</constant>