You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/06/21 09:28:53 UTC

[camel] branch camel-3.x updated: CAMEL-19478: Add synchronous option to relevant EIPs

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new d299b841993 CAMEL-19478: Add synchronous option to relevant EIPs
d299b841993 is described below

commit d299b84199377b8195fc79ce849650deb3a06615
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 21 11:27:57 2023 +0200

    CAMEL-19478: Add synchronous option to relevant EIPs
---
 .../docs/modules/eips/pages/multicast-eip.adoc     |   7 ++
 .../docs/modules/eips/pages/recipientList-eip.adoc |   7 ++
 .../main/docs/modules/eips/pages/split-eip.adoc    | 115 ++++++++++++++++-----
 .../org/apache/camel/model/multicast.json          |   3 +-
 .../org/apache/camel/model/recipientList.json      |   3 +-
 .../resources/org/apache/camel/model/split.json    |   3 +-
 .../apache/camel/model/MulticastDefinition.java    |  92 +++++++++++++++++
 .../camel/model/RecipientListDefinition.java       |  96 ++++++++++++++++-
 .../org/apache/camel/model/SplitDefinition.java    |  54 ++++++++++
 .../apache/camel/processor/MulticastProcessor.java |  34 ++++++
 .../org/apache/camel/processor/RecipientList.java  |  10 ++
 .../org/apache/camel/reifier/MulticastReifier.java |   2 +
 .../apache/camel/reifier/RecipientListReifier.java |   2 +
 .../org/apache/camel/reifier/SplitReifier.java     |   2 +
 .../MulticastParallelSynchronousTest.java          |  74 +++++++++++++
 .../RecipientListParallelSynchronousTest.java      |  77 ++++++++++++++
 .../processor/SplitParallelSynchronousTest.java    |  68 ++++++++++++
 .../java/org/apache/camel/xml/in/ModelParser.java  |   3 +
 .../dsl/yaml/deserializers/ModelDeserializers.java |  18 ++++
 .../generated/resources/schema/camel-yaml-dsl.json |   9 ++
 .../generated/resources/schema/camelYamlDsl.json   |   9 ++
 21 files changed, 658 insertions(+), 30 deletions(-)

diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/multicast-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/multicast-eip.adoc
index a8c01165237..a3564b26af5 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/multicast-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/multicast-eip.adoc
@@ -94,6 +94,13 @@ And in XML:
 </route>
 ----
 
+[IMPORTANT]
+====
+When parallel processing is enabled, then the Camel routing engine will continue processing
+using the last used thread from the parallel thread pool. However, if you want to use the original
+thread that called the multicast, then make sure to enable the synchronous option as well.
+====
+
 === Ending a Multicast block
 
 You may want to continue routing the exchange after the Multicast EIP. In Java DSL you need to use `end()`
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/recipientList-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/recipientList-eip.adoc
index 227d071f47b..dd5525a6ef1 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/recipientList-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/recipientList-eip.adoc
@@ -164,6 +164,13 @@ And in XML it is an attribute on `<recipientList>`:
 </route>
 ----
 
