You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/01/08 13:42:15 UTC

[camel] branch camel-3.7.x updated: CAMEL-13553: Fix onCompletion EIP firing more than once.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.7.x by this push:
     new c654e00  CAMEL-13553: Fix onCompletion EIP firing more than once.
c654e00 is described below

commit c654e0014143d593d5dc2cc7ee9c9af1211eda70
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 8 14:40:32 2021 +0100

    CAMEL-13553: Fix onCompletion EIP firing more than once.
---
 .../org/apache/camel/model/onCompletion.json       |   4 +-
 .../apache/camel/model/OnCompletionDefinition.java |   2 +-
 .../camel/processor/OnCompletionProcessor.java     |  41 +++-
 .../apache/camel/reifier/OnCompletionReifier.java  |   2 +-
 .../OnCompletionBeforeConsumerModeIssueTest.java   | 220 +++++++++++++++++++++
 .../modules/ROOT/pages/oncompletion.adoc           | 158 ++++++---------
 6 files changed, 322 insertions(+), 105 deletions(-)

diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/onCompletion.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/onCompletion.json
index 39a9a53..03a9add 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/onCompletion.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/onCompletion.json
@@ -15,8 +15,8 @@
     "onCompleteOnly": { "kind": "attribute", "displayName": "On Complete Only", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will only synchronize when the org.apache.camel.Exchange completed successfully (no errors)." },
     "onFailureOnly": { "kind": "attribute", "displayName": "On Failure Only", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will only synchronize when the org.apache.camel.Exchange ended with failure (exception or FAULT message)." },
     "onWhen": { "kind": "element", "displayName": "On When", "required": false, "type": "object", "javaType": "org.apache.camel.model.WhenDefinition", "deprecated": false, "autowired": false, "secret": false, "asPredicate": true, "description": "Sets an additional predicate that should be true before the onCompletion is triggered. To be used for fine grained controlling whether a completion callback should be invoked or not" },
-    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel Processing", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "If enabled then the on completion process will run asynchronously by a separate thread from a thread pool. By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route." },
-    "executorServiceRef": { "kind": "attribute", "displayName": "Executor Service Ref", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Refers to a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well." },
+    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel Processing", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the on completion process will run asynchronously by a separate thread from a thread pool. By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route." },
+    "executorServiceRef": { "kind": "attribute", "displayName": "Executor Service Ref", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Refers to a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well." },
     "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input message body when an org.apache.camel.Exchange for this on completion. By default this feature is off." },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
index 33e2346..3761be9 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
@@ -55,9 +55,9 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit
     @AsPredicate
     private WhenDefinition onWhen;
     @XmlAttribute
+    @Metadata(javaType = "java.lang.Boolean")
     private String parallelProcessing;
     @XmlAttribute
-    @Metadata(javaType = "java.lang.Boolean")
     private String executorServiceRef;
     @XmlAttribute(name = "useOriginalMessage")
     @Metadata(javaType = "java.lang.Boolean")
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index 4425edd..d6de01b 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -59,11 +59,12 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
     private final Predicate onWhen;
     private final boolean useOriginalBody;
     private final boolean afterConsumer;
+    private final boolean routeScoped;
 
     public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService,
                                  boolean shutdownExecutorService,
                                  boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody,
-                                 boolean afterConsumer) {
+                                 boolean afterConsumer, boolean routeScoped) {
         notNull(camelContext, "camelContext");
         notNull(processor, "processor");
         this.camelContext = camelContext;
@@ -75,6 +76,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
         this.onWhen = onWhen;
         this.useOriginalBody = useOriginalBody;
         this.afterConsumer = afterConsumer;
+        this.routeScoped = routeScoped;
     }
 
     @Override
@@ -124,9 +126,11 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
         if (processor != null) {
             // register callback
             if (afterConsumer) {
-                exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationAfterConsumer());
+                exchange.getUnitOfWork()
+                        .addSynchronization(new OnCompletionSynchronizationAfterConsumer(routeScoped, getRouteId()));
             } else {
-                exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationBeforeConsumer());
+                exchange.getUnitOfWork()
+                        .addSynchronization(new OnCompletionSynchronizationBeforeConsumer(routeScoped, getRouteId()));
             }
         }
 
@@ -231,6 +235,14 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
 
     private final class OnCompletionSynchronizationAfterConsumer extends SynchronizationAdapter implements Ordered {
 
+        private final boolean routeScoped;
+        private final String routeId;
+
+        public OnCompletionSynchronizationAfterConsumer(boolean routeScoped, String routeId) {
+            this.routeScoped = routeScoped;
+            this.routeId = routeId;
+        }
+
         @Override
         public int getOrder() {
             // we want to be last
@@ -239,6 +251,11 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
 
         @Override
         public void onComplete(final Exchange exchange) {
+            String currentRouteId = ExchangeHelper.getRouteId(exchange);
+            if (currentRouteId != null && !routeId.equals(currentRouteId)) {
+                return;
+            }
+
             if (onFailureOnly) {
                 return;
             }
@@ -319,6 +336,14 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
 
     private final class OnCompletionSynchronizationBeforeConsumer extends SynchronizationAdapter implements Ordered {
 
+        private final boolean routeScoped;
+        private final String routeId;
+
+        public OnCompletionSynchronizationBeforeConsumer(boolean routeScoped, String routeId) {
+            this.routeScoped = routeScoped;
+            this.routeId = routeId;
+        }
+
         @Override
         public int getOrder() {
             // we want to be last
@@ -327,6 +352,16 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
 
         @Override
         public void onAfterRoute(Route route, Exchange exchange) {
+            // route scope = should be from this route
+            if (routeScoped && !route.getRouteId().equals(routeId)) {
+                return;
+            }
+
+            // global scope = should be from the original route
+            if (!routeScoped && (!route.getRouteId().equals(routeId) || !exchange.getFromRouteId().equals(routeId))) {
+                return;
+            }
+
             if (exchange.isFailed() && onCompleteOnly) {
                 return;
             }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
index 497bf4c..318591a 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
@@ -72,7 +72,7 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
 
         OnCompletionProcessor answer = new OnCompletionProcessor(
                 camelContext, target, threadPool, shutdownThreadPool, isOnCompleteOnly, isOnFailureOnly, when,
-                original, afterConsumer);
+                original, afterConsumer, definition.isRouteScoped());
         return answer;
     }
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/OnCompletionBeforeConsumerModeIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/OnCompletionBeforeConsumerModeIssueTest.java
new file mode 100644
index 0000000..a92b950
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/issues/OnCompletionBeforeConsumerModeIssueTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class OnCompletionBeforeConsumerModeIssueTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testOnCompletionTopMode() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .onCompletion().modeBeforeConsumer()
+                        .to("mock:end")
+                    .end()
+                    .transform(constant("a"))
+                    .to("mock:a")
+                    .to("direct:sub")
+                    .transform(constant("c"))
+                    .to("mock:c");
+
+                from("direct:sub")
+                        .transform(constant("b"))
+                        .to("mock:b");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("a");
+        getMockEndpoint("mock:b").expectedBodiesReceived("b");
+        getMockEndpoint("mock:c").expectedBodiesReceived("c");
+        getMockEndpoint("mock:end").expectedBodiesReceived("c");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOnCompletionEndMode() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .transform(constant("a"))
+                    .to("mock:a")
+                    .to("direct:sub")
+                    .transform(constant("c"))
+                    .to("mock:c")
+                    .onCompletion().modeBeforeConsumer()
+                        .to("mock:end")
+                    .end();
+
+                from("direct:sub")
+                        .transform(constant("b"))
+                        .to("mock:b");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("a");
+        getMockEndpoint("mock:b").expectedBodiesReceived("b");
+        getMockEndpoint("mock:c").expectedBodiesReceived("c");
+        getMockEndpoint("mock:end").expectedBodiesReceived("c");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOnCompletionTop() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .onCompletion()
+                        .to("mock:end")
+                    .end()
+                    .transform(constant("a"))
+                    .to("mock:a")
+                    .to("direct:sub")
+                    .transform(constant("c"))
+                    .to("mock:c");
+
+                from("direct:sub")
+                        .transform(constant("b"))
+                        .to("mock:b");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("a");
+        getMockEndpoint("mock:b").expectedBodiesReceived("b");
+        getMockEndpoint("mock:c").expectedBodiesReceived("c");
+        getMockEndpoint("mock:end").expectedBodiesReceived("c");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOnCompletionEnd() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .transform(constant("a"))
+                    .to("mock:a")
+                    .to("direct:sub")
+                    .transform(constant("c"))
+                    .to("mock:c")
+                    .onCompletion()
+                        .to("mock:end")
+                    .end();
+
+                from("direct:sub")
+                        .transform(constant("b"))
+                        .to("mock:b");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("a");
+        getMockEndpoint("mock:b").expectedBodiesReceived("b");
+        getMockEndpoint("mock:c").expectedBodiesReceived("c");
+        getMockEndpoint("mock:end").expectedBodiesReceived("c");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOnCompletionGlobalMode() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onCompletion().modeBeforeConsumer().to("mock:end");
+
+                from("direct:start")
+                        .transform(constant("a"))
+                        .to("mock:a")
+                        .to("direct:sub")
+                        .transform(constant("c"))
+                        .to("mock:c");
+
+                from("direct:sub")
+                        .transform(constant("b"))
+                        .to("mock:b");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("a");
+        getMockEndpoint("mock:b").expectedBodiesReceived("b");
+        getMockEndpoint("mock:c").expectedBodiesReceived("c");
+        getMockEndpoint("mock:end").expectedBodiesReceived("c");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOnCompletionGlobal() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onCompletion().to("mock:end");
+
+                from("direct:start")
+                        .transform(constant("a"))
+                        .to("mock:a")
+                        .to("direct:sub")
+                        .transform(constant("c"))
+                        .to("mock:c");
+
+                from("direct:sub")
+                        .transform(constant("b"))
+                        .to("mock:b");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("a");
+        getMockEndpoint("mock:b").expectedBodiesReceived("b");
+        getMockEndpoint("mock:c").expectedBodiesReceived("c");
+        getMockEndpoint("mock:end").expectedBodiesReceived("c");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+}
diff --git a/docs/user-manual/modules/ROOT/pages/oncompletion.adoc b/docs/user-manual/modules/ROOT/pages/oncompletion.adoc
index 6f6638d..a7c733c 100644
--- a/docs/user-manual/modules/ROOT/pages/oncompletion.adoc
+++ b/docs/user-manual/modules/ROOT/pages/oncompletion.adoc
@@ -5,8 +5,7 @@ Camel has this concept of a Unit of Work that encompass the
 Exchange. The unit of work among others supports
 synchronization callbacks that are invoked when the
 Exchange is complete. The callback API is defined in
-`org.apache.camel.spi.Synchronization`. From *Camel 2.14* onwards we
-have an extended synchronization
+`org.apache.camel.spi.Synchronization` and the extended synchronization
 `org.apache.camel.spi.SynchronizationRouteAware` that have callbacks for
 route events.
 
@@ -15,22 +14,20 @@ route events.
 You can get hold of the `org.apache.camel.spi.UnitOfWork` from
 `org.apache.camel.Exchange` with the method `getUnitOfWork()`.
 
-In Camel 2.0 we have added DSL for these callbacks using the new
-*onCompletion* DSL name.
+*OnCompletion DSL*
 
-*onCompletion* supports the following features:
+The *onCompletion* EIP supports the following features:
 
 * scope: global and/or per route (route scope override all global scope)
 * multiple global scope
 * triggered either always, only if completed with success, or only if
 failed
-* `onWhen` predicate to only trigger in certain situations
-* *Camel 2.14:* mode: to define whether to run either before or after
-route consumer writes response back to callee (if its InOut)
-* *Camel 2.14:* whether to run async or sync (use a thread pool or not)
+* `onWhen` predicate to only trigger if matched
+* `mode` to define whether to run either before or after
+the route consumer writes the response back to the callee (if it is InOut) (default AfterConsumer)
+* `parallelProcessing` whether to run async or sync (use a thread pool or not) (default false)
 
-From *Camel 2.14* onwards the onCompletion has been modified to support
-running the completion task in either synchronous or asynchronous mode
+The onCompletion supports running the completion task in either synchronous or asynchronous mode
 (using a thread pool) and also whether to run before or after the route
 consumer is done. The reason is to give more flexibility. For example to
 specify to run synchronous and before the route consumer is done, which
@@ -38,38 +35,11 @@ allows to modify the exchange before the consumer writes back any
 response to the callee. You can use this to for example add customer
 headers, or send to a log to log the response message, etc.
 
-
-*Changes from Camel 2.14 onwards*
-The onCompletion has changed defaults and behavior from Camel 2.14
-onwards. It now runs
-* Runs synchronously without any thread pool
-In Camel 2.13 the defaults were
-* Runs asynchronous using a thread pool
-
-
-*Camel 2.13 or older - On completion runs in separate thread*
-
-The *onCompletion* runs in a separate thread in parallel with the
-original route. It is therefore not intended to influence the outcome of
-the original route. The idea for on completion is to spin off a new
-thread to eg send logs to a central log database, send an email, send
-alterts to a monitoring system, store a copy of the result message
-etc. +
- Therefore if you want to do some work that influence the original
-route, then do *not* use *onCompletion* for that. Notice: if you use the
-`UnitOfWork` API as mentioned in the top of this page, then you can
-register a `Synchronization` callback on the
-Exchange which is executed in the original route.
-That way allows you to do some custom code when the route is completed;
-this is how custom components can enlist on completion services which
-they need, eg the File component does that for work
-that moves/deletes the original file etc.
-
 [[OnCompletion-onCompletionwithroutescope]]
 == onCompletion with route scope
 
 The *onCompletion* DSL allows you to add custom routes/processors when
-the original Exchange is complete. Camel spin off a
+the original Exchange is complete. Camel spins off a
 copy of the Exchange and routes it in a separate
 thread, kinda like a Wire Tap. This allows the
 original thread to continue while the *onCompletion* route is running
@@ -99,7 +69,7 @@ from("direct:start")
 
 By default the *onCompletion* will be triggered when the
 Exchange is complete and regardless if the
-Exchange completed with success or with an failure
+Exchange completed with success or with a failure
 (such as an Exception was thrown). You can limit the trigger to only
 occur `onCompleteOnly` or by `onFailureOnly` as shown below:
 
@@ -120,15 +90,15 @@ from("direct:start")
 You can identify if the Exchange is an
 *onCompletion* Exchange as Camel will add the
 property `Exchange.ON_COMPLETION` with a boolean value of `true` when it
-spin offs the *onCompletion* Exchange.
+spins off the *onCompletion* Exchange.
 
 [[OnCompletion-UsingonCompletionfromSpringDSL]]
-=== Using onCompletion from Spring DSL
+=== Using onCompletion from XML DSL
 
-The onCompletion is defined like this with Spring DSL:
+The onCompletion is defined like this with XML DSL:
 
 [source,xml]
------------------------------------------------------------
+----
 <route>
     <from uri="direct:start"/>
     <!-- this onCompletion block will only be executed when the exchange is done being routed -->
@@ -141,13 +111,13 @@ The onCompletion is defined like this with Spring DSL:
     <process ref="myProcessor"/>
     <to uri="mock:result"/>
 </route>
------------------------------------------------------------
+----
 
 And the `onCompleteOnly` and `onFailureOnly` is defined as a boolean
 attribute on the <onCompletion> tag so the failure example would be:
 
 [source,xml]
------------------------------------------------------------
+----
 <route>
     <from uri="direct:start"/>
     <!-- this onCompletion block will only be executed when the exchange is done being routed -->
@@ -159,7 +129,7 @@ attribute on the <onCompletion> tag so the failure example would be:
     <process ref="myProcessor"/>
     <to uri="mock:result"/>
 </route>
------------------------------------------------------------
+----
 
 [[OnCompletion-onCompletionwithglobalscope]]
 == onCompletion with global scope
@@ -168,14 +138,14 @@ This works just like the route scope except from the fact that they are
 defined globally. An example below:
 
 [source,java]
------------------------------------------------------------
+----
 // define a global on completion that is invoked when the exchange is complete
 onCompletion().to("log:global").to("mock:sync");
  
 from("direct:start")
     .process(new MyProcessor())
     .to("mock:result");
------------------------------------------------------------
+----
 
 [[OnCompletion-UsingonCompletionfromSpringDSL.1]]
 === Using onCompletion from Spring DSL
@@ -184,7 +154,7 @@ This works just like the route scope except from the fact that they are
 defined globally. An example below:
 
 [source,xml]
------------------------------------------------------------
+----
 <!-- this is a global onCompletion route that is invoke when any exchange is complete
      as a kind of after callback -->
 <onCompletion>
@@ -197,24 +167,24 @@ defined globally. An example below:
     <process ref="myProcessor"/>
     <to uri="mock:result"/>
 </route>
------------------------------------------------------------
+----
 
 
 *Route scope override Global scope*
 If an *onCompletion* is defined in a route, it overrides *all* global
-scoped and thus its only the route scoped that are used. The globally
-scoped ones are never used.
+scoped ones, and thus only the route scoped ones are used. The globally
+scoped ones are not in use.
 
 [[OnCompletion-UsingonCompletionwithonWhenpredicate]]
 == Using onCompletion with onWhen predicate
 
-As other DSL in Camel you can attach a Predicate to
+As with other DSLs in Camel, you can attach a predicate to
 the *onCompletion* so it only triggers in certain conditions, when the
 predicate matches. For example to only trigger if the message body contains the word
 `Hello` we can do like:
 
 [source,java]
------------------------------------------------------------
+----
 from("direct:start")
     .onCompletion().onWhen(body().contains("Hello"))
         // this route is only invoked when the original route is complete as a kind
@@ -226,57 +196,51 @@ from("direct:start")
     // here the original route contiues
     .to("log:original")
     .to("mock:result");
------------------------------------------------------------
+----
 
 [[OnCompletion-UsingonCompletionwithorwithoutthreadpool]]
 == Using onCompletion with or without thread pool
 
-*Since Camel 2.14*
-
-OnCompletion will from Camel 2.14 onwards not use thread pool by
-default. To use thread pool then either set a `executorService` or set
+To use a thread pool, either set an `executorService` or set
 `parallelProcessing` to true.
 
 For example in Java DSL do
 
 [source,java]
------------------------------------------------------------
-                onCompletion().parallelProcessing()
-                    .to("mock:before")
-                    .delay(1000)
-                    .setBody(simple("OnComplete:${body}"));
------------------------------------------------------------
+----
+onCompletion().parallelProcessing()
+    .to("mock:before")
+    .delay(1000)
+    .setBody(simple("OnComplete:${body}"));
+----
 
 And in XML DSL
 
-[source,java]
---------------------------------------------------------------
-      <onCompletion parallelProcessing="true">
-        <to uri="before"/>
-        <delay><constant>1000</constant></delay>
-        <setBody><simple>OnComplete:${body}</simple></setBody>
-      </onCompletion>
---------------------------------------------------------------
+[source,xml]
+----
+<onCompletion parallelProcessing="true">
+  <to uri="before"/>
+  <delay><constant>1000</constant></delay>
+  <setBody><simple>OnComplete:${body}</simple></setBody>
+</onCompletion>
+----
 
 You can also refer to a specific thread pool
 to be used, using the executorServiceRef option
 
-[source,java]
---------------------------------------------------------------
-      <onCompletion executorServiceRef="myThreadPool">
-        <to uri="before"/>
-        <delay><constant>1000</constant></delay>
-        <setBody><simple>OnComplete:${body}</simple></setBody>
-      </onCompletion>
---------------------------------------------------------------
+[source,xml]
+----
+<onCompletion executorServiceRef="myThreadPool">
+  <to uri="before"/>
+  <delay><constant>1000</constant></delay>
+  <setBody><simple>OnComplete:${body}</simple></setBody>
+</onCompletion>
+----
 
- 
 
 [[OnCompletion-UsingonCompletiontorunbeforerouteconsumersendsbackresponsetocallee]]
 == Using onCompletion to run before route consumer sends back response to callee
 
-*Since Camel 2.14*
-
 OnCompletion supports two modes
 
 * AfterConsumer - Default mode which runs after the consumer is done
@@ -296,21 +260,19 @@ use `modeBeforeConsumer()` as shown below:
 
 [source,java]
 ----------------------------------------------------
-    .onCompletion().modeBeforeConsumer()
-        .setHeader("createdBy", constant("Someone"))
-    .end()
+.onCompletion().modeBeforeConsumer()
+    .setHeader("createdBy", constant("Someone"))
+.end()
 ----------------------------------------------------
 
- 
-
 And in XML DSL you set the mode attribute to BeforeConsumer:
 
-[source,java]
-------------------------------------------
-      <onCompletion mode="BeforeConsumer">
-        <setHeader name="createdBy">
-          <constant>Someone</constant>
-        </setHeader>
-      </onCompletion>
-------------------------------------------
+[source,xml]
+----
+<onCompletion mode="BeforeConsumer">
+  <setHeader name="createdBy">
+    <constant>Someone</constant>
+  </setHeader>
+</onCompletion>
+----