+[IMPORTANT]
+====
+When parallel processing is enabled, then the Camel routing engine will continue processing
+using the last used thread from the parallel thread pool. However, if you want to use the original
+thread that called the recipient list, then make sure to enable the synchronous option as well.
+====
+
 ==== Using custom thread pool
 
 A thread pool is only used for `parallelProcessing`. You supply your own custom thread pool via the `ExecutorServiceStrategy` (see Camel's Threading Model),
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/split-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/split-eip.adoc
index 362d2482115..ba5f4d5f89d 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/split-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/split-eip.adoc
@@ -90,6 +90,10 @@ so you could use any of the supported languages such as
 xref:components:languages:simple-language.adoc[Simple], xref:components:languages:xpath-language.adoc[XPath],
 xref:components:languages:jsonpath-language.adoc[JSonPath], xref:components:languages:groovy-language.adoc[Groovy] to perform the split.
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("activemq:my.queue")
@@ -97,8 +101,8 @@ from("activemq:my.queue")
         .to("file:some/directory")
 ----
 
-And in XML:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -109,6 +113,7 @@ And in XML:
     </split>
 </route>
 ----
+====
 
 === Splitting the message body
 
@@ -130,6 +135,10 @@ the Split EIP will be split into a single message (the same).
 
 To use this with the splitter you should _just_ use body as the expression:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:splitUsingBody")
@@ -137,6 +146,8 @@ from("direct:splitUsingBody")
         .to("mock:result");
 ----
 
+XML::
++
 In XML you use xref:components:languages:simple-language.adoc[Simple] to refer to the message body:
 
 [source,xml]
@@ -149,6 +160,7 @@ In XML you use xref:components:languages:simple-language.adoc[Simple] to refer t
   </split>
 </route>
 ----
+====
 
 === Splitting with parallel processing
 
@@ -157,6 +169,10 @@ is processed by its own thread in parallel.
 
 The example below enables parallel mode:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:a")
@@ -166,8 +182,8 @@ from("direct:a")
     .to("direct:z");
 ----
 
-And in XML:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -180,6 +196,14 @@ And in XML:
     </split>
 </route>
 ----
+====
+
+[IMPORTANT]
+====
+When parallel processing is enabled, then the Camel routing engine will continue processing
+using the last used thread from the parallel thread pool. However, if you want to use the original
+thread that called the splitter, then make sure to enable the synchronous option as well.
+====
 
 === Ending a Split block
 
@@ -190,6 +214,10 @@ In the example above then sending to mock:result happens after the Split EIP ha
 In other words the message should finish splitting the entire message before the message
 continues.
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:a")
@@ -201,6 +229,8 @@ from("direct:a")
   .to("mock:result");
 ----
 
+XML::
++
 And in XML it is intuitive as `</split>` marks the end of the block:
 
 [source,xml]
@@ -216,6 +246,7 @@ And in XML its intuitive as `</split>` marks the end of the block:
     <to uri="mock:result"/>
 </route>
 ----
+====
 
 === What is returned from Split EIP when its complete
 
@@ -230,6 +261,10 @@ as a single response exchange, that becomes the outgoing exchange after the Spli
 
 The example now aggregates with the `MyAggregationStrategy` class:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:start")
@@ -241,6 +276,8 @@ from("direct:start")
   .to("mock:result");
 ----
 
+XML::
++
 And in XML we can refer to the FQN class name with `#class:` syntax as shown below:
 
 [source,xml]
@@ -256,6 +293,7 @@ And in XML we can refer to the FQN class name with `#class:` syntax as shown bel
     <to uri="mock:result"/>
 </route>
 ----
+====
 
 [NOTE]
 ====
@@ -276,6 +314,10 @@ The Split EIP operates in two modes when splitting:
 
 You can split in streaming mode as shown:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:streaming")
@@ -283,8 +325,8 @@ from("direct:streaming")
     .to("activemq:my.parts");
 ----
 
-And in XML:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -295,10 +337,15 @@ And in XML:
     </split>
 </route>
 ----
+====
 
 You can also supply a custom xref:components:languages:bean-language.adoc[Bean]
 to perform the splitting in streaming mode like this:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:streaming")
@@ -306,8 +353,8 @@ from("direct:streaming")
     .to("activemq:my.parts")
 ----
 
-And in XML:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -318,6 +365,7 @@ And in XML:
     </split>
 </route>
 ----
+====
 
 Then the custom bean could for example be implemented as follows:
 
@@ -393,15 +441,18 @@ Now to split this big file using xref:components:languages:xpath-language.adoc[X
 would cause the entire content to be loaded into memory. So instead we can use the
 xref:components:languages:tokenize-language.adoc[Tokenize] language to do this as follows:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("file:inbox")
   .split().tokenizeXML("order").streaming()
      .to("activemq:queue:order");
 ----
-
-In XML the route would be as follows:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -412,6 +463,7 @@ In XML the route would be as follows:
   </split>
 </route>
 ----
+====
 
 This will split the file using the tag name of the child nodes (more precisely speaking, the local name of the element without its namespace prefix if any),
 which means it will grab the content between the `<order>` and `</order>` tags (incl. the tags).
@@ -427,6 +479,19 @@ So for example a split message would be structured as follows:
 
 If you want to inherit namespaces from a root/parent tag, then you can do this as well by providing the name of the root/parent tag:
 
+[tabs]
+====
+Java::
++
+[source,java]
+----
+from("file:inbox")
+  .split().tokenizeXML("order", "orders").streaming()
+     .to("activemq:queue:order");
+----
+
+XML::
++
 [source,xml]
 ----
 <route>
@@ -437,15 +502,7 @@ If you want to inherit namespaces from a root/parent tag, then you can do this a
   </split>
 </route>
 ----
-
-And in Java DSL it is done like this:
-
-[source,java]
-----
-from("file:inbox")
-  .split().tokenizeXML("order", "orders").streaming()
-     .to("activemq:queue:order");
-----
+====
 
 You can set the `inheritNamespaceTagName` property to `*` to include the preceding context in each token (i.e., generating each token enclosed in its ancestor elements). Note that each token must share the same ancestor elements in this case.
 The above tokenizer works well on simple structures but has some inherent limitations in handling more complex XML structures.
@@ -549,6 +606,10 @@ can be used for grouping N parts together, for example to split big files into c
 
 Doing this is easy as the following example shows:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("file:inbox")
@@ -556,8 +617,8 @@ from("file:inbox")
      .to("activemq:queue:order");
 ----
 
-And in XML DSL:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -568,6 +629,7 @@ And in XML DSL:
   </split>
 </route>
 ----
+====
 
 The `group` value must be a positive number dictating how many elements to combine together in a group.
 Each part will be combined using the token.
@@ -706,6 +768,10 @@ handle it. You can do this by specifying that it should stop in case of an
 exception occurred. This is done by the `stopOnException` option as
 shown below:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:start")
@@ -716,8 +782,8 @@ from("direct:start")
     .to("direct:cheese");
 ----
 
-And using XML DSL you specify it as follows:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -730,6 +796,7 @@ And using XML DSL you specify it as follows:
     <to uri="direct:cheese"/>
 </route>
 ----
+====
 
 In the example above, then `MyProcessor` is causing a failure and throws an exception.
 This means the Split EIP will stop after this, and not split anymore.
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/multicast.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/multicast.json
index 1d1113310f9..4d58a81816f 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/multicast.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/multicast.json
@@ -16,7 +16,8 @@
     "aggregationStrategyMethodName": { "kind": "attribute", "displayName": "Aggregation Strategy Method Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy." },
     "aggregationStrategyMethodAllowNull": { "kind": "attribute", "displayName": "Aggregation Strategy Method Allow Null", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enr [...]
     "parallelAggregate": { "kind": "attribute", "displayName": "Parallel Aggregate", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is fals [...]
-    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel Processing", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the mul [...]
+    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel Processing", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the mul [...]
+    "synchronous": { "kind": "attribute", "displayName": "Synchronous", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to continue routing after the multicast is complete, even if parallel processing is enabled." },
     "streaming": { "kind": "attribute", "displayName": "Streaming", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then Camel will process replies out-of-order, eg in the order they come back. If disabled, Camel will process replies in the same order as defined by the multicast." },
     "stopOnException": { "kind": "attribute", "displayName": "Stop On Exception", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processing the exchange failed (has a fault [...]
     "timeout": { "kind": "attribute", "displayName": "Timeout", "label": "advanced", "required": false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "description": "Sets a total timeout specified in millis, when using parallel processing. If the Multicast hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Multicast breaks out and continues. Not [...]
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/recipientList.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/recipientList.json
index 13b1135ecb2..3580661323a 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/recipientList.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/recipientList.json
@@ -18,7 +18,8 @@
     "aggregationStrategyMethodName": { "kind": "attribute", "displayName": "Aggregation Strategy Method Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy." },
     "aggregationStrategyMethodAllowNull": { "kind": "attribute", "displayName": "Aggregation Strategy Method Allow Null", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enr [...]
     "parallelAggregate": { "kind": "attribute", "displayName": "Parallel Aggregate", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is fals [...]
-    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel Processing", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the rec [...]
+    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel Processing", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the rec [...]
+    "synchronous": { "kind": "attribute", "displayName": "Synchronous", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to continue routing after the recipient list is complete, even if parallel processing is enabled." },
     "timeout": { "kind": "attribute", "displayName": "Timeout", "required": false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "description": "Sets a total timeout specified in millis, when using parallel processing. If the Recipient List hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks out and continues. Notice if you  [...]
     "executorService": { "kind": "attribute", "displayName": "Executor Service", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well." },
     "stopOnException": { "kind": "attribute", "displayName": "Stop On Exception", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processing the exchange failed (has a fault [...]
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/split.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/split.json
index 5087b2b7ed2..001943dc58d 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/split.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/split.json
@@ -18,7 +18,8 @@
     "aggregationStrategyMethodName": { "kind": "attribute", "displayName": "Aggregation Strategy Method Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy." },
     "aggregationStrategyMethodAllowNull": { "kind": "attribute", "displayName": "Aggregation Strategy Method Allow Null", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enr [...]
     "parallelAggregate": { "kind": "attribute", "displayName": "Parallel Aggregate", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is fals [...]
-    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel Processing", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then processing each split messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. It's only processing the sub messages from the splitter which ha [...]
+    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel Processing", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then processing each split messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. It's only processing the sub messages from the splitter which ha [...]
+    "synchronous": { "kind": "attribute", "displayName": "Synchronous", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to continue routing after the split is complete, even if parallel processing is enabled." },
     "streaming": { "kind": "attribute", "displayName": "Streaming", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "When in streaming mode, then the splitter splits the original message on-demand, and each split message is processed one by one. This reduces memory usage as the splitter do not split all the messages first, but then we do not know the total size, and ther [...]
     "stopOnException": { "kind": "attribute", "displayName": "Stop On Exception", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processing the exchange failed (has a fault [...]
     "timeout": { "kind": "attribute", "displayName": "Timeout", "label": "advanced", "required": false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "description": "Sets a total timeout specified in millis, when using parallel processing. If the Splitter hasn't been able to split and process all the sub messages within the given timeframe, then the timeout triggers and the Splitter breaks out and contin [...]
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 91fd7dee5bd..f20522c0fb9 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -65,6 +65,9 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
     private String parallelProcessing;
     @XmlAttribute
     @Metadata(javaType = "java.lang.Boolean")
+    private String synchronous;
+    @XmlAttribute
+    @Metadata(javaType = "java.lang.Boolean")
     private String streaming;
     @XmlAttribute
     @Metadata(label = "advanced", javaType = "java.lang.Boolean")
@@ -176,6 +179,10 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
      * until all messages has been fully processed, before it continues. Its only the sending and processing the replies
      * from the multicasts which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will continue processing using the last used
+     * thread from the parallel thread pool. However, if you want to use the original thread that called the multicast,
+     * then make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public MulticastDefinition parallelProcessing() {
@@ -188,6 +195,26 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
      * until all messages has been fully processed, before it continues. Its only the sending and processing the replies
      * from the multicasts which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will continue processing using the last used
+     * thread from the parallel thread pool. However, if you want to use the original thread that called the multicast,
+     * then make sure to enable the synchronous option as well.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition parallelProcessing(String parallelProcessing) {
+        setParallelProcessing(parallelProcessing);
+        return this;
+    }
+
+    /**
+     * If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait
+     * until all messages has been fully processed, before it continues. Its only the sending and processing the replies
+     * from the multicasts which happens concurrently.
+     *
+     * When parallel processing is enabled, then the Camel routing engine will continue processing using the last used
+     * thread from the parallel thread pool. However, if you want to use the original thread that called the multicast,
+     * then make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public MulticastDefinition parallelProcessing(boolean parallelProcessing) {
@@ -195,6 +222,37 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
         return this;
     }
 
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the multicast is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition synchronous() {
+        return synchronous(true);
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the multicast is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition synchronous(boolean synchronous) {
+        return synchronous(Boolean.toString(synchronous));
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the multicast is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition synchronous(String synchronous) {
+        setSynchronous(synchronous);
+        return this;
+    }
+
     /**
      * If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would
      * require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false
@@ -208,6 +266,32 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
         return this;
     }
 
+    /**
+     * If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would
+     * require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false
+     * meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to
+     * achieve higher performance when the AggregationStrategy is implemented as thread-safe.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition parallelAggregate(boolean parallelAggregate) {
+        setParallelAggregate(Boolean.toString(parallelAggregate));
+        return this;
+    }
+
+    /**
+     * If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would
+     * require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false
+     * meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to
+     * achieve higher performance when the AggregationStrategy is implemented as thread-safe.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition parallelAggregate(String parallelAggregate) {
+        setParallelAggregate(parallelAggregate);
+        return this;
+    }
+
     /**
      * If enabled then Camel will process replies out-of-order, eg in the order they come back. If disabled, Camel will
      * process replies in the same order as defined by the multicast.
@@ -373,6 +457,14 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
         this.parallelProcessing = parallelProcessing;
     }
 
+    public String getSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(String synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public String getStreaming() {
         return streaming;
     }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index 2ebe2567c06..26bf2cd9476 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -66,6 +66,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
     @Metadata(javaType = "java.lang.Boolean")
     private String parallelProcessing;
     @XmlAttribute
+    @Metadata(javaType = "java.lang.Boolean")
+    private String synchronous;
+    @XmlAttribute
     @Metadata(javaType = "java.time.Duration", defaultValue = "0")
     private String timeout;
     @XmlAttribute
@@ -197,6 +200,10 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
      * until all messages has been fully processed, before it continues. Its only the sending and processing the replies
      * from the recipients which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will continue processing using the last used
+     * thread from the parallel thread pool. However, if you want to use the original thread that called the recipient
+     * list, then make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public RecipientListDefinition<Type> parallelProcessing() {
@@ -209,13 +216,32 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
      * until all messages has been fully processed, before it continues. Its only the sending and processing the replies
      * from the recipients which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will continue processing using the last used
+     * thread from the parallel thread pool. However, if you want to use the original thread that called the recipient
+     * list, then make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
-    public RecipientListDefinition<Type> parallelProcessing(boolean parallelProcessing) {
-        setParallelProcessing(Boolean.toString(parallelProcessing));
+    public RecipientListDefinition<Type> parallelProcessing(String parallelProcessing) {
+        setParallelProcessing(parallelProcessing);
         return this;
     }
 
+    /**
+     * If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait
+     * until all messages has been fully processed, before it continues. Its only the sending and processing the replies
+     * from the recipients which happens concurrently.
+     *
+     * When parallel processing is enabled, then the Camel routing engine will continue processing using the last used
+     * thread from the parallel thread pool. However, if you want to use the original thread that called the recipient
+     * list, then make sure to enable the synchronous option as well.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> parallelProcessing(boolean parallelProcessing) {
+        return parallelProcessing(Boolean.toString(parallelProcessing));
+    }
+
     /**
      * If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would
      * require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false
@@ -225,7 +251,63 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
      * @return the builder
      */
     public RecipientListDefinition<Type> parallelAggregate() {
-        setParallelAggregate(Boolean.toString(true));
+        return parallelAggregate(Boolean.toString(true));
+    }
+
+    /**
+     * If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would
+     * require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false
+     * meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to
+     * achieve higher performance when the AggregationStrategy is implemented as thread-safe.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> parallelAggregate(boolean parallelAggregate) {
+        setParallelAggregate(Boolean.toString(parallelAggregate));
+        return this;
+    }
+
+    /**
+     * If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would
+     * require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false
+     * meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to
+     * achieve higher performance when the AggregationStrategy is implemented as thread-safe.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> parallelAggregate(String parallelAggregate) {
+        setParallelAggregate(parallelAggregate);
+        return this;
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the recipient list is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> synchronous() {
+        return synchronous(true);
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the recipient list is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> synchronous(boolean synchronous) {
+        return synchronous(Boolean.toString(synchronous));
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the recipient list is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> synchronous(String synchronous) {
+        setSynchronous(synchronous);
         return this;
     }
 
@@ -430,6 +512,14 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         this.parallelProcessing = parallelProcessing;
     }
 
+    public String getSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(String synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public String getIgnoreInvalidEndpoints() {
         return ignoreInvalidEndpoints;
     }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
index 8eaf0f59b43..e976072fdff 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -65,6 +65,9 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer
     private String parallelProcessing;
     @XmlAttribute
     @Metadata(javaType = "java.lang.Boolean")
+    private String synchronous;
+    @XmlAttribute
+    @Metadata(javaType = "java.lang.Boolean")
     private String streaming;
     @XmlAttribute
     @Metadata(label = "advanced", javaType = "java.lang.Boolean")
@@ -194,6 +197,10 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer
      * all messages has been fully processed, before it continues. It's only processing the sub messages from the
      * splitter which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will continue processing using the last used
+     * thread from the parallel thread pool. However, if you want to use the original thread that called the splitter,
+     * then make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public SplitDefinition parallelProcessing() {
@@ -205,6 +212,10 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer
      * all messages has been fully processed, before it continues. It's only processing the sub messages from the
      * splitter which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will continue processing using the last used
+     * thread from the parallel thread pool. However, if you want to use the original thread that called the splitter,
+     * then make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public SplitDefinition parallelProcessing(boolean parallelProcessing) {
@@ -216,6 +227,10 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer
      * all messages has been fully processed, before it continues. It's only processing the sub messages from the
      * splitter which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will continue processing using the last used
+     * thread from the parallel thread pool. However, if you want to use the original thread that called the splitter,
+     * then make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public SplitDefinition parallelProcessing(String parallelProcessing) {
@@ -260,6 +275,37 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer
         return this;
     }
 
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the split is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public SplitDefinition synchronous() {
+        return synchronous(true);
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the split is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public SplitDefinition synchronous(boolean synchronous) {
+        return synchronous(Boolean.toString(synchronous));
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the split is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public SplitDefinition synchronous(String synchronous) {
+        setSynchronous(synchronous);
+        return this;
+    }
+
     /**
      * When in streaming mode, then the splitter splits the original message on-demand, and each split message is
      * processed one by one. This reduces memory usage as the splitter do not split all the messages first, but then we
@@ -522,6 +568,14 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer
         this.parallelProcessing = parallelProcessing;
     }
 
+    public String getSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(String synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public String getStreaming() {
         return streaming;
     }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 0b5f1f6dc33..8ce05283e28 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -55,6 +55,7 @@ import org.apache.camel.Traceable;
 import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
 import org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ErrorHandlerAware;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.InternalProcessorFactory;
@@ -149,6 +150,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
     protected final Processor onPrepare;
     protected final ProcessorExchangeFactory processorExchangeFactory;
+    private final AsyncProcessorAwaitManager awaitManager;
     private final CamelContext camelContext;
     private final InternalProcessorFactory internalProcessorFactory;
     private final Route route;
@@ -159,6 +161,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
     private Collection<Processor> processors;
     private final AggregationStrategy aggregationStrategy;
     private final boolean parallelProcessing;
+    private boolean synchronous;
     private final boolean streaming;
     private final boolean parallelAggregate;
     private final boolean stopOnException;
@@ -189,6 +192,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         notNull(camelContext, "camelContext");
         this.camelContext = camelContext;
         this.internalProcessorFactory = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory();
+        this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         this.route = route;
         this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.processors = processors;
@@ -251,6 +255,14 @@ public class MulticastProcessor extends AsyncProcessorSupport
         return camelContext;
     }
 
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
     @Override
     protected void doBuild() throws Exception {
         if (processorExchangeFactory != null) {
@@ -289,6 +301,28 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (synchronous) {
+            try {
+                // force synchronous processing using await manager
+                awaitManager.process(new AsyncProcessorSupport() {
+                    @Override
+                    public boolean process(Exchange exchange, AsyncCallback callback) {
+                        // must invoke doProcess directly here to avoid calling recursive
+                        return doProcess(exchange, callback);
+                    }
+                }, exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            } finally {
+                callback.done(true);
+            }
+            return true;
+        } else {
+            return doProcess(exchange, callback);
+        }
+    }
+
+    protected boolean doProcess(Exchange exchange, AsyncCallback callback) {
         Iterable<ProcessorExchangePair> pairs;
         int size = 0;
         try {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index bed81851648..dbccb6cacf7 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -58,6 +58,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
     private final Expression expression;
     private final String delimiter;
     private boolean parallelProcessing;
+    private boolean synchronous;
     private boolean parallelAggregate;
     private boolean stopOnException;
     private boolean ignoreInvalidEndpoints;
@@ -205,6 +206,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
                 camelContext, null, expression, delimiter, producerCache, getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
                 isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate());
+        recipientListProcessor.setSynchronous(synchronous);
         recipientListProcessor.setErrorHandler(errorHandler);
         recipientListProcessor.setAggregateExecutorService(aggregateExecutorService);
         recipientListProcessor.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
@@ -264,6 +266,14 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
         this.parallelProcessing = parallelProcessing;
     }
 
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public boolean isParallelAggregate() {
         return parallelAggregate;
     }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
index 2f638e8629e..86d2de72f2a 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
@@ -58,6 +58,7 @@ public class MulticastReifier extends ProcessorReifier<MulticastDefinition> {
         final AggregationStrategy strategy = createAggregationStrategy();
 
         boolean isParallelProcessing = parseBoolean(definition.getParallelProcessing(), false);
+        boolean isSynchronous = parseBoolean(definition.getSynchronous(), false);
         boolean isShareUnitOfWork = parseBoolean(definition.getShareUnitOfWork(), false);
         boolean isStreaming = parseBoolean(definition.getStreaming(), false);
         boolean isStopOnException = parseBoolean(definition.getStopOnException(), false);
@@ -78,6 +79,7 @@ public class MulticastReifier extends ProcessorReifier<MulticastDefinition> {
         MulticastProcessor answer = new MulticastProcessor(
                 camelContext, route, list, strategy, isParallelProcessing, threadPool, shutdownThreadPool, isStreaming,
                 isStopOnException, timeout, prepare, isShareUnitOfWork, isParallelAggregate);
+        answer.setSynchronous(isSynchronous);
         return answer;
     }
 
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RecipientListReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RecipientListReifier.java
index bfc3ad6f61f..5ced2c43173 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RecipientListReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RecipientListReifier.java
@@ -46,6 +46,7 @@ public class RecipientListReifier extends ProcessorReifier<RecipientListDefiniti
         final Expression expression = createExpression(definition.getExpression());
 
         boolean isParallelProcessing = parseBoolean(definition.getParallelProcessing(), false);
+        boolean isSynchronous = parseBoolean(definition.getSynchronous(), false);
         boolean isStreaming = parseBoolean(definition.getStreaming(), false);
         boolean isParallelAggregate = parseBoolean(definition.getParallelAggregate(), false);
         boolean isShareUnitOfWork = parseBoolean(definition.getShareUnitOfWork(), false);
@@ -62,6 +63,7 @@ public class RecipientListReifier extends ProcessorReifier<RecipientListDefiniti
         answer.setAggregationStrategy(createAggregationStrategy());
         answer.setParallelProcessing(isParallelProcessing);
         answer.setParallelAggregate(isParallelAggregate);
+        answer.setSynchronous(isSynchronous);
         answer.setStreaming(isStreaming);
         answer.setShareUnitOfWork(isShareUnitOfWork);
         answer.setStopOnException(isStopOnException);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SplitReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SplitReifier.java
index 471beee3b01..55fc66c1e4b 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SplitReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SplitReifier.java
@@ -44,6 +44,7 @@ public class SplitReifier extends ExpressionReifier<SplitDefinition> {
         final AggregationStrategy strategy = createAggregationStrategy();
 
         boolean isParallelProcessing = parseBoolean(definition.getParallelProcessing(), false);
+        boolean isSynchronous = parseBoolean(definition.getSynchronous(), false);
         boolean isStreaming = parseBoolean(definition.getStreaming(), false);
         boolean isShareUnitOfWork = parseBoolean(definition.getShareUnitOfWork(), false);
         boolean isParallelAggregate = parseBoolean(definition.getParallelAggregate(), false);
@@ -75,6 +76,7 @@ public class SplitReifier extends ExpressionReifier<SplitDefinition> {
                     threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, prepare,
                     isShareUnitOfWork, isParallelAggregate);
         }
+        answer.setSynchronous(isSynchronous);
 
         return answer;
     }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelSynchronousTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelSynchronousTest.java
new file mode 100644
index 00000000000..0b90f83f8e1
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelSynchronousTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class MulticastParallelSynchronousTest extends ContextTestSupport {
+    // Thread names captured per stage; volatile since route processors may write them from thread pool threads.
+    private volatile String before;
+    private volatile String middle;
+    private volatile String middle2;
+    private volatile String after;
+
+    @Test
+    public void testSynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:a", "A");
+
+        assertMockEndpointsSatisfied();
+        // the multicast branches must run on other threads, while routing must resume on the thread used before it
+        Assertions.assertNotEquals(before, middle);
+        Assertions.assertNotEquals(before, middle2);
+        Assertions.assertNotEquals(after, middle);
+        Assertions.assertNotEquals(after, middle2);
+        Assertions.assertEquals(before, after);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:a")
+                        .process(e -> {
+                            before = Thread.currentThread().getName();
+                        })
+                        .multicast().parallelProcessing().synchronous()
+                        .process(e -> {
+                            middle = Thread.currentThread().getName();
+                        })
+                        .process(e -> {
+                            middle2 = Thread.currentThread().getName();
+                        })
+                        .end()
+                        .process(e -> {
+                            after = Thread.currentThread().getName();
+                        })
+                        .to("mock:end");
+
+            }
+        };
+    }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelSynchronousTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelSynchronousTest.java
new file mode 100644
index 00000000000..7ab24dbbced
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelSynchronousTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class RecipientListParallelSynchronousTest extends ContextTestSupport {
+    // Thread names captured per stage; volatile since route processors may write them from thread pool threads.
+    private volatile String before;
+    private volatile String middle;
+    private volatile String middle2;
+    private volatile String after;
+
+    @Test
+    public void testSynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:a", "A", "whereTo", "direct:b,direct:c");
+
+        assertMockEndpointsSatisfied();
+        // the recipients must run on other threads, while routing must resume on the thread used before the EIP
+        Assertions.assertNotEquals(before, middle);
+        Assertions.assertNotEquals(before, middle2);
+        Assertions.assertNotEquals(after, middle);
+        Assertions.assertNotEquals(after, middle2);
+        Assertions.assertEquals(before, after);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:a")
+                        .process(e -> {
+                            before = Thread.currentThread().getName();
+                        })
+                        .recipientList(header("whereTo")).parallelProcessing().synchronous()
+                        .process(e -> {
+                            after = Thread.currentThread().getName();
+                        })
+                        .to("mock:end");
+
+                from("direct:b")
+                        .process(e -> {
+                            middle = Thread.currentThread().getName();
+                        });
+
+                from("direct:c")
+                        .process(e -> {
+                            middle2 = Thread.currentThread().getName();
+                        });
+
+            }
+        };
+    }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelSynchronousTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelSynchronousTest.java
new file mode 100644
index 00000000000..4bd5a300ea3
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelSynchronousTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SplitParallelSynchronousTest extends ContextTestSupport {
+    // Thread names captured per stage; volatile since route processors may write them from thread pool threads.
+    private volatile String before;
+    private volatile String middle;
+    private volatile String after;
+
+    @Test
+    public void testSynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:a", "A,B,C,D,E");
+
+        assertMockEndpointsSatisfied();
+        // the split parts must run on other threads, while routing must resume on the thread used before the split
+        Assertions.assertNotEquals(before, middle);
+        Assertions.assertNotEquals(after, middle);
+        Assertions.assertEquals(before, after);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:a")
+                        .process(e -> {
+                            before = Thread.currentThread().getName();
+                        })
+                        .split(body().tokenize(",")).parallelProcessing().synchronous()
+                        .process(e -> {
+                            middle = Thread.currentThread().getName();
+                        })
+                        .end()
+                        .process(e -> {
+                            after = Thread.currentThread().getName();
+                        })
+                        .to("mock:end");
+
+            }
+        };
+    }
+
+}
diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index cf3b37c9597..cc748b0d14e 100644
--- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -606,6 +606,7 @@ public class ModelParser extends BaseParser {
                 case "shareUnitOfWork": def.setShareUnitOfWork(val); break;
                 case "stopOnException": def.setStopOnException(val); break;
                 case "streaming": def.setStreaming(val); break;
+                case "synchronous": def.setSynchronous(val); break;
                 case "timeout": def.setTimeout(val); break;
                 default: return processorDefinitionAttributeHandler().accept(def, key, val);
             }
@@ -818,6 +819,7 @@ public class ModelParser extends BaseParser {
                 case "shareUnitOfWork": def.setShareUnitOfWork(val); break;
                 case "stopOnException": def.setStopOnException(val); break;
                 case "streaming": def.setStreaming(val); break;
+                case "synchronous": def.setSynchronous(val); break;
                 case "timeout": def.setTimeout(val); break;
                 default: return processorDefinitionAttributeHandler().accept(def, key, val);
             }
@@ -1343,6 +1345,7 @@ public class ModelParser extends BaseParser {
                 case "shareUnitOfWork": def.setShareUnitOfWork(val); break;
                 case "stopOnException": def.setStopOnException(val); break;
                 case "streaming": def.setStreaming(val); break;
+                case "synchronous": def.setSynchronous(val); break;
                 case "timeout": def.setTimeout(val); break;
                 default: return processorDefinitionAttributeHandler().accept(def, key, val);
             }
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index fbff89f0b11..42f77213767 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -8879,6 +8879,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     @YamlProperty(name = "steps", type = "array:org.apache.camel.model.ProcessorDefinition"),
                     @YamlProperty(name = "stop-on-exception", type = "boolean"),
                     @YamlProperty(name = "streaming", type = "boolean"),
+                    @YamlProperty(name = "synchronous", type = "boolean"),
                     @YamlProperty(name = "timeout", type = "string")
             }
     )
@@ -8956,6 +8957,11 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     target.setStreaming(val);
                     break;
                 }
+                case "synchronous": {
+                    String val = asText(node);
+                    target.setSynchronous(val);
+                    break;
+                }
                 case "timeout": {
                     String val = asText(node);
                     target.setTimeout(val);
@@ -11365,6 +11371,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     @YamlProperty(name = "share-unit-of-work", type = "boolean"),
                     @YamlProperty(name = "stop-on-exception", type = "boolean"),
                     @YamlProperty(name = "streaming", type = "boolean"),
+                    @YamlProperty(name = "synchronous", type = "boolean"),
                     @YamlProperty(name = "timeout", type = "string")
             }
     )
@@ -11462,6 +11469,11 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     target.setStreaming(val);
                     break;
                 }
+                case "synchronous": {
+                    String val = asText(node);
+                    target.setSynchronous(val);
+                    break;
+                }
                 case "timeout": {
                     String val = asText(node);
                     target.setTimeout(val);
@@ -15286,6 +15298,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     @YamlProperty(name = "steps", type = "array:org.apache.camel.model.ProcessorDefinition"),
                     @YamlProperty(name = "stop-on-exception", type = "boolean"),
                     @YamlProperty(name = "streaming", type = "boolean"),
+                    @YamlProperty(name = "synchronous", type = "boolean"),
                     @YamlProperty(name = "timeout", type = "string")
             }
     )
@@ -15373,6 +15386,11 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     target.setStreaming(val);
                     break;
                 }
+                case "synchronous": {
+                    String val = asText(node);
+                    target.setSynchronous(val);
+                    break;
+                }
                 case "timeout": {
                     String val = asText(node);
                     target.setTimeout(val);
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camel-yaml-dsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camel-yaml-dsl.json
index aae0998745d..7f8aeca9036 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camel-yaml-dsl.json
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camel-yaml-dsl.json
@@ -1650,6 +1650,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
@@ -2115,6 +2118,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
@@ -3015,6 +3021,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
index c60ab670796..e8701676be3 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
@@ -1554,6 +1554,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
@@ -2019,6 +2022,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
@@ -2919,6 +2925,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }