You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/01/27 08:13:39 UTC

[camel] branch main updated: CAMEL-17500: Simplified Dynamic Router control channel messages. (#6810)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 55addd0  CAMEL-17500: Simplified Dynamic Router control channel messages. (#6810)
55addd0 is described below

commit 55addd015d17278e9133a9cc3ec628f4299c7b34
Author: Steve Storck <st...@gmail.com>
AuthorDate: Thu Jan 27 03:13:07 2022 -0500

    CAMEL-17500: Simplified Dynamic Router control channel messages. (#6810)
    
    * CAMEL-17500: Simplified Dynamic Router control channel messages.
    
    * CAMEL-17500: Fixed a few missed checkstyle problems.
    
    * CAMEL-17500: Updated to allow mode to match multiple participants.
    
    * CAMEL-17500: Added mode to send exchanges to all matching recipients.
---
 .../DynamicRouterComponentConfigurer.java          |   6 -
 .../DynamicRouterEndpointConfigurer.java           |  74 ++--
 .../DynamicRouterEndpointUriFactory.java           |  23 +-
 .../component/dynamicrouter/dynamic-router.json    |  25 +-
 .../src/main/docs/dynamic-router-component.adoc    |  81 +++-
 .../dynamicrouter/DynamicRouterComponent.java      | 109 +++---
 .../dynamicrouter/DynamicRouterConfiguration.java  | 406 +++++++++++++++++++++
 .../dynamicrouter/DynamicRouterConstants.java      |  61 +++-
 .../dynamicrouter/DynamicRouterConsumer.java       | 165 ---------
 ...DynamicRouterConsumerNotAvailableException.java |  32 --
 .../DynamicRouterControlChannelProcessor.java      | 112 +++++-
 .../{message => }/DynamicRouterControlMessage.java |   2 +-
 .../DynamicRouterControlProducer.java              | 114 ++++++
 .../dynamicrouter/DynamicRouterEndpoint.java       | 342 ++++++-----------
 .../{processor => }/DynamicRouterProcessor.java    |  83 +++--
 .../dynamicrouter/DynamicRouterProducer.java       | 286 ++++++---------
 .../PrioritizedFilterProcessor.java                |   5 +-
 .../org/apache/camel/component/dynamic-router      |   2 +-
 .../dynamicrouter/DynamicRouterComponentTest.java  |  48 +--
 .../DynamicRouterConfigurationTest.java            |  48 +++
 .../dynamicrouter/DynamicRouterConsumerTest.java   |  79 ----
 .../DynamicRouterControlChannelProcessorTest.java  | 229 ++++++++++++
 .../DynamicRouterControlMessageTest.java           |  93 +++++
 .../DynamicRouterControlProducerTest.java          |  47 +++
 .../dynamicrouter/DynamicRouterEndpointTest.java   |  45 ++-
 .../DynamicRouterProcessorTest.java                |  42 ++-
 .../dynamicrouter/DynamicRouterProducerTest.java   |  55 +--
 .../PrioritizedFilterProcessorTest.java            |  56 +++
 .../DynamicRouterBasicSynchronousIT.java           |   6 +-
 ...a => DynamicRouterMultipleRecipientModeIT.java} |  39 +-
 .../DynamicRouterSingleRouteTwoParticipantsIT.java |   4 +-
 .../integration/DynamicRouterTwoRoutesIT.java      |   4 +-
 .../integration/DynamicRouterUriControlIT.java     | 203 +++++++++++
 ...tSupport.java => DynamicRouterTestSupport.java} | 129 +++++--
 ...namicRouterMultipleRecipientModeIT-context.xml} |   2 +-
 ...RouterSingleRouteTwoParticipantsIT-context.xml} |   0
 36 files changed, 2020 insertions(+), 1037 deletions(-)

diff --git a/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponentConfigurer.java b/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponentConfigurer.java
index f67eca4..d78015f 100644
--- a/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponentConfigurer.java
+++ b/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponentConfigurer.java
@@ -23,8 +23,6 @@ public class DynamicRouterComponentConfigurer extends PropertyConfigurerSupport
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "autowiredenabled":
         case "autowiredEnabled": target.setAutowiredEnabled(property(camelContext, boolean.class, value)); return true;
-        case "bridgeerrorhandler":
-        case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
         default: return false;
@@ -36,8 +34,6 @@ public class DynamicRouterComponentConfigurer extends PropertyConfigurerSupport
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "autowiredenabled":
         case "autowiredEnabled": return boolean.class;
-        case "bridgeerrorhandler":
-        case "bridgeErrorHandler": return boolean.class;
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
         default: return null;
@@ -50,8 +46,6 @@ public class DynamicRouterComponentConfigurer extends PropertyConfigurerSupport
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "autowiredenabled":
         case "autowiredEnabled": return target.isAutowiredEnabled();
-        case "bridgeerrorhandler":
-        case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
         default: return null;
diff --git a/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointConfigurer.java b/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointConfigurer.java
index ed8e649..b67b22b 100644
--- a/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointConfigurer.java
+++ b/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointConfigurer.java
@@ -21,21 +21,23 @@ public class DynamicRouterEndpointConfigurer extends PropertyConfigurerSupport i
     public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
         DynamicRouterEndpoint target = (DynamicRouterEndpoint) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
-        case "block": target.setBlock(property(camelContext, boolean.class, value)); return true;
-        case "bridgeerrorhandler":
-        case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
-        case "exceptionhandler":
-        case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true;
-        case "exchangepattern":
-        case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true;
-        case "failifnoconsumers":
-        case "failIfNoConsumers": target.setFailIfNoConsumers(property(camelContext, boolean.class, value)); return true;
+        case "destinationuri":
+        case "destinationUri": target.getConfiguration().setDestinationUri(property(camelContext, java.lang.String.class, value)); return true;
+        case "expressionlanguage":
+        case "expressionLanguage": target.getConfiguration().setExpressionLanguage(property(camelContext, java.lang.String.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
-        case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true;
-        case "timeout": target.setTimeout(property(camelContext, long.class, value)); return true;
+        case "predicate": target.getConfiguration().setPredicate(property(camelContext, java.lang.String.class, value)); return true;
+        case "predicatebean":
+        case "predicateBean": target.getConfiguration().setPredicateBean(property(camelContext, org.apache.camel.Predicate.class, value)); return true;
+        case "priority": target.getConfiguration().setPriority(property(camelContext, java.lang.Integer.class, value)); return true;
+        case "recipientmode":
+        case "recipientMode": target.getConfiguration().setRecipientMode(property(camelContext, java.lang.String.class, value)); return true;
+        case "subscriptionid":
+        case "subscriptionId": target.getConfiguration().setSubscriptionId(property(camelContext, java.lang.String.class, value)); return true;
+        case "synchronous": target.getConfiguration().setSynchronous(property(camelContext, boolean.class, value)); return true;
         case "warndroppedmessage":
-        case "warnDroppedMessage": target.setWarnDroppedMessage(property(camelContext, boolean.class, value)); return true;
+        case "warnDroppedMessage": target.getConfiguration().setWarnDroppedMessage(property(camelContext, boolean.class, value)); return true;
         default: return false;
         }
     }
@@ -43,19 +45,21 @@ public class DynamicRouterEndpointConfigurer extends PropertyConfigurerSupport i
     @Override
     public Class<?> getOptionType(String name, boolean ignoreCase) {
         switch (ignoreCase ? name.toLowerCase() : name) {
-        case "block": return boolean.class;
-        case "bridgeerrorhandler":
-        case "bridgeErrorHandler": return boolean.class;
-        case "exceptionhandler":
-        case "exceptionHandler": return org.apache.camel.spi.ExceptionHandler.class;
-        case "exchangepattern":
-        case "exchangePattern": return org.apache.camel.ExchangePattern.class;
-        case "failifnoconsumers":
-        case "failIfNoConsumers": return boolean.class;
+        case "destinationuri":
+        case "destinationUri": return java.lang.String.class;
+        case "expressionlanguage":
+        case "expressionLanguage": return java.lang.String.class;
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
+        case "predicate": return java.lang.String.class;
+        case "predicatebean":
+        case "predicateBean": return org.apache.camel.Predicate.class;
+        case "priority": return java.lang.Integer.class;
+        case "recipientmode":
+        case "recipientMode": return java.lang.String.class;
+        case "subscriptionid":
+        case "subscriptionId": return java.lang.String.class;
         case "synchronous": return boolean.class;
-        case "timeout": return long.class;
         case "warndroppedmessage":
         case "warnDroppedMessage": return boolean.class;
         default: return null;
@@ -66,21 +70,23 @@ public class DynamicRouterEndpointConfigurer extends PropertyConfigurerSupport i
     public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
         DynamicRouterEndpoint target = (DynamicRouterEndpoint) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
-        case "block": return target.isBlock();
-        case "bridgeerrorhandler":
-        case "bridgeErrorHandler": return target.isBridgeErrorHandler();
-        case "exceptionhandler":
-        case "exceptionHandler": return target.getExceptionHandler();
-        case "exchangepattern":
-        case "exchangePattern": return target.getExchangePattern();
-        case "failifnoconsumers":
-        case "failIfNoConsumers": return target.isFailIfNoConsumers();
+        case "destinationuri":
+        case "destinationUri": return target.getConfiguration().getDestinationUri();
+        case "expressionlanguage":
+        case "expressionLanguage": return target.getConfiguration().getExpressionLanguage();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
-        case "synchronous": return target.isSynchronous();
-        case "timeout": return target.getTimeout();
+        case "predicate": return target.getConfiguration().getPredicate();
+        case "predicatebean":
+        case "predicateBean": return target.getConfiguration().getPredicateBean();
+        case "priority": return target.getConfiguration().getPriority();
+        case "recipientmode":
+        case "recipientMode": return target.getConfiguration().getRecipientMode();
+        case "subscriptionid":
+        case "subscriptionId": return target.getConfiguration().getSubscriptionId();
+        case "synchronous": return target.getConfiguration().isSynchronous();
         case "warndroppedmessage":
-        case "warnDroppedMessage": return target.isWarnDroppedMessage();
+        case "warnDroppedMessage": return target.getConfiguration().isWarnDroppedMessage();
         default: return null;
         }
     }
diff --git a/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointUriFactory.java b/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointUriFactory.java
index a1fef2c..0f67eb1 100644
--- a/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointUriFactory.java
+++ b/components/camel-dynamic-router/src/generated/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointUriFactory.java
@@ -21,17 +21,20 @@ public class DynamicRouterEndpointUriFactory extends org.apache.camel.support.co
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(10);
-        props.add("lazyStartProducer");
-        props.add("bridgeErrorHandler");
-        props.add("warnDroppedMessage");
+        Set<String> props = new HashSet<>(13);
+        props.add("recipientMode");
         props.add("synchronous");
         props.add("channel");
-        props.add("exchangePattern");
-        props.add("failIfNoConsumers");
-        props.add("block");
-        props.add("exceptionHandler");
-        props.add("timeout");
+        props.add("expressionLanguage");
+        props.add("priority");
+        props.add("subscribeChannel");
+        props.add("predicate");
+        props.add("lazyStartProducer");
+        props.add("destinationUri");
+        props.add("warnDroppedMessage");
+        props.add("predicateBean");
+        props.add("controlAction");
+        props.add("subscriptionId");
         PROPERTY_NAMES = Collections.unmodifiableSet(props);
         SECRET_PROPERTY_NAMES = Collections.emptySet();
         MULTI_VALUE_PREFIXES = Collections.emptySet();
@@ -50,6 +53,8 @@ public class DynamicRouterEndpointUriFactory extends org.apache.camel.support.co
         Map<String, Object> copy = new HashMap<>(properties);
 
         uri = buildPathParameter(syntax, uri, "channel", null, true, copy);
+        uri = buildPathParameter(syntax, uri, "controlAction", null, false, copy);
+        uri = buildPathParameter(syntax, uri, "subscribeChannel", null, false, copy);
         uri = buildQueryParameters(uri, copy, encode);
         return uri;
     }
diff --git a/components/camel-dynamic-router/src/generated/resources/org/apache/camel/component/dynamicrouter/dynamic-router.json b/components/camel-dynamic-router/src/generated/resources/org/apache/camel/component/dynamicrouter/dynamic-router.json
index 8b02990..9cf0104 100644
--- a/components/camel-dynamic-router/src/generated/resources/org/apache/camel/component/dynamicrouter/dynamic-router.json
+++ b/components/camel-dynamic-router/src/generated/resources/org/apache/camel/component/dynamicrouter/dynamic-router.json
@@ -15,27 +15,30 @@
     "scheme": "dynamic-router",
     "extendsScheme": "",
     "syntax": "dynamic-router:channel",
+    "alternativeSyntax": "dynamic-router:channel\/controlAction\/subscribeChannel",
     "async": false,
     "api": false,
     "consumerOnly": false,
-    "producerOnly": false,
+    "producerOnly": true,
     "lenientProperties": false
   },
   "componentProperties": {
-    "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...]
     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...]
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...]
   },
   "properties": {
-    "channel": { "kind": "path", "displayName": "Channel", "group": "common", "label": "common", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Channel of the Dynamic Router" },
-    "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...]
-    "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...]
-    "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
-    "block": { "kind": "parameter", "displayName": "Block", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Flag that determines if the producer should block while waiting for a consumer." },
-    "failIfNoConsumers": { "kind": "parameter", "displayName": "Fail If No Consumers", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Flag to fail if there are no consumers." },
+    "channel": { "kind": "path", "displayName": "Channel", "group": "common", "label": "common", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "Channel of the Dynamic Router" },
+    "controlAction": { "kind": "path", "displayName": "Control Action", "group": "control", "label": "control", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "subscribe", "unsubscribe" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "Control channel action: subscribe or unsub [...]
+    "subscribeChannel": { "kind": "path", "displayName": "Subscribe Channel", "group": "control", "label": "control", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "The channel to subscribe to" },
+    "recipientMode": { "kind": "parameter", "displayName": "Recipient Mode", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "firstMatch", "allMatch" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "firstMatch", "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "Recipient mode: firstMatch or allM [...]
+    "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "Flag to ensure synchronous processing." },
+    "warnDroppedMessage": { "kind": "parameter", "displayName": "Warn Dropped Message", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "Flag to log a warning if no predicates match for an exchange." },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during sta [...]
-    "timeout": { "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "description": "The time limit, in milliseconds, if\/when the producer blocks while waiting for a consumer." },
-    "warnDroppedMessage": { "kind": "parameter", "displayName": "Warn Dropped Message", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Flag to log a warning if no predicates match for an exchange." },
-    "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Flag to ensure synchronous processing." }
+    "destinationUri": { "kind": "parameter", "displayName": "Destination Uri", "group": "control", "label": "control", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "The destination URI for exchanges that match." },
+    "expressionLanguage": { "kind": "parameter", "displayName": "Expression Language", "group": "control", "label": "control", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "simple", "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "The subscription predicate language." },
+    "predicate": { "kind": "parameter", "displayName": "Predicate", "group": "control", "label": "control", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "The subscription predicate." },
+    "predicateBean": { "kind": "parameter", "displayName": "Predicate Bean", "group": "control", "label": "control", "required": false, "type": "object", "javaType": "org.apache.camel.Predicate", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "A Predicate instance in the registry." },
+    "priority": { "kind": "parameter", "displayName": "Priority", "group": "control", "label": "control", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "The subscription priority." },
+    "subscriptionId": { "kind": "parameter", "displayName": "Subscription Id", "group": "control", "label": "control", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration", "configurationField": "configuration", "description": "The subscription ID; if unspecified, one will be assigned and returned." }
   }
 }
diff --git a/components/camel-dynamic-router/src/main/docs/dynamic-router-component.adoc b/components/camel-dynamic-router/src/main/docs/dynamic-router-component.adoc
index 64c7f6b..a81b708 100644
--- a/components/camel-dynamic-router/src/main/docs/dynamic-router-component.adoc
+++ b/components/camel-dynamic-router/src/main/docs/dynamic-router-component.adoc
@@ -5,7 +5,7 @@
 :description: The Dynamic Router component routes exchanges to recipients, and the recipients (and their rules) may change at runtime.
 :since: 3.15
 :supportlevel: Preview
-:component-header: Both producer and consumer are supported
+:component-header: Only producer is supported
 //Manually maintained attributes
 :camel-spring-boot-name: dynamic-router
 
@@ -36,8 +36,19 @@ For some use cases, the core Dynamic Router will be more appropriate.  In other
 dynamic-router:channel[?options]
 ----
 
+== Control URI format
+
+[source]
+----
+dynamic-router:control/controlAction/subscribeChannel[?options]
+----
+
 The `channel` is the routing channel that allows messaging to be logically separate from other channels.  Any string that can be included in a URI is a valid channel name.  Each channel can have a set of participant subscriptions, and can consume messages to be routed to appropriate recipients.  The only reserved channel is the `control` channel.  This is a single channel that handles control messages for participants to subscribe or unsubscribe for messaging over a desired channel.
 
+For control channel messages, the channel must have a literal value of `control`.  The value of `controlAction` must be `subscribe` or `unsubscribe`.  The channel to which the action applies is specified as the `subscribeChannel` path parameter.
+
+These messages will be described in greater detail below, with examples.
+
 // component-configure options: START
 // component-configure options: END
 
@@ -95,9 +106,11 @@ And the same route using XML DSL:
 
 == Subscribing
 
+=== Method 1: Subscribe Message POJO
+
 Participating recipients may subscribe by sending a `DynamicRouterControlMessage` to the `control` channel.  An example subscribe message might look like the following:
 
-.Example Subscribe Message
+.Example Subscribe Message as POJO
 [source,java]
 ----
 // Send a message to the Dynamic Router "billing" channel
@@ -120,11 +133,61 @@ The parameters, in order, are:
 . _Endpoint URI_:  If an evaluation of the rules in the rule base determines that the exchange is appropriate for the recipient, it is sent to this URI.
 . _Predicate_:  For evaluating the exchange to determine if a message should be routed to the supplied URI.
 
+=== Method 2: URI Only
+
+You can also use a simplified method for subscribing using URL parameters, if you wish.  The subscribe message, detailed above, can be sent via the control URI like this (with an empty body, since it will be ignored):
+
+.Example Subscribe Message Using URI
+[source,java]
+----
+template.sendBody("dynamic-router:control/subscribe/billing?subscriptionId=billingSubscription&priority=10&destinationUri=<URI here>&predicate=<text predicate>", "");
+----
+
+=== Method 3: URI and Predicate as Bean
+
+The predicate can also be supplied as a bean in the Camel context registry.  The URI example is modified slightly:
+
+.Example Subscribe Message Using URI and Predicate Bean
+[source,java]
+----
+// Register a bean, somehow, in the registry:
+camelContext.getRegistry().bind("predicateBean", Predicate.class, PredicateBuilder.constant(true));
+template.sendBody("dynamic-router:control/subscribe/billing?subscriptionId=billingSubscription&priority=10&destinationUri=<URI here>&predicateBean=#bean:predicateBean", "");
+----
+
+=== Method 4: URI and Predicate as Message Body
+
+As a final example, the URI subscription method can be modified, once again, to eliminate the `predicate` parameter altogether, and supply the `Predicate` as the message body:
+
+.Example Subscribe Message Using URI and Predicate as Message Body
+[source,java]
+----
+// Create the predicate that will be sent as the message body:
+Predicate predicate = PredicateBuilder.constant(true);
+template.sendBody("dynamic-router:control/subscribe/billing?subscriptionId=billingSubscription&priority=10&destinationUri=<URI here>", predicate);
+----
+
+=== Note: Subscription ID is Generated if Not Supplied
+
+For any of the URI subscription examples, the `subscriptionId` URI parameter may be omitted, and a subscription ID will be generated and returned as the message body.  It will have the form of a UUID string.  As an example:
+
+.Example Subscribe Message Using URI and Omitting Subscription ID
+[source,java]
+----
+Predicate predicate = PredicateBuilder.constant(true);
+// Capture the generated subscription ID from the returned message body:
+String generatedId = (String) template.sendBody("dynamic-router:control/subscribe/billing?priority=10&destinationUri=<URI here>", ExchangePattern.InOut, predicate);
+----
+
+The `generatedId` variable will contain the generated UUID string.  If you are going to unsubscribe, you will need this ID.
+
 == Unsubscribing
 
+=== Method 1: Unsubscribe Message POJO
+
 Participating recipients may unsubscribe by sending a `DynamicRouterControlMessage` to the `control` channel.  An example unsubscribe message might look like the following:
 
-.Example Unsubscribe Message
+.Example Unsubscribe Message Using POJO
 [source,java]
 ----
 // Send message to the Dynamic Router "billing" channel
@@ -138,9 +201,19 @@ template.sendBody("dynamic-router:control", unsubscribeMsg);
 
 The builder for an unsubscribe message only requires the id and channel name.
 
+=== Method 2: URI Only
+
+You can also use a simplified method for unsubscribing using URI parameters, if you wish.  The unsubscribe message, detailed above, can be sent via the control URI like this (with an empty body, since it will be ignored):
+
+.Example Unsubscribe Message Using URI
+[source,java]
+----
+template.sendBody("dynamic-router:control/unsubscribe/billing?subscriptionId=billingSubscription", "");
+----
+
 == The Dynamic Rule Base
 
-To determine if an exchange is suitable for any of the participants, all predicates for the participants that are subscribed to the channel are evaluated until the first result of "true" is found.  The exchange will be routed to the corresponding endpoint.  The rule base contains a default filter that is registered at the least priority (which is the highest integer number).  Like the "default" case of a switch statement in Java, any message that is not appropriate for any registered par [...]
+To determine if an exchange is suitable for any of the participants, all predicates for the participants that are subscribed to the channel are evaluated until the first result of "true" is found, by default.  If the Dynamic Router is configured with the `recipientMode` set to `allMatch`, then all recipients with matching predicates will be selected.  The exchange will be routed to the corresponding endpoint(s).  The rule base contains a default filter that is registered at the least pri [...]
 
 Rules are registered in a channel, and they are logically separate from rules in another channel.  Subscription IDs must be unique within a channel, although multiple subscriptions of the same name may coexist in a dynamic router instance if they are in separate channels.
 
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponent.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponent.java
index 75c9bd6..b1d4f9b 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponent.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponent.java
@@ -21,17 +21,14 @@ import java.util.Map;
 import java.util.function.Supplier;
 
 import org.apache.camel.Endpoint;
-import org.apache.camel.component.dynamicrouter.DynamicRouterConsumer.DynamicRouterConsumerFactory;
 import org.apache.camel.component.dynamicrouter.DynamicRouterControlChannelProcessor.DynamicRouterControlChannelProcessorFactory;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlProducer.DynamicRouterControlProducerFactory;
+import org.apache.camel.component.dynamicrouter.DynamicRouterProcessor.DynamicRouterProcessorFactory;
 import org.apache.camel.component.dynamicrouter.DynamicRouterProducer.DynamicRouterProducerFactory;
-import org.apache.camel.component.dynamicrouter.processor.DynamicRouterProcessor;
-import org.apache.camel.component.dynamicrouter.processor.DynamicRouterProcessor.DynamicRouterProcessorFactory;
-import org.apache.camel.component.dynamicrouter.processor.PrioritizedFilterProcessor;
-import org.apache.camel.component.dynamicrouter.processor.PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory;
+import org.apache.camel.component.dynamicrouter.PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.service.ServiceHelper;
-import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,9 +49,11 @@ public class DynamicRouterComponent extends DefaultComponent {
     private static final Logger LOG = LoggerFactory.getLogger(DynamicRouterComponent.class);
 
     /**
-     * The {@link DynamicRouterConsumer}s, mapped by their channel, for the Dynamic Router.
+     * The {@link DynamicRouterProcessor}s, mapped by their channel, for the Dynamic Router.
      */
-    private final Map<String, DynamicRouterConsumer> consumers = new HashMap<>();
+    private final transient Map<String, DynamicRouterProcessor> processors = new HashMap<>();
+
+    private DynamicRouterControlChannelProcessor controlChannelProcessor;
 
     /**
      * Creates a {@link DynamicRouterEndpoint} instance.
@@ -78,9 +77,10 @@ public class DynamicRouterComponent extends DefaultComponent {
     private Supplier<DynamicRouterProducerFactory> producerFactorySupplier = DynamicRouterProducerFactory::new;
 
     /**
-     * Creates a {@link DynamicRouterConsumer} instance.
+     * Creates a {@link DynamicRouterControlProducerFactory} instance.
      */
-    private Supplier<DynamicRouterConsumerFactory> consumerFactorySupplier = DynamicRouterConsumerFactory::new;
+    private Supplier<DynamicRouterControlProducerFactory> controlProducerFactorySupplier
+            = DynamicRouterControlProducerFactory::new;
 
     /**
      * Creates a {@link PrioritizedFilterProcessor} instance.
@@ -101,7 +101,7 @@ public class DynamicRouterComponent extends DefaultComponent {
      * @param processorFactorySupplier               creates the {@link DynamicRouterProcessor}
      * @param controlChannelProcessorFactorySupplier creates the {@link DynamicRouterControlChannelProcessor}
      * @param producerFactorySupplier                creates the {@link DynamicRouterProducer}
-     * @param consumerFactorySupplier                creates the {@link DynamicRouterConsumer}
+     * @param controlProducerFactorySupplier         creates the {@link DynamicRouterControlProducer}
      * @param filterProcessorFactorySupplier         creates the {@link PrioritizedFilterProcessor}
      */
     public DynamicRouterComponent(
@@ -109,13 +109,13 @@ public class DynamicRouterComponent extends DefaultComponent {
                                   final Supplier<DynamicRouterProcessorFactory> processorFactorySupplier,
                                   final Supplier<DynamicRouterControlChannelProcessorFactory> controlChannelProcessorFactorySupplier,
                                   final Supplier<DynamicRouterProducerFactory> producerFactorySupplier,
-                                  final Supplier<DynamicRouterConsumerFactory> consumerFactorySupplier,
+                                  final Supplier<DynamicRouterControlProducerFactory> controlProducerFactorySupplier,
                                   final Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
         this.endpointFactorySupplier = endpointFactorySupplier;
         this.processorFactorySupplier = processorFactorySupplier;
         this.controlChannelProcessorFactorySupplier = controlChannelProcessorFactorySupplier;
         this.producerFactorySupplier = producerFactorySupplier;
-        this.consumerFactorySupplier = consumerFactorySupplier;
+        this.controlProducerFactorySupplier = controlProducerFactorySupplier;
         this.filterProcessorFactorySupplier = filterProcessorFactorySupplier;
         LOG.debug("Created Dynamic Router component");
     }
@@ -131,20 +131,22 @@ public class DynamicRouterComponent extends DefaultComponent {
     @Override
     protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters)
             throws Exception {
+        DynamicRouterConfiguration configuration = new DynamicRouterConfiguration();
+        configuration.parsePath(remaining);
         DynamicRouterEndpoint endpoint;
-        if (remaining == null) {
+        if (remaining == null || remaining.isBlank()) {
             throw new IllegalArgumentException("You must provide a channel for the Dynamic Router");
         }
-        if (remaining.equals(CONTROL_CHANNEL_NAME)) {
+        if (remaining.startsWith(CONTROL_CHANNEL_NAME)) {
             endpoint = endpointFactorySupplier.get()
-                    .getInstance(uri, remaining, this, controlChannelProcessorFactorySupplier,
-                            producerFactorySupplier, consumerFactorySupplier);
+                    .getInstance(uri, this, configuration, controlChannelProcessorFactorySupplier,
+                            controlProducerFactorySupplier);
         } else {
             endpoint = endpointFactorySupplier.get()
-                    .getInstance(uri, remaining, this, processorFactorySupplier, producerFactorySupplier,
-                            consumerFactorySupplier, filterProcessorFactorySupplier);
-            setProperties(endpoint, parameters);
+                    .getInstance(uri, this, configuration, processorFactorySupplier, producerFactorySupplier,
+                            filterProcessorFactorySupplier);
         }
+        setProperties(endpoint, parameters);
         return endpoint;
     }
 
@@ -155,71 +157,50 @@ public class DynamicRouterComponent extends DefaultComponent {
      */
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownService(consumers);
-        consumers.clear();
+        ServiceHelper.stopAndShutdownService(processors);
+        processors.clear();
         super.doShutdown();
     }
 
     /**
-     * Adds a consumer to the map. Only one consumer can be registered for a given channel.
+     * Adds the supplied processor for the channel, or throws {@link IllegalArgumentException} if one already exists.
      *
-     * @param channel  the channel of the consumer to register
-     * @param consumer the consumer to register
+     * @param channel   the channel to add the processor to
+     * @param processor the processor to add for the channel
      */
-    public void addConsumer(final String channel, final DynamicRouterConsumer consumer) {
-        synchronized (consumers) {
-            if (consumers.putIfAbsent(channel, consumer) != null) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "Cannot add a 2nd consumer to the same endpoint: %s. Dynamic Router only allows one consumer per channel.",
-                                channel));
-            }
-            consumers.notifyAll();
+    void addRoutingProcessor(final String channel, final DynamicRouterProcessor processor) {
+        if (processors.putIfAbsent(channel, processor) != null) {
+            throw new IllegalArgumentException(
+                    "Dynamic Router can have only one processor per channel; channel '" + channel
+                                               + "' already has a processor");
         }
     }
 
     /**
-     * Remove the supplied consumer registered for the supplied channel.
+     * Get the processor for the given channel.
      *
-     * @param channel  channel of the consumer to remove
-     * @param consumer consumer to remove
+     * @param  channel the channel to get the processor for
+     * @return         the processor for the given channel
      */
-    public void removeConsumer(final String channel, final DynamicRouterConsumer consumer) {
-        synchronized (consumers) {
-            consumers.remove(channel, consumer);
-            consumers.notifyAll();
-        }
+    public DynamicRouterProcessor getRoutingProcessor(final String channel) {
+        return processors.get(channel);
     }
 
     /**
-     * Get the consumer indicated by the supplied channel.
+     * Sets the control channel processor.
      *
-     * @param  channel channel of the consumer to get
-     * @return         the consumer indicated by the supplied channel
+     * @param controlChannelProcessor the control channel processor
      */
-    public DynamicRouterConsumer getConsumer(final String channel) {
-        return consumers.get(channel);
+    void setControlChannelProcessor(final DynamicRouterControlChannelProcessor controlChannelProcessor) {
+        this.controlChannelProcessor = controlChannelProcessor;
     }
 
     /**
-     * Get the consumer indicated by the supplied channel.
+     * Gets the control channel processor.
      *
-     * @param  channel channel of the consumer to get
-     * @param  block   if the call to get the consumer should block if not found
-     * @return         the consumer indicated by the supplied channel
+     * @return the control channel processor
      */
-    public DynamicRouterConsumer getConsumer(final String channel, final boolean block, final long timeout)
-            throws InterruptedException {
-        synchronized (consumers) {
-            long remaining = timeout;
-            StopWatch watch = new StopWatch();
-            DynamicRouterConsumer consumer = getConsumer(channel);
-            while (consumer == null && remaining > 0 && block) {
-                consumers.wait(remaining);
-                remaining -= watch.taken();
-                consumer = getConsumer(channel);
-            }
-            return consumer;
-        }
+    DynamicRouterControlChannelProcessor getControlChannelProcessor() {
+        return this.controlChannelProcessor;
     }
 }
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConfiguration.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConfiguration.java
new file mode 100644
index 0000000..322d76a
--- /dev/null
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConfiguration.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.dynamicrouter;
+
+import java.util.Optional;
+import java.util.regex.Matcher;
+
+import org.apache.camel.Predicate;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.ACTION_GROUP;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CHANNEL_GROUP;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.MODE_ALL_MATCH;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.MODE_FIRST_MATCH;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.PATH_PARAMS_PATTERN;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.SUBSCRIBE_GROUP;
+
+/**
+ * This class encapsulates all configuration items for the Dynamic Router EIP component.
+ */
+@UriParams
+public class DynamicRouterConfiguration {
+
+    /**
+     * Channel for the Dynamic Router. For example, if the Dynamic Router URI is "dynamic-router://test", then the
+     * channel is "test". Channels are a way of keeping routing participants, their rules, and exchanges logically
+     * separate from the participants, rules, and exchanges on other channels. This can be seen as analogous to VLANs in
+     * networking.
+     */
+    @UriPath(name = "channel", label = "common", description = "Channel of the Dynamic Router")
+    @Metadata(required = true)
+    private String channel;
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * action (subscribe or unsubscribe) by using this URI path variable.
+     */
+    @UriPath(name = "controlAction", label = "control", enums = "subscribe,unsubscribe",
+             description = "Control channel action: subscribe or unsubscribe")
+    @Metadata
+    private String controlAction;
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * subscribe channel by using this URI path variable.
+     */
+    @UriPath(name = "subscribeChannel", label = "control", description = "The channel to subscribe to")
+    @Metadata
+    private String subscribeChannel;
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * subscription ID by using this URI param. If one is not supplied, one will be generated and returned.
+     */
+    @UriParam(label = "control", description = "The subscription ID; if unspecified, one will be assigned and returned.")
+    private String subscriptionId;
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * destination URI by using this URI param.
+     */
+    @UriParam(label = "control", description = "The destination URI for exchanges that match.")
+    private String destinationUri;
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * subscription priority by using this URI param. Lower numbers have higher priority.
+     */
+    @UriParam(label = "control", description = "The subscription priority.")
+    private Integer priority;
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * {@link Predicate} by using this URI param. Only predicates that can be expressed as a string (e.g., using the
+     * Simple language) can be specified via URI param. Other types must be sent via control channel POJO or as the
+     * message body.
+     */
+    @UriParam(label = "control", description = "The subscription predicate.")
+    private String predicate;
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * {@link Predicate} by using this URI param. Specify the predicate with bean syntax; i.e., #bean:someBeanId
+     */
+    @UriParam(label = "control", description = "A Predicate instance in the registry.", javaType = "org.apache.camel.Predicate")
+    private Predicate predicateBean;
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * expression language for creating the {@link Predicate} by using this URI param. The default is "simple".
+     */
+    @UriParam(label = "control", defaultValue = "simple", description = "The subscription predicate language.")
+    private String expressionLanguage = "simple";
+
+    /**
+     * Sets the behavior of the Dynamic Router when routing participants are selected to receive an incoming exchange.
+     * If the mode is "firstMatch", then the exchange is routed only to the first participant that has a matching
+     * predicate. If the mode is "allMatch", then the exchange is routed to all participants that have a matching
+     * predicate.
+     */
+    @UriParam(label = "common", defaultValue = MODE_FIRST_MATCH, enums = MODE_FIRST_MATCH + "," + MODE_ALL_MATCH,
+              description = "Recipient mode: firstMatch or allMatch")
+    private String recipientMode = MODE_FIRST_MATCH;
+
+    /**
+     * Flag to ensure synchronous processing.
+     */
+    @UriParam(label = "common", defaultValue = "false")
+    private boolean synchronous;
+
+    /**
+     * Flag to log a warning if no predicates match for an exchange.
+     */
+    @UriParam(label = "common", defaultValue = "false")
+    private boolean warnDroppedMessage;
+
+    /**
+     * Channel for the Dynamic Router. For example, if the Dynamic Router URI is "dynamic-router://test", then the
+     * channel is "test". Channels are a way of keeping routing participants, their rules, and exchanges logically
+     * separate from the participants, rules, and exchanges on other channels. This can be seen as analogous to VLANs in
+     * networking.
+     *
+     * @return the channel
+     */
+    public String getChannel() {
+        return channel;
+    }
+
+    /**
+     * Channel for the Dynamic Router. For example, if the Dynamic Router URI is "dynamic-router://test", then the
+     * channel is "test". Channels are a way of keeping routing participants, their rules, and exchanges logically
+     * separate from the participants, rules, and exchanges on other channels. This can be seen as analogous to VLANs in
+     * networking.
+     *
+     * @param channel the channel
+     */
+    public void setChannel(final String channel) {
+        this.channel = channel;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * action (subscribe or unsubscribe) by using this URI path variable.
+     *
+     * @return the control action -- subscribe or unsubscribe
+     */
+    public String getControlAction() {
+        return controlAction;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * action (subscribe or unsubscribe) by using this URI path variable.
+     *
+     * @param controlAction the control action -- subscribe or unsubscribe
+     */
+    public void setControlAction(final String controlAction) {
+        this.controlAction = controlAction;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * subscribe channel by using this URI path variable.
+     *
+     * @return subscribe channel name
+     */
+    public String getSubscribeChannel() {
+        return subscribeChannel;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * subscribe channel by using this URI path variable.
+     *
+     * @param subscribeChannel subscribe channel name
+     */
+    public void setSubscribeChannel(final String subscribeChannel) {
+        this.subscribeChannel = subscribeChannel;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * subscription ID by using this URI param. If one is not supplied, one will be generated and returned.
+     *
+     * @return the subscription ID
+     */
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * subscription ID by using this URI param. If one is not supplied, one will be generated and returned.
+     *
+     * @param subscriptionId the subscription ID
+     */
+    public void setSubscriptionId(final String subscriptionId) {
+        this.subscriptionId = subscriptionId;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * destination URI by using this URI param.
+     *
+     * @return the destination URI
+     */
+    public String getDestinationUri() {
+        return destinationUri;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * destination URI by using this URI param.
+     *
+     * @param destinationUri the destination URI
+     */
+    public void setDestinationUri(final String destinationUri) {
+        this.destinationUri = destinationUri;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * subscription priority by using this URI param. Lower numbers have higher priority.
+     *
+     * @return the subscription priority
+     */
+    public Integer getPriority() {
+        return priority;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * subscription priority by using this URI param. Lower numbers have higher priority.
+     *
+     * @param priority the subscription priority
+     */
+    public void setPriority(Integer priority) {
+        this.priority = priority;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * {@link Predicate} by using this URI param. Only predicates that can be expressed as a string (e.g., using the
+     * Simple language) can be specified via URI param. Other types must be sent via control channel POJO or as the
+     * message body.
+     *
+     * @return the predicate for evaluating exchanges
+     */
+    public String getPredicate() {
+        return predicate;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * {@link Predicate} by using this URI param. Only predicates that can be expressed as a string (e.g., using the
+     * Simple language) can be specified via URI param. Other types must be sent via control channel POJO or as the
+     * message body.
+     *
+     * @param predicate the predicate for evaluating exchanges
+     */
+    public void setPredicate(final String predicate) {
+        this.predicate = predicate;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * {@link Predicate} by using this URI param. Specify the predicate with bean syntax; i.e., #bean:someBeanId, so
+     * that a {@link Predicate} instance in the registry is used. This is separate from the string-based "predicate"
+     * URI param.
+     *
+     * @return the predicate bean for evaluating exchanges
+     */
+    public Predicate getPredicateBean() {
+        return predicateBean;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * {@link Predicate} by using this URI param. Specify the predicate with bean syntax; i.e., #bean:someBeanId, so
+     * that a {@link Predicate} instance in the registry is used. This is separate from the string-based "predicate"
+     * URI param.
+     *
+     * @param predicateBean the predicate bean for evaluating exchanges
+     */
+    public void setPredicateBean(final Predicate predicateBean) {
+        this.predicateBean = predicateBean;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * expression language for creating the {@link Predicate} by using this URI param. The default is "simple".
+     *
+     * @return the expression language name
+     */
+    public String getExpressionLanguage() {
+        return expressionLanguage;
+    }
+
+    /**
+     * When sending messages to the control channel without using a {@link DynamicRouterControlMessage}, specify the
+     * expression language for creating the {@link Predicate} by using this URI param. The default is "simple".
+     *
+     * @param expressionLanguage the expression language name
+     */
+    public void setExpressionLanguage(final String expressionLanguage) {
+        this.expressionLanguage = expressionLanguage;
+    }
+
+    /**
+     * Gets the behavior of the Dynamic Router when routing participants are selected to receive an incoming exchange.
+     * If the mode is "firstMatch", then the exchange is routed only to the first participant that has a matching
+     * predicate. If the mode is "allMatch", then the exchange is routed to all participants that have a matching
+     * predicate.
+     *
+     * @return the recipient mode
+     */
+    public String getRecipientMode() {
+        return recipientMode;
+    }
+
+    /**
+     * Sets the behavior of the Dynamic Router when routing participants are selected to receive an incoming exchange.
+     * If the mode is "firstMatch", then the exchange is routed only to the first participant that has a matching
+     * predicate. If the mode is "allMatch", then the exchange is routed to all participants that have a matching
+     * predicate.
+     *
+     * @param recipientMode the recipient mode
+     */
+    public void setRecipientMode(final String recipientMode) {
+        this.recipientMode = recipientMode;
+    }
+
+    /**
+     * Flag to ensure synchronous processing.
+     *
+     * @return true if processing will be synchronous, false otherwise
+     */
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    /**
+     * Flag to ensure synchronous processing.
+     *
+     * @param synchronous flag if processing will be synchronous
+     */
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
+    /**
+     * Flag to log a warning if no predicates match for an exchange.
+     *
+     * @return true if logging a warning when no predicates match an exchange, false otherwise
+     */
+    public boolean isWarnDroppedMessage() {
+        return warnDroppedMessage;
+    }
+
+    /**
+     * Flag to log a warning if no predicates match for an exchange.
+     *
+     * @param warnDroppedMessage flag to log warnings when no predicates match
+     */
+    public void setWarnDroppedMessage(boolean warnDroppedMessage) {
+        this.warnDroppedMessage = warnDroppedMessage;
+    }
+
+    /**
+     * Parse the URI path for configuration parameters.
+     *
+     * @param path the URI path to parse
+     */
+    public void parsePath(final String path) {
+        Optional.ofNullable(path)
+                .map(s -> s.isEmpty() ? null : s)
+                .ifPresent(p -> {
+                    final Matcher matcher = PATH_PARAMS_PATTERN.matcher(p);
+                    boolean matches = matcher.matches();
+                    if (matches) {
+                        setChannel(matcher.group(CHANNEL_GROUP));
+                        setControlAction(matcher.group(ACTION_GROUP));
+                        setSubscribeChannel(matcher.group(SUBSCRIBE_GROUP));
+                    } else {
+                        throw new IllegalArgumentException("Illegal syntax for a Dynamic Router URI: " + path);
+                    }
+                });
+    }
+}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConstants.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConstants.java
index 4d5ee7c..8b6efc9 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConstants.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConstants.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.dynamicrouter;
 
+import java.util.regex.Pattern;
+
 /**
  * Contains constants that are used within this component.
  */
@@ -27,7 +29,7 @@ public abstract class DynamicRouterConstants {
     public static final String FIRST_VERSION = "3.15.0";
 
     /**
-     * The component name/scheme for the {@link DynamicRouterComponent}.
+     * The component name/scheme for the {@link DynamicRouterEndpoint}.
      */
     public static final String COMPONENT_SCHEME = "dynamic-router";
 
@@ -47,7 +49,64 @@ public abstract class DynamicRouterConstants {
     public static final String TITLE = "Dynamic Router";
 
     /**
+     * The mode for sending an exchange to recipients: send only to the first match.
+     */
+    public static final String MODE_FIRST_MATCH = "firstMatch";
+
+    /**
+     * The mode for sending an exchange to recipients: send to all matching.
+     */
+    public static final String MODE_ALL_MATCH = "allMatch";
+
+    /**
      * The syntax, for the auto-generated documentation.
      */
     public static final String SYNTAX = COMPONENT_SCHEME + ":channel";
+
+    /**
+     * Name of the control action parameter, used as a placeholder in {@link #CONTROL_SYNTAX}.
+     */
+    public static final String CONTROL_ACTION_PARAM = "controlAction";
+
+    /**
+     * Name of the subscription channel parameter, used as a placeholder in {@link #CONTROL_SYNTAX}.
+     */
+    public static final String SUBSCRIPTION_CHANNEL_PARAM = "subscribeChannel";
+
+    /**
+     * The alternate control-channel syntax: {@code dynamic-router:channel/controlAction/subscribeChannel}.
+     */
+    public static final String CONTROL_SYNTAX
+            = SYNTAX + "/" + CONTROL_ACTION_PARAM + "/" + SUBSCRIPTION_CHANNEL_PARAM;
+
+    /**
+     * Subscribe control channel action.
+     */
+    public static final String CONTROL_ACTION_SUBSCRIBE = "subscribe";
+
+    /**
+     * Unsubscribe control channel action.
+     */
+    public static final String CONTROL_ACTION_UNSUBSCRIBE = "unsubscribe";
+
+    /**
+     * The name for the regex capture group that captures the channel name.
+     */
+    public static final String CHANNEL_GROUP = "channel";
+
+    /**
+     * The name for the regex capture group that captures the control channel action.
+     */
+    public static final String ACTION_GROUP = "action";
+
+    /**
+     * The name for the regex capture group that captures the channel name for the subscription.
+     */
+    public static final String SUBSCRIBE_GROUP = "subscribe";
+
+    /**
+     * Regular expression to parse the URI path: a channel, optionally followed by "/action/subscribeChannel".
+     */
+    public static final Pattern PATH_PARAMS_PATTERN = Pattern.compile(
+            String.format("(?<%s>[^/]+)(/(?<%s>[^/]+)/(?<%s>[^/]+))?", CHANNEL_GROUP, ACTION_GROUP, SUBSCRIBE_GROUP));
 }
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConsumer.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConsumer.java
deleted file mode 100644
index d4041f4..0000000
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConsumer.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.dynamicrouter;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.Suspendable;
-import org.apache.camel.spi.ShutdownAware;
-import org.apache.camel.support.DefaultConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The Dynamic Router {@link org.apache.camel.Consumer}.
- */
-public class DynamicRouterConsumer extends DefaultConsumer implements ShutdownAware, Suspendable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DynamicRouterConsumer.class);
-
-    /**
-     * The channel of the Dynamic Router. For example, if the Dynamic Router URI is "dynamic-router://test", then the
-     * channel is "test".
-     */
-    private final String channel;
-
-    /**
-     * Create the Dynamic Router {@link org.apache.camel.Consumer} with the supplied {@link org.apache.camel.Endpoint}
-     * and {@link Processor}.
-     *
-     * @param endpoint  the Dynamic Router endpoint
-     * @param processor the Dynamic Router processor
-     * @param channel   the channel of the Dynamic Router
-     */
-    public DynamicRouterConsumer(final DynamicRouterEndpoint endpoint, final Processor processor, final String channel) {
-        super(endpoint, processor);
-        this.channel = channel;
-        LOG.debug("Created DynamicRouter consumer for endpoint URI: {}", endpoint.getEndpointUri());
-    }
-
-    private DynamicRouterComponent getComponent() {
-        return (DynamicRouterComponent) getEndpoint().getComponent();
-    }
-
-    /**
-     * Get the Dynamic Router {@link org.apache.camel.Endpoint}.
-     *
-     * @return the {@link DynamicRouterEndpoint}
-     */
-    @Override
-    public DynamicRouterEndpoint getEndpoint() {
-        return (DynamicRouterEndpoint) super.getEndpoint();
-    }
-
-    /**
-     * Manages the "start" lifecycle phase.
-     *
-     * @throws Exception if there is a problem starting
-     */
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-        getComponent().addConsumer(channel, this);
-    }
-
-    /**
-     * Manages the "stop" lifecycle phase.
-     *
-     * @throws Exception if there is a problem stopping
-     */
-    @Override
-    protected void doStop() throws Exception {
-        getComponent().removeConsumer(channel, this);
-        super.doStop();
-    }
-
-    /**
-     * Manages the "suspend" lifecycle phase.
-     */
-    @Override
-    protected void doSuspend() {
-        getComponent().removeConsumer(channel, this);
-    }
-
-    /**
-     * Manages the "resume" lifecycle phase.
-     */
-    @Override
-    protected void doResume() {
-        getComponent().addConsumer(channel, this);
-    }
-
-    /**
-     * To defer shutdown during first phase of shutdown. This allows any pending exchanges to be completed and,
-     * therefore, ensure a graceful shutdown without losing messages. At the very end, when there are no more inflight
-     * and pending messages, the consumer may safely be shut down.
-     *
-     * @param  shutdownRunningTask the configured option for how to act when shutting down running tasks
-     * @return                     <tt>true</tt> to defer shutdown until the last exchange completes
-     */
-    @Override
-    public boolean deferShutdown(final ShutdownRunningTask shutdownRunningTask) {
-        return true;
-    }
-
-    /**
-     * Gets the number of pending exchanges. Some consumers have internal queues with {@link Exchange}s that are
-     * pending. Returning <tt>zero</tt> indicates that there are zero pending exchanges and, therefore, this consumer is
-     * ready to be shut down.
-     *
-     * @return number of pending exchanges
-     */
-    @Override
-    public int getPendingExchangesSize() {
-        return 0;
-    }
-
-    /**
-     * Prepares for stop/shutdown.
-     *
-     * @param suspendOnly <tt>true</tt> if the intention is to only suspend the service, and not stop/shutdown the
-     *                    service.
-     * @param forced      <tt>true</tt> to force a more aggressive shutdown, <tt>false</tt> to gracefully prepare to
-     *                    shut down
-     */
-    @Override
-    public void prepareShutdown(final boolean suspendOnly, final boolean forced) {
-        // noop
-    }
-
-    /**
-     * Create a {@link DynamicRouterConsumer} instance.
-     */
-    public static class DynamicRouterConsumerFactory {
-
-        /**
-         * Create the Dynamic Router {@link org.apache.camel.Consumer} with the supplied
-         * {@link org.apache.camel.Endpoint} and {@link Processor}.
-         *
-         * @param endpoint  the Dynamic Router endpoint
-         * @param processor the Dynamic Router processor
-         * @param channel   the channel of the Dynamic Router
-         */
-        public DynamicRouterConsumer getInstance(
-                final DynamicRouterEndpoint endpoint,
-                final Processor processor,
-                final String channel) {
-            return new DynamicRouterConsumer(endpoint, processor, channel);
-        }
-    }
-}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConsumerNotAvailableException.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConsumerNotAvailableException.java
deleted file mode 100644
index afd2791..0000000
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConsumerNotAvailableException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.dynamicrouter;
-
-import org.apache.camel.CamelExchangeException;
-import org.apache.camel.Exchange;
-
-/**
- * Exception thrown when no consumers are available.
- */
-public class DynamicRouterConsumerNotAvailableException extends CamelExchangeException {
-
-    private static final long serialVersionUID = 1L;
-
-    public DynamicRouterConsumerNotAvailableException(String message, Exchange exchange) {
-        super(message, exchange);
-    }
-}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlChannelProcessor.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlChannelProcessor.java
index bfef8fd..81f6df8 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlChannelProcessor.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlChannelProcessor.java
@@ -17,15 +17,21 @@
 package org.apache.camel.component.dynamicrouter;
 
 import java.util.Optional;
+import java.util.UUID;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage;
-import org.apache.camel.component.dynamicrouter.processor.DynamicRouterProcessor;
+import org.apache.camel.Predicate;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.SubscribeMessageBuilder;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.UnsubscribeMessageBuilder;
+import org.apache.camel.spi.Language;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CONTROL_ACTION_SUBSCRIBE;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CONTROL_ACTION_UNSUBSCRIBE;
+
 /**
  * Processes {@link DynamicRouterControlMessage}s on the specialized control channel.
  */
@@ -37,6 +43,11 @@ public class DynamicRouterControlChannelProcessor extends AsyncProcessorSupport
     private static final Logger LOG = LoggerFactory.getLogger(DynamicRouterControlChannelProcessor.class);
 
     /**
+     * The configuration for the Dynamic Router.
+     */
+    private DynamicRouterConfiguration configuration;
+
+    /**
      * The {@link DynamicRouterComponent} that this instance processes {@link DynamicRouterControlMessage}s for.
      */
     private final DynamicRouterComponent component;
@@ -52,6 +63,84 @@ public class DynamicRouterControlChannelProcessor extends AsyncProcessorSupport
     }
 
     /**
+     * Tries to obtain the subscription {@link Predicate}. First, it looks for an expression from the URI parameters. If
+     * that is not available, it checks the message body for the {@link Predicate}.
+     *
+     * @param  body body to inspect for a {@link Predicate} if the URI parameters does not have an expression
+     * @return      the {@link Predicate}
+     */
+    Predicate obtainPredicate(final Object body) {
+        Predicate predicate;
+        final String exLang = configuration.getExpressionLanguage();
+        final String value = configuration.getPredicate();
+        final Predicate bean = configuration.getPredicateBean();
+        if (bean != null) {
+            predicate = bean;
+        } else if (value != null && !value.isEmpty() && exLang != null && !exLang.isEmpty()) {
+            try {
+                Language language = component.getCamelContext().resolveLanguage(exLang);
+                predicate = language.createPredicate(value);
+            } catch (Exception e) {
+                String message = String.format(
+                        "Language '%s' and predicate expression '%s' could not create a valid predicate", exLang, value);
+                throw new IllegalArgumentException(message, e);
+            }
+        } else {
+            if (Predicate.class.isAssignableFrom(body.getClass())) {
+                predicate = (Predicate) body;
+            } else {
+                throw new IllegalArgumentException(
+                        "Subscription predicate must be provided either by an expression in" +
+                                                   "the URI, as the message body, or as a property in a control channel" +
+                                                   "message");
+            }
+        }
+        return predicate;
+    }
+
+    /**
+     * Obtain the {@link DynamicRouterControlMessage} for the {@link Exchange}. If a control action is specified in the
+     * URI, the message is created from the URI parameters; otherwise, the message body must be the control message.
+     *
+     * @param  exchange                 the {@link Exchange} to process
+     * @return                          the {@link DynamicRouterControlMessage}
+     * @throws IllegalArgumentException if the control action is unknown, or if no control message can be found
+     */
+    DynamicRouterControlMessage handleControlMessage(final Exchange exchange) {
+        final String controlAction = configuration.getControlAction();
+        DynamicRouterControlMessage controlMessage;
+        final Object body = exchange.getIn().getBody();
+        if (controlAction != null && !controlAction.isEmpty()) {
+            switch (controlAction) {
+                case CONTROL_ACTION_UNSUBSCRIBE:
+                    controlMessage = new UnsubscribeMessageBuilder()
+                            .channel(configuration.getSubscribeChannel())
+                            .id(configuration.getSubscriptionId())
+                            .build();
+                    break;
+                case CONTROL_ACTION_SUBSCRIBE:
+                    final String subscriptionId = configuration.getSubscriptionId() == null
+                            ? UUID.randomUUID().toString() : configuration.getSubscriptionId();
+                    controlMessage = new SubscribeMessageBuilder()
+                            .channel(configuration.getSubscribeChannel())
+                            .id(subscriptionId)
+                            .endpointUri(configuration.getDestinationUri())
+                            .priority(configuration.getPriority())
+                            .predicate(obtainPredicate(body))
+                            .build();
+                    break;
+                default:
+                    throw new IllegalArgumentException("Illegal control channel action: " + controlAction);
+            }
+        } else if (body instanceof DynamicRouterControlMessage) {
+            controlMessage = (DynamicRouterControlMessage) body;
+        } else {
+            throw new IllegalArgumentException("Could not create or find a control channel message");
+        }
+        return controlMessage;
+    }
+
+    /**
      * When a {@link DynamicRouterControlMessage} is received, it is processed, depending on the
      * {@link DynamicRouterControlMessage#getMessageType()}: if the type is
      * {@link DynamicRouterControlMessage.ControlMessageType#SUBSCRIBE}, then create the
@@ -66,27 +155,36 @@ public class DynamicRouterControlChannelProcessor extends AsyncProcessorSupport
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         LOG.debug("Received control channel message");
-        final DynamicRouterControlMessage controlMessage = exchange.getIn().getBody(DynamicRouterControlMessage.class);
-        final DynamicRouterProcessor processor = Optional.ofNullable(component.getConsumer(controlMessage.getChannel()))
-                .map(DynamicRouterConsumer::getProcessor)
-                .map(DynamicRouterProcessor.class::cast)
+        DynamicRouterControlMessage controlMessage = handleControlMessage(exchange);
+        final DynamicRouterProcessor processor = Optional.ofNullable(component.getRoutingProcessor(controlMessage.getChannel()))
                 .orElseThrow(() -> new IllegalArgumentException(
                         "Control channel message is invalid: wrong channel, or no processors present."));
         switch (controlMessage.getMessageType()) {
             case SUBSCRIBE:
                 processor.addFilter(controlMessage);
+                exchange.getIn().setBody(controlMessage.getId(), String.class);
                 break;
             case UNSUBSCRIBE:
                 processor.removeFilter(controlMessage.getId());
                 break;
             default:
-                throw new IllegalArgumentException("You can only subscribe or unsubscribe for dynamic routing");
+                // Cannot get here due to enum
+                break;
         }
         callback.done(true);
         return true;
     }
 
     /**
+     * Set the {@link DynamicRouterConfiguration}.
+     *
+     * @param configuration the {@link DynamicRouterConfiguration}
+     */
+    public void setConfiguration(final DynamicRouterConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
      * Create a {@link DynamicRouterControlChannelProcessor} instance.
      */
     public static class DynamicRouterControlChannelProcessorFactory {
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/message/DynamicRouterControlMessage.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlMessage.java
similarity index 99%
rename from components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/message/DynamicRouterControlMessage.java
rename to components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlMessage.java
index 4e91131..0d109dc 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/message/DynamicRouterControlMessage.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlMessage.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.dynamicrouter.message;
+package org.apache.camel.component.dynamicrouter;
 
 import java.io.Serializable;
 
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlProducer.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlProducer.java
new file mode 100644
index 0000000..2e9b61d
--- /dev/null
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlProducer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.dynamicrouter;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.camel.Producer} implementation to process control channel messages for the Dynamic Router.
+ */
+public class DynamicRouterControlProducer extends DefaultAsyncProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicRouterControlProducer.class);
+
+    /**
+     * The {@link org.apache.camel.Endpoint} for the Dynamic Router instance.
+     */
+    private final DynamicRouterEndpoint endpoint;
+
+    /**
+     * The configuration for the Dynamic Router, obtained from the endpoint.
+     */
+    private final DynamicRouterConfiguration configuration;
+
+    /**
+     * Create the {@link org.apache.camel.Producer} for the Dynamic Router with the supplied
+     * {@link org.apache.camel.Endpoint}.
+     *
+     * @param endpoint the {@link DynamicRouterEndpoint}
+     */
+    public DynamicRouterControlProducer(final DynamicRouterEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+        this.configuration = endpoint.getConfiguration();
+        LOG.debug("Created producer for endpoint '{}', channel '{}'", endpoint.getEndpointUri(), configuration.getChannel());
+    }
+
+    /**
+     * Get the {@link DynamicRouterComponent} from the {@link DynamicRouterEndpoint}.
+     *
+     * @return the {@link DynamicRouterComponent}
+     */
+    private DynamicRouterComponent getComponent() {
+        return endpoint.getDynamicRouterComponent();
+    }
+
+    /**
+     * Process the exchange by handing it to the control channel processor of the component.
+     *
+     * @param  exchange  the exchange to process
+     * @throws Exception if the control channel processor has a problem
+     */
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        getComponent().getControlChannelProcessor().process(exchange);
+    }
+
+    /**
+     * Process the exchange, and use the {@link AsyncCallback} to signal completion exactly once.
+     *
+     * @param  exchange the exchange to process
+     * @param  callback the {@link AsyncCallback} to signal when asynchronous processing has completed
+     * @return          true to continue to execute synchronously, or false to continue to execute asynchronously
+     */
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        try {
+            if (!configuration.isSynchronous()) {
+                // the control channel processor signals the callback itself, so it must not be signaled again here
+                return getComponent().getControlChannelProcessor().process(exchange, callback);
+            }
+            // we may be forced synchronous
+            process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        // reached only after synchronous processing, or when processing threw an exception
+        callback.done(true);
+        return true;
+    }
+
+    /**
+     * Create a {@link DynamicRouterControlProducer} instance.
+     */
+    public static class DynamicRouterControlProducerFactory {
+
+        /**
+         * Create the {@link org.apache.camel.Producer} for the Dynamic Router with the supplied
+         * {@link org.apache.camel.Endpoint}.
+         *
+         * @param endpoint the {@link DynamicRouterEndpoint}
+         */
+        public DynamicRouterControlProducer getInstance(final DynamicRouterEndpoint endpoint) {
+            return new DynamicRouterControlProducer(endpoint);
+        }
+    }
+}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpoint.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpoint.java
index 7c61dd3..dc748e6 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpoint.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpoint.java
@@ -19,315 +19,197 @@ package org.apache.camel.component.dynamicrouter;
 import java.util.function.Supplier;
 
 import org.apache.camel.Category;
+import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
-import org.apache.camel.component.dynamicrouter.DynamicRouterConsumer.DynamicRouterConsumerFactory;
+import org.apache.camel.Producer;
 import org.apache.camel.component.dynamicrouter.DynamicRouterControlChannelProcessor.DynamicRouterControlChannelProcessorFactory;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlProducer.DynamicRouterControlProducerFactory;
+import org.apache.camel.component.dynamicrouter.DynamicRouterProcessor.DynamicRouterProcessorFactory;
 import org.apache.camel.component.dynamicrouter.DynamicRouterProducer.DynamicRouterProducerFactory;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage;
-import org.apache.camel.component.dynamicrouter.processor.DynamicRouterProcessor;
-import org.apache.camel.component.dynamicrouter.processor.DynamicRouterProcessor.DynamicRouterProcessorFactory;
-import org.apache.camel.component.dynamicrouter.processor.PrioritizedFilterProcessor;
-import org.apache.camel.component.dynamicrouter.processor.PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory;
-import org.apache.camel.processor.DynamicRouter;
-import org.apache.camel.spi.Metadata;
+import org.apache.camel.component.dynamicrouter.PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.COMPONENT_SCHEME;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CONTROL_CHANNEL_NAME;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CONTROL_SYNTAX;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.FIRST_VERSION;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.SYNTAX;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.TITLE;
 
 /**
  * The Dynamic Router component routes exchanges to recipients, and the recipients (and their rules) may change at
  * runtime.
  */
-@UriEndpoint(firstVersion = DynamicRouterConstants.FIRST_VERSION,
+@UriEndpoint(firstVersion = FIRST_VERSION,
              scheme = COMPONENT_SCHEME,
-             title = DynamicRouterConstants.TITLE,
-             syntax = DynamicRouterConstants.SYNTAX,
+             title = TITLE,
+             syntax = SYNTAX,
+             alternativeSyntax = CONTROL_SYNTAX,
+             producerOnly = true,
              category = { Category.ENDPOINT, Category.JAVA })
 public class DynamicRouterEndpoint extends DefaultEndpoint {
 
     private static final Logger LOG = LoggerFactory.getLogger(DynamicRouterEndpoint.class);
 
     /**
-     * Creates a {@link DynamicRouterProducer} instance.
-     */
-    private final Supplier<DynamicRouterProducerFactory> producerFactorySupplier;
-
-    /**
-     * Creates a {@link DynamicRouterConsumer} instance.
-     */
-    private final Supplier<DynamicRouterConsumerFactory> consumerFactorySupplier;
-
-    /**
-     * Channel for the Dynamic Router. For example, if the Dynamic Router URI is "dynamic-router://test", then the
-     * channel is "test". Channels are a way of keeping routing participants, their rules, and exchanges logically
-     * separate from the participants, rules, and exchanges on other channels. This can be seen as analogous to VLANs in
-     * networking.
+     * Creates the {@link DynamicRouterProcessor} instance.
      */
-    @UriPath(label = "common", description = "Channel of the Dynamic Router")
-    @Metadata(required = true)
-    private String channel;
+    private Supplier<DynamicRouterProcessorFactory> processorFactorySupplier = DynamicRouterProcessorFactory::new;
 
     /**
-     * Flag to ensure synchronous processing.
+     * Creates a {@link DynamicRouterProducer} instance.
      */
-    @UriParam(label = "advanced", defaultValue = "false")
-    private boolean synchronous;
+    private Supplier<DynamicRouterProducerFactory> producerFactorySupplier = DynamicRouterProducerFactory::new;
 
     /**
-     * Flag that determines if the producer should block while waiting for a consumer.
+     * Creates the {@link PrioritizedFilterProcessor} instance.
      */
-    @UriParam(label = "producer", defaultValue = "true")
-    private boolean block = true;
+    private Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier = PrioritizedFilterProcessorFactory::new;
 
     /**
-     * The time limit, in milliseconds, if/when the producer blocks while waiting for a consumer.
+     * Creates the {@link DynamicRouterControlChannelProcessor} instance.
      */
-    @UriParam(label = "producer", defaultValue = "30000")
-    private long timeout = 30000L;
+    private Supplier<DynamicRouterControlChannelProcessorFactory> controlChannelProcessorFactorySupplier
+            = DynamicRouterControlChannelProcessorFactory::new;
 
     /**
-     * Flag to fail if there are no consumers.
+     * Creates a {@link DynamicRouterControlProducer} instance.
      */
-    @UriParam(label = "producer", defaultValue = "true")
-    private boolean failIfNoConsumers = true;
+    private Supplier<DynamicRouterControlProducerFactory> controlProducerFactorySupplier
+            = DynamicRouterControlProducerFactory::new;
 
     /**
-     * Flag to log a warning if no predicates match for an exchange.
+     * The configuration for the Dynamic Router.
      */
-    @UriParam(label = "producer", defaultValue = "false")
-    private boolean warnDroppedMessage;
+    @UriParam
+    private DynamicRouterConfiguration configuration;
 
     /**
      * Create the Dynamic Router {@link org.apache.camel.Endpoint} for the given endpoint URI. This includes the
-     * creation of a {@link DynamicRouterProcessor} to instantiate a {@link DynamicRouterConsumer} that is registered
-     * with the supplied {@link DynamicRouterComponent}.
+     * creation of a {@link DynamicRouterProcessor} that is registered with the supplied {@link DynamicRouterComponent}.
      *
      * @param uri                            the endpoint URI
-     * @param channel                        the Dynamic Router channel
      * @param component                      the Dynamic Router {@link org.apache.camel.Component}
+     * @param configuration                  the {@link DynamicRouterConfiguration}
      * @param processorFactorySupplier       creates the {@link DynamicRouterProcessor}
      * @param producerFactorySupplier        creates the {@link DynamicRouterProducer}
-     * @param consumerFactorySupplier        creates the {@link DynamicRouterConsumer}
      * @param filterProcessorFactorySupplier creates the {@link PrioritizedFilterProcessor}
      */
     public DynamicRouterEndpoint(
-                                 final String uri, final String channel, final DynamicRouterComponent component,
+                                 final String uri, final DynamicRouterComponent component,
+                                 final DynamicRouterConfiguration configuration,
                                  final Supplier<DynamicRouterProcessorFactory> processorFactorySupplier,
                                  final Supplier<DynamicRouterProducerFactory> producerFactorySupplier,
-                                 final Supplier<DynamicRouterConsumerFactory> consumerFactorySupplier,
                                  final Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
         super(uri, component);
-        this.channel = channel;
+        this.configuration = configuration;
+        this.processorFactorySupplier = processorFactorySupplier;
         this.producerFactorySupplier = producerFactorySupplier;
-        this.consumerFactorySupplier = consumerFactorySupplier;
-        final DynamicRouterProcessor processor = processorFactorySupplier.get()
-                .getInstance("dynamicRouterProcessor-" + channel, getCamelContext(), isWarnDroppedMessage(),
-                        filterProcessorFactorySupplier);
-        try {
-            final DynamicRouterConsumer consumer = createConsumer(processor);
-            component.addConsumer(channel, consumer);
-        } catch (Exception e) {
-            throw new IllegalStateException("Could not create Dynamic Router endpoint", e);
-        }
+        this.filterProcessorFactorySupplier = filterProcessorFactorySupplier;
         LOG.debug("Created Dynamic Router endpoint URI: {}", uri);
     }
 
     /**
      * Create the specialized endpoint for the Dynamic Router Control Channel.
      *
-     * @param uri                      the endpoint URI
-     * @param channel                  the Dynamic Router channel
-     * @param component                the Dynamic Router {@link org.apache.camel.Component}
-     * @param processorFactorySupplier creates the {@link DynamicRouterControlChannelProcessor}
-     * @param producerFactorySupplier  creates the {@link DynamicRouterProcessor}
-     * @param consumerFactorySupplier  creates the {@link DynamicRouterConsumer}
+     * @param uri                            the endpoint URI
+     * @param component                      the Dynamic Router {@link org.apache.camel.Component}
+     * @param configuration                  the {@link DynamicRouterConfiguration}
+     * @param processorFactorySupplier       creates the {@link DynamicRouterControlChannelProcessor}
+     * @param controlProducerFactorySupplier creates the {@link DynamicRouterControlProducer}
      */
     public DynamicRouterEndpoint(
-                                 final String uri, final String channel, final DynamicRouterComponent component,
+                                 final String uri, final DynamicRouterComponent component,
+                                 final DynamicRouterConfiguration configuration,
                                  final Supplier<DynamicRouterControlChannelProcessorFactory> processorFactorySupplier,
-                                 final Supplier<DynamicRouterProducerFactory> producerFactorySupplier,
-                                 final Supplier<DynamicRouterConsumerFactory> consumerFactorySupplier) {
+                                 final Supplier<DynamicRouterControlProducerFactory> controlProducerFactorySupplier) {
         super(uri, component);
-        this.channel = channel;
-        this.producerFactorySupplier = producerFactorySupplier;
-        this.consumerFactorySupplier = consumerFactorySupplier;
-        final DynamicRouterControlChannelProcessor processor = processorFactorySupplier.get()
-                .getInstance(component);
-        try {
-            final DynamicRouterConsumer consumer = createConsumer(processor);
-            component.addConsumer(channel, consumer);
-        } catch (Exception e) {
-            throw new IllegalStateException("Could not create Dynamic Router endpoint", e);
-        }
+        this.configuration = configuration;
+        this.controlChannelProcessorFactorySupplier = processorFactorySupplier;
+        this.controlProducerFactorySupplier = controlProducerFactorySupplier;
         LOG.debug("Created Dynamic Router Control Channel endpoint URI: {}", uri);
     }
 
-    /**
-     * Calls the {@link DynamicRouterProducerFactory} to create a {@link DynamicRouterProducer} instance.
-     *
-     * @return a {@link DynamicRouterProducer} instance
-     */
     @Override
-    public DynamicRouterProducer createProducer() {
-        return producerFactorySupplier.get().getInstance(this);
+    protected void doInit() throws Exception {
+        super.doInit();
+        DynamicRouterComponent component = getDynamicRouterComponent();
+        if (CONTROL_CHANNEL_NAME.equals(configuration.getChannel())) {
+            final DynamicRouterControlChannelProcessor processor = controlChannelProcessorFactorySupplier.get()
+                    .getInstance(component);
+            processor.setConfiguration(configuration);
+            try {
+                // There can be multiple control actions, but the component should
+                // hold only one control channel processor, so check to see if one
+                // has already been registered, and skip registering this one if
+                // a processor already exists
+                if (component.getControlChannelProcessor() == null) {
+                    component.setControlChannelProcessor(processor);
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException("Could not create Dynamic Router endpoint", e);
+            }
+        } else {
+            final DynamicRouterProcessor processor = processorFactorySupplier.get()
+                    .getInstance("dynamicRouterProcessor-" + configuration.getChannel(), getCamelContext(),
+                            configuration.getRecipientMode(), configuration.isWarnDroppedMessage(),
+                            filterProcessorFactorySupplier);
+            component.addRoutingProcessor(configuration.getChannel(), processor);
+        }
     }
 
     /**
-     * Calls the {@link DynamicRouterConsumerFactory} to create a {@link DynamicRouterConsumer} instance.
+     * Calls the {@link DynamicRouterProducerFactory} to create a {@link DynamicRouterProducer} instance.
      *
-     * @param  processor the {@link DynamicRouterProcessor} needed to create the {@link DynamicRouterConsumer}
-     * @return           a {@link DynamicRouterConsumer} instance
-     * @throws Exception if there is a problem creating the consumer
+     * @return a {@link DynamicRouterProducer} instance
      */
     @Override
-    public DynamicRouterConsumer createConsumer(final Processor processor) throws Exception {
-        DynamicRouterConsumer consumer = consumerFactorySupplier.get().getInstance(this, processor, channel);
-        configureConsumer(consumer);
-        return consumer;
-    }
-
-    /**
-     * Gets the channel of the dynamic router. For example, if the Dynamic Router URI is "dynamic-router://test", then
-     * the channel is "test". Channels are a way of keeping routing participants, their rules, and exchanges logically
-     * separate from the participants, rules, and exchanges on other channels. This can be seen as analogous to VLANs in
-     * networking.
-     *
-     * @return the channel of the dynamic router
-     */
-    public String getChannel() {
-        return channel;
+    public Producer createProducer() {
+        return CONTROL_CHANNEL_NAME.equals(configuration.getChannel())
+                ? controlProducerFactorySupplier.get().getInstance(this)
+                : producerFactorySupplier.get().getInstance(this);
     }
 
     /**
-     * Sets the channel of the dynamic router. For example, if the Dynamic Router URI is "dynamic-router://test", then
-     * the channel is "test". Channels are a way of keeping routing participants, their rules, and exchanges logically
-     * separate from the participants, rules, and exchanges on other channels. This can be seen as analogous to VLANs in
-     * networking.
+     * This is a producer-only component.
      *
-     * @param channel the channel of the dynamic router
+     * @param  processor not applicable to producer-only component
+     * @return           not applicable to producer-only component
      */
-    public void setChannel(final String channel) {
-        this.channel = channel;
-    }
-
-    /**
-     * Whether synchronous processing is forced.
-     * <p>
-     * If enabled then the producer thread, will be forced to wait until the message has been completed before the same
-     * thread will continue processing.
-     * <p>
-     * If disabled (default) then the producer thread may be freed and can do other work while the message is continued
-     * processed by other threads (reactive).
-     *
-     * @return true, if flag is set to force synchronous processing, otherwise false
-     */
-    public boolean isSynchronous() {
-        return synchronous;
-    }
-
-    /**
-     * Whether synchronous processing is forced.
-     * <p>
-     * If enabled then the producer thread, will be forced to wait until the message has been completed before the same
-     * thread will continue processing.
-     * <p>
-     * If disabled (default) then the producer thread may be freed and can do other work while the message is continued
-     * processed by other threads (reactive).
-     *
-     * @param synchronous flag to force synchronous processing when true
-     */
-    public void setSynchronous(boolean synchronous) {
-        this.synchronous = synchronous;
-    }
-
-    /**
-     * If sending a message to a direct endpoint which has no active consumer, then we can tell the producer to block
-     * and wait for the consumer to become active.
-     *
-     * @return true, if the producer will block
-     */
-    public boolean isBlock() {
-        return block;
-    }
-
-    /**
-     * If sending a message to a direct endpoint which has no active consumer, then we can tell the producer to block
-     * and wait for the consumer to become active.
-     *
-     * @param block "true" to block, false otherwise
-     */
-    public void setBlock(boolean block) {
-        this.block = block;
-    }
-
-    /**
-     * The timeout value to use if block is enabled.
-     *
-     * @return time, in milliseconds, to block
-     */
-    public long getTimeout() {
-        return timeout;
-    }
-
-    /**
-     * The timeout value to use if block is enabled.
-     *
-     * @param timeout time, in milliseconds, to block
-     */
-    public void setTimeout(long timeout) {
-        this.timeout = timeout;
-    }
-
-    /**
-     * Whether the producer should fail by throwing an exception, when sending to a Dynamic Router endpoint with no
-     * active consumers.
-     *
-     * @return true if we should fail if no consumers are registered
-     */
-    public boolean isFailIfNoConsumers() {
-        return failIfNoConsumers;
-    }
-
-    /**
-     * Whether the producer should fail by throwing an exception, when sending to a Dynamic Router endpoint with no
-     * active consumers.
-     *
-     * @param failIfNoConsumers flag to fail if there are no consumers registered to the {@link DynamicRouter}
-     */
-    public void setFailIfNoConsumers(boolean failIfNoConsumers) {
-        this.failIfNoConsumers = failIfNoConsumers;
+    @Override
+    public Consumer createConsumer(final Processor processor) {
+        throw new IllegalStateException("Dynamic Router is a producer-only component");
     }
 
     /**
-     * Gets the flag to log a warning if no predicates match for an exchange.
+     * A convenience method that wraps the parent method and casts to the {@link DynamicRouterComponent} implementation.
      *
-     * @return true if a warning will be logged if no predicates match for an exchange, otherwise false
+     * @return the {@link DynamicRouterComponent}
      */
-    public boolean isWarnDroppedMessage() {
-        return warnDroppedMessage;
+    public DynamicRouterComponent getDynamicRouterComponent() {
+        return (DynamicRouterComponent) getComponent();
     }
 
     /**
-     * Sets the flag to log a warning if no predicates match for an exchange.
+     * Gets the {@link DynamicRouterConfiguration}.
      *
-     * @param warnDroppedMessage flag to log a warning if no predicates match for an exchange
+     * @return the {@link DynamicRouterConfiguration}
      */
-    public void setWarnDroppedMessage(boolean warnDroppedMessage) {
-        this.warnDroppedMessage = warnDroppedMessage;
+    public DynamicRouterConfiguration getConfiguration() {
+        return configuration;
     }
 
     /**
-     * A convenience method that wraps the parent method and casts to the {@link DynamicRouterComponent} implementation.
+     * Set the {@link DynamicRouterConfiguration}.
      *
-     * @return the {@link DynamicRouterComponent}
+     * @param configuration the {@link DynamicRouterConfiguration}
      */
-    public DynamicRouterComponent getDynamicRouterComponent() {
-        return (DynamicRouterComponent) getComponent();
+    public void setConfiguration(final DynamicRouterConfiguration configuration) {
+        this.configuration = configuration;
     }
 
     /**
@@ -337,54 +219,50 @@ public class DynamicRouterEndpoint extends DefaultEndpoint {
 
         /**
          * Create the Dynamic Router {@link org.apache.camel.Endpoint} for the given endpoint URI. This includes the
-         * creation of a {@link DynamicRouterProcessor} to instantiate a {@link DynamicRouterConsumer} that is
-         * registered with the supplied {@link DynamicRouterComponent}.
+         * creation of a {@link DynamicRouterProcessor} that is registered with the supplied
+         * {@link DynamicRouterComponent}.
          *
          * @param  uri                            the endpoint URI
-         * @param  channel                        the Dynamic Router channel
          * @param  component                      the Dynamic Router {@link org.apache.camel.Component}
+         * @param  configuration                  the {@link DynamicRouterConfiguration}
          * @param  processorFactorySupplier       creates the {@link DynamicRouterProcessor}
          * @param  producerFactorySupplier        creates the {@link DynamicRouterProducer}
-         * @param  consumerFactorySupplier        creates the {@link DynamicRouterConsumer}
          * @param  filterProcessorFactorySupplier creates the {@link PrioritizedFilterProcessor}
          * @return                                the {@link DynamicRouterEndpoint} for routing exchanges
          */
         public DynamicRouterEndpoint getInstance(
                 final String uri,
-                final String channel,
                 final DynamicRouterComponent component,
+                final DynamicRouterConfiguration configuration,
                 final Supplier<DynamicRouterProcessorFactory> processorFactorySupplier,
                 final Supplier<DynamicRouterProducerFactory> producerFactorySupplier,
-                final Supplier<DynamicRouterConsumerFactory> consumerFactorySupplier,
                 final Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
             return new DynamicRouterEndpoint(
-                    uri, channel, component, processorFactorySupplier, producerFactorySupplier,
-                    consumerFactorySupplier, filterProcessorFactorySupplier);
+                    uri, component, configuration, processorFactorySupplier, producerFactorySupplier,
+                    filterProcessorFactorySupplier);
         }
 
         /**
          * Create a specialized Dynamic Router {@link org.apache.camel.Endpoint} for the control channel endpoint URI.
-         * This includes the creation of a {@link DynamicRouterControlChannelProcessor} to instantiate a
-         * {@link DynamicRouterConsumer} that is registered with the supplied {@link DynamicRouterComponent}. Routing
-         * participants use this endpoint to supply {@link DynamicRouterControlMessage}s to subscribe or unsubscribe.
+         * This includes the creation of a {@link DynamicRouterControlChannelProcessor} that is
+         * registered with the supplied {@link DynamicRouterComponent}. Routing participants use this endpoint to supply
+         * {@link DynamicRouterControlMessage}s to subscribe or unsubscribe.
          *
          * @param  uri                      the endpoint URI
-         * @param  channel                  the Dynamic Router channel
          * @param  component                the Dynamic Router {@link org.apache.camel.Component}
+         * @param  configuration            the {@link DynamicRouterConfiguration}
          * @param  processorFactorySupplier creates the {@link DynamicRouterControlChannelProcessor}
          * @param  producerFactorySupplier  creates the {@link DynamicRouterControlProducer}
-         * @param  consumerFactorySupplier  creates the {@link DynamicRouterConsumer}
          * @return                          the {@link DynamicRouterEndpoint} for control channel messages
          */
         public DynamicRouterEndpoint getInstance(
                 final String uri,
-                final String channel,
                 final DynamicRouterComponent component,
+                final DynamicRouterConfiguration configuration,
                 final Supplier<DynamicRouterControlChannelProcessorFactory> processorFactorySupplier,
-                final Supplier<DynamicRouterProducerFactory> producerFactorySupplier,
-                final Supplier<DynamicRouterConsumerFactory> consumerFactorySupplier) {
+                final Supplier<DynamicRouterControlProducerFactory> producerFactorySupplier) {
             return new DynamicRouterEndpoint(
-                    uri, channel, component, processorFactorySupplier, producerFactorySupplier, consumerFactorySupplier);
+                    uri, component, configuration, processorFactorySupplier, producerFactorySupplier);
         }
     }
 }
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/processor/DynamicRouterProcessor.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
similarity index 77%
rename from components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/processor/DynamicRouterProcessor.java
rename to components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
index bdae695..add184e 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/processor/DynamicRouterProcessor.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
@@ -14,11 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.dynamicrouter.processor;
+package org.apache.camel.component.dynamicrouter;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -28,8 +32,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.Traceable;
 import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage;
-import org.apache.camel.component.dynamicrouter.processor.PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory;
+import org.apache.camel.component.dynamicrouter.PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory;
 import org.apache.camel.processor.FilterProcessor;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
@@ -37,6 +40,8 @@ import org.apache.camel.support.builder.PredicateBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.MODE_FIRST_MATCH;
+
 /**
  * Implements a <a href="http://camel.apache.org/dynamic-router.html">Dynamic Router</a> pattern where the destinations
  * are computed at runtime. Recipients register rules and their endpoint with the dynamic router. For each message, each
@@ -51,10 +56,9 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
     private static final Logger LOG = LoggerFactory.getLogger(DynamicRouterProcessor.class);
 
     /**
-     * Template for a logging endpoint at the TRACE level, showing all, and multiline. There are placeholders for the
-     * logger name, with one segment, a dot, and the trailing segment.
+     * Template for a logging endpoint, showing all, and multiline.
      */
-    private static final String LOG_ENDPOINT = "log:%s.%s?level=DEBUG&showAll=true&multiline=true";
+    private static final String LOG_ENDPOINT = "log:%s.%s?level=%s&showAll=true&multiline=true";
 
     /**
      * {@link FilterProcessor}s to determine if the incoming exchange should be routed, based on the content.
@@ -67,6 +71,14 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
     private final CamelContext camelContext;
 
     /**
+     * Indicates the behavior of the Dynamic Router when routing participants are selected to receive an incoming
+     * exchange. If the mode is "firstMatch", then the exchange is routed only to the first participant that has a
+     * matching predicate. If the mode is "allMatch", then the exchange is routed to all participants that have a
+     * matching predicate.
+     */
+    private final String recipientMode;
+
+    /**
      * The template for sending messages.
      */
     private final ProducerTemplate producerTemplate;
@@ -77,6 +89,11 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
     private final PrioritizedFilterProcessor defaultProcessor;
 
     /**
+     * The {@link ExecutorService} for multicasting messages.
+     */
+    private final ExecutorService executorService;
+
+    /**
      * The {@link FilterProcessor} factory.
      */
     private final Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier;
@@ -91,16 +108,23 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
      *
      * @param id                             the id of the processor
      * @param camelContext                   the camel context
+     * @param recipientMode                  the recipient mode
+     * @param warnDroppedMessage             flag to warn if messages are dropped
      * @param filterProcessorFactorySupplier creates the {@link PrioritizedFilterProcessor}
      */
-    public DynamicRouterProcessor(final String id, final CamelContext camelContext, final boolean warnDroppedMessage,
+    public DynamicRouterProcessor(final String id, final CamelContext camelContext, final String recipientMode,
+                                  final boolean warnDroppedMessage,
                                   final Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
         this.id = id;
         this.filters = new ArrayList<>();
         this.camelContext = camelContext;
+        this.recipientMode = recipientMode;
         this.producerTemplate = camelContext.createProducerTemplate();
         this.filterProcessorFactorySupplier = filterProcessorFactorySupplier;
-        final String message = String.format(LOG_ENDPOINT, this.getClass().getCanonicalName(), getId());
+        this.executorService = camelContext.getExecutorServiceManager()
+                .newDefaultThreadPool(this, "dynamicRouterMulticastPool");
+        final String message = String.format(LOG_ENDPOINT, this.getClass().getCanonicalName(), getId(),
+                warnDroppedMessage ? "WARN" : "DEBUG");
         this.defaultProcessor = filterProcessorFactorySupplier.get().getInstance(
                 "defaultProcessor",
                 Integer.MAX_VALUE, camelContext, PredicateBuilder.constant(true),
@@ -129,8 +153,9 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
     PrioritizedFilterProcessor createFilter(final DynamicRouterControlMessage controlMessage) {
         final String id = controlMessage.getId();
         final int priority = controlMessage.getPriority();
+        final String endpoint = controlMessage.getEndpoint();
         final Predicate predicate = controlMessage.getPredicate();
-        final Processor processor = exchange -> producerTemplate.send(controlMessage.getEndpoint(), exchange);
+        final Processor processor = exchange -> producerTemplate.send(endpoint, exchange);
         return filterProcessorFactorySupplier.get().getInstance(id, priority, camelContext, predicate, processor);
     }
 
@@ -195,17 +220,17 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
      * Match the exchange against all {@link #filters} to determine if any of them are suitable to handle the exchange.
      *
      * @param  exchange the message exchange
-     * @return          the filter that matches for the exchange
+     * @return          the filters that match the exchange (only the first match in "firstMatch" mode), or a
+     *                  singleton list containing the default processor when no filter matches
      */
-    PrioritizedFilterProcessor matchFilters(final Exchange exchange) {
-        PrioritizedFilterProcessor processor = defaultProcessor;
-        for (final PrioritizedFilterProcessor filter : filters) {
-            if (filter.matches(exchange)) {
-                processor = filter;
-                break;
-            }
-        }
-        return processor;
+    List<PrioritizedFilterProcessor> matchFilters(final Exchange exchange) {
+        return Optional.of(
+                filters.stream()
+                        .filter(f -> f.matches(exchange))
+                        .limit(MODE_FIRST_MATCH.equals(recipientMode) ? 1 : Integer.MAX_VALUE)
+                        .collect(Collectors.toList()))
+                .filter(list -> !list.isEmpty())
+                .orElse(Collections.singletonList(defaultProcessor));
     }
 
     /**
@@ -219,13 +244,19 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
      * @param  callback the {@link AsyncCallback} will be invoked when the processing of the exchange is completed. If
      *                  the exchange is completed synchronously, then the callback is also invoked synchronously. The
      *                  callback should therefore be careful of starting recursive loop.
-     * @return          (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being
-     *                  executed asynchronously
+     * @return          (doneSync) <tt>true</tt> to continue to execute synchronously, <tt>false</tt> to continue
+     *                  execution asynchronously
      */
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        return matchFilters(exchange)
-                .process(exchange, callback);
+        try {
+            for (PrioritizedFilterProcessor filterProcessor : matchFilters(exchange)) {
+                filterProcessor.process(exchange, callback);
+            }
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        return false;
     }
 
     /**
@@ -278,15 +309,19 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
          *
          * @param id                             the id of the processor
          * @param camelContext                   the camel context
+         * @param recipientMode                  the mode for sending exchanges to matching participants
          * @param warnDroppedMessage             warn if no filters match an exchange
          * @param filterProcessorFactorySupplier creates the {@link PrioritizedFilterProcessor}
          */
         public DynamicRouterProcessor getInstance(
                 final String id,
                 final CamelContext camelContext,
+                final String recipientMode,
                 final boolean warnDroppedMessage,
                 final Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
-            return new DynamicRouterProcessor(id, camelContext, warnDroppedMessage, filterProcessorFactorySupplier);
+            return new DynamicRouterProcessor(
+                    id, camelContext, recipientMode, warnDroppedMessage,
+                    filterProcessorFactorySupplier);
         }
     }
 }
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProducer.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProducer.java
index 268cd8e..71077bf 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProducer.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProducer.java
@@ -1,168 +1,118 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.dynamicrouter;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.support.DefaultAsyncProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link org.apache.camel.Producer} implementation to process exchanges for the Dynamic Router.
- */
-public class DynamicRouterProducer extends DefaultAsyncProducer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DynamicRouterProducer.class);
-
-    /**
-     * The {@link org.apache.camel.Endpoint} for the Dynamic Router instance.
-     */
-    private final DynamicRouterEndpoint endpoint;
-
-    /**
-     * The channel of the Dynamic Router. For example, if the Dynamic Router URI is "dynamic-router://test", then the
-     * channel is "test".
-     */
-    private final String channel;
-
-    /**
-     * Create the {@link org.apache.camel.Producer} for the Dynamic Router with the supplied
-     * {@link org.apache.camel.Endpoint} URI.
-     *
-     * @param endpoint the {@link DynamicRouterEndpoint}
-     */
-    public DynamicRouterProducer(final DynamicRouterEndpoint endpoint) {
-        super(endpoint);
-        this.endpoint = endpoint;
-        this.channel = endpoint.getChannel();
-        LOG.debug("Created producer for endpoint '{}', channel '{}'", endpoint.getEndpointUri(), channel);
-    }
-
-    /**
-     * Check that the {@link org.apache.camel.Consumer} instance is not null. If it is null, and if the
-     * {@link org.apache.camel.Endpoint} is set to fail if there are no consumers, then a
-     * {@link DynamicRouterConsumerNotAvailableException} is thrown. If this {@link org.apache.camel.Producer} is not
-     * set to fail if there are no consumers, then the message is dropped and a warning is logged.
-     *
-     * @param  exchange                                   the exchange being processed
-     * @return                                            true if the consumer is not null, otherwise false
-     * @throws DynamicRouterConsumerNotAvailableException if there are no consumers and the producer is set to fail if
-     *                                                    there are no consumers
-     */
-    boolean checkConsumer(final Exchange exchange) throws DynamicRouterConsumerNotAvailableException {
-        DynamicRouterConsumer consumer = null;
-        try {
-            consumer = getConsumer(endpoint.getTimeout());
-        } catch (Exception ignored) {
-            // No-op -- the next block will handle null consumer
-        }
-        if (consumer == null) {
-            if (endpoint.isFailIfNoConsumers()) {
-                throw new DynamicRouterConsumerNotAvailableException(
-                        "No consumers available on endpoint: " + endpoint, exchange);
-            } else {
-                LOG.warn("Message ignored -- no consumers available on endpoint: {}", endpoint.getEndpointUri());
-            }
-        }
-        return consumer != null;
-    }
-
-    /**
-     * Get the {@link DynamicRouterComponent} from the {@link DynamicRouterEndpoint}.
-     *
-     * @return the {@link DynamicRouterComponent}
-     */
-    private DynamicRouterComponent getComponent() {
-        return endpoint.getDynamicRouterComponent();
-    }
-
-    /**
-     * Get the {@link DynamicRouterConsumer}.
-     *
-     * @return the {@link DynamicRouterConsumer}
-     */
-    private DynamicRouterConsumer getConsumer() {
-        return getComponent().getConsumer(channel);
-    }
-
-    /**
-     * Get the {@link DynamicRouterConsumer}, and wait a maximum of the supplied timeout (milliseconds) if the consumer
-     * is not yet available.
-     *
-     * @param  timeout              time, in milliseconds, to wait if the consumer is not yet available
-     * @return                      the {@link DynamicRouterConsumer}
-     *
-     * @throws InterruptedException if interrupted while waiting for the {@link DynamicRouterConsumer}
-     */
-    private DynamicRouterConsumer getConsumer(final long timeout) throws InterruptedException {
-        return getComponent().getConsumer(channel, endpoint.isBlock(), timeout);
-    }
-
-    /**
-     * Process the exchange.
-     *
-     * @param  exchange  the exchange to process
-     * @throws Exception if the consumer has a problem
-     */
-    @Override
-    public void process(final Exchange exchange) throws Exception {
-        if (checkConsumer(exchange)) {
-            getConsumer().getProcessor().process(exchange);
-        }
-    }
-
-    /**
-     * Process the exchange, and use the {@link AsyncCallback} to signal completion.
-     *
-     * @param  exchange the exchange to process
-     * @param  callback the {@link AsyncCallback} to signal when asynchronous processing has completed
-     * @return          true to continue to execute synchronously, or false to continue to execute asynchronously
-     */
-    @Override
-    public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        try {
-            // we may be forced synchronous
-            if (endpoint.isSynchronous()) {
-                process(exchange);
-            } else {
-                return getConsumer().getAsyncProcessor().process(exchange, callback);
-            }
-        } catch (Exception e) {
-            exchange.setException(e);
-        } finally {
-            callback.done(true);
-        }
-        return true;
-    }
-
-    /**
-     * Create a {@link DynamicRouterProducer} instance.
-     */
-    public static class DynamicRouterProducerFactory {
-
-        /**
-         * Create the {@link org.apache.camel.Producer} for the Dynamic Router with the supplied
-         * {@link org.apache.camel.Endpoint} URI.
-         *
-         * @param endpoint the {@link DynamicRouterEndpoint}
-         */
-        public DynamicRouterProducer getInstance(final DynamicRouterEndpoint endpoint) {
-            return new DynamicRouterProducer(endpoint);
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.dynamicrouter;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.camel.Producer} implementation to process exchanges for the Dynamic Router.
+ */
+public class DynamicRouterProducer extends DefaultAsyncProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicRouterProducer.class);
+
+    /**
+     * The {@link org.apache.camel.Endpoint} for the Dynamic Router instance.
+     */
+    private final DynamicRouterEndpoint endpoint;
+
+    /**
+     * The configuration for the Dynamic Router.
+     */
+    private final DynamicRouterConfiguration configuration;
+
+    /**
+     * Create the {@link org.apache.camel.Producer} for the Dynamic Router with the supplied
+     * {@link org.apache.camel.Endpoint} URI.
+     *
+     * @param endpoint the {@link DynamicRouterEndpoint}
+     */
+    public DynamicRouterProducer(final DynamicRouterEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+        this.configuration = endpoint.getConfiguration();
+        LOG.debug("Created producer for endpoint '{}', channel '{}'", endpoint.getEndpointUri(), configuration.getChannel());
+    }
+
+    /**
+     * Get the {@link DynamicRouterComponent} from the {@link DynamicRouterEndpoint}.
+     *
+     * @return the {@link DynamicRouterComponent}
+     */
+    private DynamicRouterComponent getComponent() {
+        return endpoint.getDynamicRouterComponent();
+    }
+
+    /**
+     * Process the exchange with the routing processor registered for this producer's channel.
+     *
+     * @param  exchange  the exchange to process
+     * @throws Exception if the routing processor has a problem
+     */
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        getComponent()
+                .getRoutingProcessor(configuration.getChannel())
+                .process(exchange);
+    }
+
+    /**
+     * Process the exchange, and use the {@link AsyncCallback} to signal completion.
+     *
+     * @param  exchange the exchange to process
+     * @param  callback the {@link AsyncCallback} to signal when asynchronous processing has completed
+     * @return          true to continue to execute synchronously, or false to continue to execute asynchronously
+     */
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        try {
+            if (!configuration.isSynchronous()) {
+                // The routing processor is handed the callback and is expected to signal completion itself,
+                // so it must not be signalled again here; that would report the exchange as done twice.
+                return getComponent()
+                        .getRoutingProcessor(configuration.getChannel())
+                        .process(exchange, callback);
+            }
+            // we may be forced synchronous
+            process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        callback.done(true);
+        return true;
+    }
+
+    /**
+     * Create a {@link DynamicRouterProducer} instance.
+     */
+    public static class DynamicRouterProducerFactory {
+
+        /**
+         * Create the {@link org.apache.camel.Producer} for the Dynamic Router with the supplied
+         * {@link org.apache.camel.Endpoint} URI.
+         *
+         * @param endpoint the {@link DynamicRouterEndpoint}
+         */
+        public DynamicRouterProducer getInstance(final DynamicRouterEndpoint endpoint) {
+            return new DynamicRouterProducer(endpoint);
+        }
+    }
+}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/processor/PrioritizedFilterProcessor.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/PrioritizedFilterProcessor.java
similarity index 96%
rename from components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/processor/PrioritizedFilterProcessor.java
rename to components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/PrioritizedFilterProcessor.java
index 64cd830..1dee9f7 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/processor/PrioritizedFilterProcessor.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/PrioritizedFilterProcessor.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.dynamicrouter.processor;
+package org.apache.camel.component.dynamicrouter;
 
 import java.util.Comparator;
 
@@ -88,7 +88,8 @@ public class PrioritizedFilterProcessor extends FilterProcessor implements Compa
 
     @Override
     public String toString() {
-        return String.format("PrioritizedFilterProcessor [id: %s, priority: %s]", this.getId(), this.getPriority());
+        return String.format("PrioritizedFilterProcessor [id: %s, priority: %s, predicate: %s]",
+                this.getId(), this.getPriority(), this.getPredicate());
     }
 
     /**
diff --git a/components/camel-dynamic-router/src/main/resources/META-INF/services/org/apache/camel/component/dynamic-router b/components/camel-dynamic-router/src/main/resources/META-INF/services/org/apache/camel/component/dynamic-router
index 9455468..c2e800d 100644
--- a/components/camel-dynamic-router/src/main/resources/META-INF/services/org/apache/camel/component/dynamic-router
+++ b/components/camel-dynamic-router/src/main/resources/META-INF/services/org/apache/camel/component/dynamic-router
@@ -1 +1 @@
-class=org.apache.camel.dynamicrouter.DynamicRouterComponent
+class=org.apache.camel.component.dynamicrouter.DynamicRouterComponent
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponentTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponentTest.java
index a5cab7c..e2bdc3b 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponentTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterComponentTest.java
@@ -19,64 +19,48 @@ package org.apache.camel.component.dynamicrouter;
 import java.util.Collections;
 
 import org.apache.camel.Endpoint;
-import org.apache.camel.component.dynamicrouter.support.CamelDynamicRouterTestSupport;
-import org.junit.jupiter.api.Assertions;
+import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-public class DynamicRouterComponentTest extends CamelDynamicRouterTestSupport {
+public class DynamicRouterComponentTest extends DynamicRouterTestSupport {
 
     @BeforeEach
     void localSetup() throws Exception {
         super.setup();
         component = new DynamicRouterComponent(
                 () -> endpointFactory, () -> processorFactory, () -> controlChannelProcessorFactory, () -> producerFactory,
-                () -> consumerFactory, () -> filterProcessorFactory);
+                () -> controlProducerFactory, () -> filterProcessorFactory);
     }
 
     @Test
-    void createEndpoint() throws Exception {
+    void testCreateEndpoint() throws Exception {
         component.setCamelContext(context);
-        Endpoint actualEndpoint = component.createEndpoint("testname", "remaining", Collections.emptyMap());
+        Endpoint actualEndpoint = component.createEndpoint("dynamic-router:testname", "remaining", Collections.emptyMap());
         assertEquals(endpoint, actualEndpoint);
     }
 
     @Test
-    void addConsumer() {
-        component.addConsumer(DYNAMIC_ROUTER_CHANNEL, consumer);
-        assertEquals(consumer, component.getConsumer(DYNAMIC_ROUTER_CHANNEL));
-    }
-
-    @Test
-    void getConsumer() {
-        addConsumer();
-        final DynamicRouterConsumer result = component.getConsumer(DYNAMIC_ROUTER_CHANNEL);
-        Assertions.assertEquals(consumer, result);
+    void testCreateEndpointWithEmptyRemainingError() {
+        component.setCamelContext(context);
+        assertThrows(IllegalArgumentException.class,
+                () -> component.createEndpoint("dynamic-router:testname", "", Collections.emptyMap()));
     }
 
     @Test
-    void getConsumerBlock() throws InterruptedException {
-        addConsumer();
-        final DynamicRouterConsumer result = component.getConsumer(DYNAMIC_ROUTER_CHANNEL, true, endpoint.getTimeout());
-        Assertions.assertEquals(consumer, result);
+    void testAddRoutingProcessor() {
+        component.addRoutingProcessor(DYNAMIC_ROUTER_CHANNEL, processor);
+        assertEquals(processor, component.getRoutingProcessor(DYNAMIC_ROUTER_CHANNEL));
     }
 
     @Test
-    void addDuplicateConsumer() {
-        component.addConsumer(DYNAMIC_ROUTER_CHANNEL, consumer);
-        assertEquals(consumer, component.getConsumer(DYNAMIC_ROUTER_CHANNEL));
-        assertThrows(IllegalArgumentException.class, () -> component.addConsumer(DYNAMIC_ROUTER_CHANNEL, consumer));
-    }
+    void testAddRoutingProcessorWithSecondProcessorForChannelError() {
+        component.addRoutingProcessor(DYNAMIC_ROUTER_CHANNEL, processor);
+        assertEquals(processor, component.getRoutingProcessor(DYNAMIC_ROUTER_CHANNEL));
+        assertThrows(IllegalArgumentException.class, () -> component.addRoutingProcessor(DYNAMIC_ROUTER_CHANNEL, processor));
 
-    @Test
-    void removeConsumer() {
-        component.addConsumer(DYNAMIC_ROUTER_CHANNEL, consumer);
-        assertEquals(consumer, component.getConsumer(DYNAMIC_ROUTER_CHANNEL));
-        component.removeConsumer(DYNAMIC_ROUTER_CHANNEL, consumer);
-        assertNull(component.getConsumer(DYNAMIC_ROUTER_CHANNEL));
     }
 }
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterConfigurationTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterConfigurationTest.java
new file mode 100644
index 0000000..a3b409a
--- /dev/null
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterConfigurationTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.dynamicrouter;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class DynamicRouterConfigurationTest {
+
+    /** A complete "channel/action/subscribeChannel" path populates all three configuration values. */
+    @Test
+    void testParsePath() {
+        final String expectedChannel = "control";
+        final String expectedAction = "subscribe";
+        final String expectedSubscribeChannel = "test";
+        final DynamicRouterConfiguration parsed = new DynamicRouterConfiguration();
+
+        parsed.parsePath(String.format("%s/%s/%s", expectedChannel, expectedAction, expectedSubscribeChannel));
+
+        assertEquals(expectedChannel, parsed.getChannel());
+        assertEquals(expectedAction, parsed.getControlAction());
+        assertEquals(expectedSubscribeChannel, parsed.getSubscribeChannel());
+    }
+
+    /** A path with only the channel and action segments is rejected with an IllegalArgumentException. */
+    @Test
+    void testParsePathWithUriSyntaxError() {
+        final String twoSegmentPath = String.format("%s/%s", "control", "subscribe");
+        final DynamicRouterConfiguration parsed = new DynamicRouterConfiguration();
+
+        assertThrows(IllegalArgumentException.class, () -> parsed.parsePath(twoSegmentPath));
+    }
+}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterConsumerTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterConsumerTest.java
deleted file mode 100644
index b5627ac..0000000
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterConsumerTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.dynamicrouter;
-
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.component.dynamicrouter.support.CamelDynamicRouterTestSupport;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.Mockito.*;
-
-class DynamicRouterConsumerTest extends CamelDynamicRouterTestSupport {
-
-    @BeforeEach
-    void localSetup() throws Exception {
-        super.setup();
-        consumer = new DynamicRouterConsumer(endpoint, processor, DYNAMIC_ROUTER_CHANNEL);
-    }
-
-    @Test
-    void getEndpoint() {
-        assertEquals(endpoint, consumer.getEndpoint());
-    }
-
-    @Test
-    void doStart() throws Exception {
-        consumer.doStart();
-        verify(component, Mockito.times(1)).addConsumer(DYNAMIC_ROUTER_CHANNEL, consumer);
-    }
-
-    @Test
-    void doStop() throws Exception {
-        consumer.doStop();
-        verify(component, Mockito.times(1)).removeConsumer(DYNAMIC_ROUTER_CHANNEL, consumer);
-    }
-
-    @Test
-    void doSuspend() {
-        consumer.doSuspend();
-        verify(component, Mockito.times(1)).removeConsumer(DYNAMIC_ROUTER_CHANNEL, consumer);
-    }
-
-    @Test
-    void doResume() {
-        consumer.doResume();
-        verify(component, Mockito.times(1)).addConsumer(DYNAMIC_ROUTER_CHANNEL, consumer);
-    }
-
-    @Test
-    void deferShutdown() {
-        assertTrue(consumer.deferShutdown(ShutdownRunningTask.CompleteAllTasks));
-    }
-
-    @Test
-    void getPendingExchangesSize() {
-        assertEquals(0, consumer.getPendingExchangesSize());
-    }
-
-    @Test
-    void prepareShutdown() {
-        assertDoesNotThrow(() -> consumer.prepareShutdown(true, true));
-    }
-}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlChannelProcessorTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlChannelProcessorTest.java
new file mode 100644
index 0000000..3a234a4
--- /dev/null
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlChannelProcessorTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.dynamicrouter;
+
+import org.apache.camel.Message;
+import org.apache.camel.NoSuchLanguageException;
+import org.apache.camel.Predicate;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.SubscribeMessageBuilder;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.UnsubscribeMessageBuilder;
+import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport;
+import org.apache.camel.spi.Language;
+import org.apache.camel.support.builder.PredicateBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CONTROL_ACTION_SUBSCRIBE;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CONTROL_ACTION_UNSUBSCRIBE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link DynamicRouterControlChannelProcessor} predicate resolution, control message handling and processing.
+ */
+class DynamicRouterControlChannelProcessorTest extends DynamicRouterTestSupport {
+
+    @BeforeEach
+    void localSetup() throws Exception {
+        super.setup();
+        controlChannelProcessor = new DynamicRouterControlChannelProcessor(component);
+        controlChannelProcessor.setConfiguration(configuration);
+    }
+
+    /** Stubs the exchange so that its "in" message is a mock whose body is the supplied value. */
+    private void givenInBody(final Object body) {
+        Message message = mock(Message.class);
+        when(message.getBody()).thenReturn(body);
+        when(exchange.getIn()).thenReturn(message);
+    }
+
+    @Test
+    void testObtainPredicateFromUri() {
+        String language = "simple";
+        String expression = "${body} regex '^\\d*[02468]$'";
+        when(configuration.getExpressionLanguage()).thenReturn(language);
+        when(configuration.getPredicate()).thenReturn(expression);
+
+        final Predicate result = controlChannelProcessor.obtainPredicate(null);
+
+        assertNotNull(result, "Predicate was null");
+    }
+
+    @Test
+    void testObtainPredicateFromBody() {
+        String exLang = "simple";
+        String expression = "${body} regex '^\\d*[02468]$'";
+        Language language = component.getCamelContext().resolveLanguage(exLang);
+        Predicate predicate = language.createPredicate(expression);
+        when(configuration.getExpressionLanguage()).thenReturn(null);
+        when(configuration.getPredicate()).thenReturn(null);
+
+        final Predicate result = controlChannelProcessor.obtainPredicate(predicate);
+
+        assertEquals(predicate, result, "The expected predicate was not returned");
+    }
+
+    @Test
+    void testObtainPredicateWithLanguageResolutionError() {
+        String language = "bad language";
+        String expression = "bad expression";
+        when(configuration.getExpressionLanguage()).thenReturn(language);
+        when(configuration.getPredicate()).thenReturn(expression);
+        when(configuration.getPredicateBean()).thenReturn(null);
+        when(context.resolveLanguage(language)).thenThrow(new NoSuchLanguageException("test"));
+
+        assertThrows(IllegalArgumentException.class, () -> controlChannelProcessor.obtainPredicate(null));
+    }
+
+    @Test
+    void testObtainPredicateWithNoPredicateError() {
+        assertThrows(IllegalArgumentException.class, () -> controlChannelProcessor.obtainPredicate("oops"));
+    }
+
+    @Test
+    void testHandleSubscribeMessageFromUri() {
+        String language = "simple";
+        String expression = "${body} regex '^\\d*[02468]$'";
+        when(configuration.getControlAction()).thenReturn(CONTROL_ACTION_SUBSCRIBE);
+        when(configuration.getSubscribeChannel()).thenReturn("test");
+        when(configuration.getSubscriptionId()).thenReturn(TEST_ID);
+        when(configuration.getDestinationUri()).thenReturn("testUri");
+        when(configuration.getPriority()).thenReturn(10);
+        when(configuration.getExpressionLanguage()).thenReturn(language);
+        when(configuration.getPredicate()).thenReturn(expression);
+        givenInBody(null);
+
+        final DynamicRouterControlMessage result = controlChannelProcessor.handleControlMessage(exchange);
+
+        assertNotNull(result);
+        // Compare case-insensitively instead of via toLowerCase(), whose result depends on the default
+        // locale (for example, 'I' does not lower-case to 'i' under a Turkish locale).
+        assertTrue(CONTROL_ACTION_SUBSCRIBE.equalsIgnoreCase(result.getMessageType().name()), "Unexpected message type");
+        assertEquals(DYNAMIC_ROUTER_CHANNEL, result.getChannel());
+        assertEquals(TEST_ID, result.getId());
+        assertEquals("testUri", result.getEndpoint());
+        assertEquals(10, result.getPriority());
+    }
+
+    @Test
+    void testHandleUnsubscribeMessageFromUri() {
+        when(configuration.getControlAction()).thenReturn(CONTROL_ACTION_UNSUBSCRIBE);
+        when(configuration.getSubscribeChannel()).thenReturn("test");
+        when(configuration.getSubscriptionId()).thenReturn(TEST_ID);
+        givenInBody(null);
+
+        final DynamicRouterControlMessage result = controlChannelProcessor.handleControlMessage(exchange);
+
+        assertNotNull(result);
+        // Compare case-insensitively instead of via toLowerCase(), whose result depends on the default
+        // locale (for example, 'I' does not lower-case to 'i' under a Turkish locale).
+        assertTrue(CONTROL_ACTION_UNSUBSCRIBE.equalsIgnoreCase(result.getMessageType().name()), "Unexpected message type");
+        assertEquals(DYNAMIC_ROUTER_CHANNEL, result.getChannel());
+        assertEquals(TEST_ID, result.getId());
+    }
+
+    @Test
+    void testHandleSubscribeMessageFromBody() {
+        when(configuration.getControlAction()).thenReturn(null);
+        final DynamicRouterControlMessage body = new SubscribeMessageBuilder()
+                .id(TEST_ID)
+                .channel(DYNAMIC_ROUTER_CHANNEL)
+                .priority(10)
+                .endpointUri("testUri")
+                .predicate(PredicateBuilder.constant(true))
+                .build();
+        givenInBody(body);
+
+        final DynamicRouterControlMessage result = controlChannelProcessor.handleControlMessage(exchange);
+
+        assertEquals(body, result, "Did not receive expected subscribe message");
+    }
+
+    @Test
+    void testHandleUnsubscribeMessageFromBody() {
+        when(configuration.getControlAction()).thenReturn(null);
+        final DynamicRouterControlMessage body = new UnsubscribeMessageBuilder()
+                .id(TEST_ID)
+                .channel(DYNAMIC_ROUTER_CHANNEL)
+                .build();
+        givenInBody(body);
+
+        final DynamicRouterControlMessage result = controlChannelProcessor.handleControlMessage(exchange);
+
+        assertEquals(body, result, "Did not receive expected unsubscribe message");
+    }
+
+    @Test
+    void testHandleControlMessageWithIllegalControlActionError() {
+        when(configuration.getControlAction()).thenReturn("oops");
+        Message message = mock(Message.class);
+        when(exchange.getIn()).thenReturn(message);
+
+        assertThrows(IllegalArgumentException.class, () -> controlChannelProcessor.handleControlMessage(exchange));
+    }
+
+    @Test
+    void testHandleControlMessageWithNoControlMessageError() {
+        when(configuration.getControlAction()).thenReturn(null);
+        givenInBody("oops");
+
+        assertThrows(IllegalArgumentException.class, () -> controlChannelProcessor.handleControlMessage(exchange));
+    }
+
+    @Test
+    void testProcessSubscribe() {
+        when(configuration.getControlAction()).thenReturn(null);
+        final DynamicRouterControlMessage body = new SubscribeMessageBuilder()
+                .id(TEST_ID)
+                .channel(DYNAMIC_ROUTER_CHANNEL)
+                .priority(10)
+                .endpointUri("testUri")
+                .predicate(PredicateBuilder.constant(true))
+                .build();
+        givenInBody(body);
+        doNothing().when(processor).addFilter(any(DynamicRouterControlMessage.class));
+
+        boolean result = controlChannelProcessor.process(exchange, asyncCallback);
+
+        assertTrue(result, "Expected 'true' result");
+        verify(processor, times(1)).addFilter(body);
+    }
+
+    @Test
+    void testProcessUnsubscribe() {
+        when(configuration.getControlAction()).thenReturn(null);
+        final DynamicRouterControlMessage body = new UnsubscribeMessageBuilder()
+                .id(TEST_ID)
+                .channel(DYNAMIC_ROUTER_CHANNEL)
+                .build();
+        givenInBody(body);
+        doNothing().when(processor).removeFilter(TEST_ID);
+
+        boolean result = controlChannelProcessor.process(exchange, asyncCallback);
+
+        assertTrue(result, "Expected 'true' result");
+        verify(processor, times(1)).removeFilter(TEST_ID);
+    }
+}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlMessageTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlMessageTest.java
new file mode 100644
index 0000000..40cef54
--- /dev/null
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlMessageTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.dynamicrouter;
+
+import java.util.stream.Stream;
+
+import org.apache.camel.Predicate;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.SubscribeMessageBuilder;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.UnsubscribeMessageBuilder;
+import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport;
+import org.apache.camel.support.builder.PredicateBuilder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class DynamicRouterControlMessageTest extends DynamicRouterTestSupport {
+
+    private static Stream<Arguments> provideSubscribeMessageArguments() {
+        return Stream.of(
+                Arguments.of(null, DYNAMIC_ROUTER_CHANNEL, TEST_PRIORITY, "mock:test", PredicateBuilder.constant(true),
+                        "Missing subscription ID"),
+                Arguments.of(TEST_ID, null, TEST_PRIORITY, "mock:test", PredicateBuilder.constant(true),
+                        "Missing subscription channel"),
+                Arguments.of(TEST_ID, DYNAMIC_ROUTER_CHANNEL, TEST_PRIORITY, null, PredicateBuilder.constant(true),
+                        "Missing URI"),
+                Arguments.of(TEST_ID, DYNAMIC_ROUTER_CHANNEL, TEST_PRIORITY, "mock:test", null, "Missing predicate"));
+    }
+
+    private static Stream<Arguments> provideUnsubscribeMessageArguments() {
+        return Stream.of(
+                Arguments.of(null, DYNAMIC_ROUTER_CHANNEL, "Missing subscription ID"),
+                Arguments.of(TEST_ID, null, "Missing subscription channel"));
+    }
+
+    @Test
+    void testBuildSubscribeMessage() {
+        assertDoesNotThrow(() -> new SubscribeMessageBuilder()
+                .id(TEST_ID)
+                .channel(DYNAMIC_ROUTER_CHANNEL)
+                .priority(TEST_PRIORITY)
+                .endpointUri("mock:test")
+                .predicate(PredicateBuilder.constant(true))
+                .build());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideSubscribeMessageArguments")
+    void testBuildSubscribeMessageWithError(
+            String id, String channel, int priority, String uri, Predicate predicate, String message) {
+        assertThrows(IllegalArgumentException.class, () -> new SubscribeMessageBuilder()
+                .id(id)
+                .channel(channel)
+                .priority(priority)
+                .endpointUri(uri)
+                .predicate(predicate)
+                .build(), message);
+    }
+
+    @Test
+    void testBuildUnsubscribeMessage() {
+        assertDoesNotThrow(() -> new UnsubscribeMessageBuilder()
+                .id(TEST_ID)
+                .channel(DYNAMIC_ROUTER_CHANNEL)
+                .build());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideUnsubscribeMessageArguments")
+    void testBuildUnsubscribeMessageWithError(String id, String channel, String message) {
+        assertThrows(IllegalArgumentException.class, () -> new UnsubscribeMessageBuilder()
+                .id(id)
+                .channel(channel)
+                .build(), message);
+    }
+}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlProducerTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlProducerTest.java
new file mode 100644
index 0000000..8eca9c4
--- /dev/null
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterControlProducerTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.dynamicrouter;
+
+import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.Mockito.when;
+
+class DynamicRouterControlProducerTest extends DynamicRouterTestSupport {
+
+    @BeforeEach
+    void localSetup() throws Exception {
+        super.setup();
+        controlProducer = new DynamicRouterControlProducer(endpoint);
+    }
+
+    @Test
+    void testProcessSynchronous() {
+        when(endpoint.getConfiguration().isSynchronous()).thenReturn(true);
+        boolean result = controlProducer.process(exchange, asyncCallback);
+        Assertions.assertTrue(result);
+    }
+
+    @Test
+    void testProcessAynchronous() {
+        when(endpoint.getConfiguration().isSynchronous()).thenReturn(false);
+        boolean result = controlProducer.process(exchange, asyncCallback);
+        Assertions.assertTrue(result);
+    }
+}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointTest.java
index 1e6c7e4..a03d3f5 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpointTest.java
@@ -16,49 +16,46 @@
  */
 package org.apache.camel.component.dynamicrouter;
 
-import org.apache.camel.Processor;
-import org.apache.camel.component.dynamicrouter.DynamicRouterConsumer.DynamicRouterConsumerFactory;
-import org.apache.camel.component.dynamicrouter.support.CamelDynamicRouterTestSupport;
-import org.junit.jupiter.api.Assertions;
+import org.apache.camel.Producer;
+import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CONTROL_CHANNEL_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
 
-class DynamicRouterEndpointTest extends CamelDynamicRouterTestSupport {
+class DynamicRouterEndpointTest extends DynamicRouterTestSupport {
 
     @BeforeEach
     void localSetup() throws Exception {
         super.setup();
         endpoint = new DynamicRouterEndpoint(
-                BASE_URI, DYNAMIC_ROUTER_CHANNEL, component, () -> processorFactory, () -> producerFactory,
-                () -> consumerFactory,
+                BASE_URI, component, configuration, () -> processorFactory, () -> producerFactory,
                 () -> filterProcessorFactory);
     }
 
     @Test
-    void testCreateEndpointConsumerCreationFailure() {
-        Assertions.assertThrows(IllegalStateException.class, () -> new DynamicRouterEndpoint(
-                BASE_URI, DYNAMIC_ROUTER_CHANNEL, component, () -> processorFactory, () -> producerFactory,
-                () -> new DynamicRouterConsumerFactory() {
-                    @Override
-                    public DynamicRouterConsumer getInstance(
-                            DynamicRouterEndpoint endpoint, Processor processor, String channel) {
-                        throw new IllegalArgumentException("test");
-                    }
-                },
-                () -> filterProcessorFactory));
+    void testCreateProducer() {
+        Producer actualProducer = endpoint.createProducer();
+        assertEquals(producer, actualProducer);
     }
 
     @Test
-    void createProducer() {
-        DynamicRouterProducer actualProducer = endpoint.createProducer();
-        assertEquals(producer, actualProducer);
+    void testCreateConsumerException() {
+        assertThrows(IllegalStateException.class, () -> endpoint.createConsumer(processor));
     }
 
     @Test
-    void createConsumer() throws Exception {
-        DynamicRouterConsumer actualConsumer = endpoint.createConsumer(processor);
-        assertEquals(consumer, actualConsumer);
+    void testInitControlChannelEndpointWithError() {
+        when(configuration.getChannel()).thenReturn(CONTROL_CHANNEL_NAME);
+        DynamicRouterEndpoint controlEndpoint = new DynamicRouterEndpoint(
+                BASE_URI, component, configuration, () -> controlChannelProcessorFactory, () -> controlProducerFactory);
+        doThrow(new RuntimeException()).when(component)
+                .setControlChannelProcessor(any(DynamicRouterControlChannelProcessor.class));
+        assertThrows(IllegalStateException.class, controlEndpoint::doInit);
     }
 }
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/processor/DynamicRouterProcessorTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
similarity index 71%
rename from components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/processor/DynamicRouterProcessorTest.java
rename to components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
index 745f298..3e4dd14 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/processor/DynamicRouterProcessorTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
@@ -14,25 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.dynamicrouter.processor;
+package org.apache.camel.component.dynamicrouter;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.component.dynamicrouter.support.CamelDynamicRouterTestSupport;
+import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.MODE_FIRST_MATCH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.when;
 
-class DynamicRouterProcessorTest extends CamelDynamicRouterTestSupport {
+class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
 
     @BeforeEach
     void localSetup() throws Exception {
         super.setup();
-        processor = new DynamicRouterProcessor(PROCESSOR_ID, context, false, () -> filterProcessorFactory);
+        processor = new DynamicRouterProcessor(PROCESSOR_ID, context, MODE_FIRST_MATCH, false, () -> filterProcessorFactory);
     }
 
     @Test
@@ -40,41 +42,41 @@ class DynamicRouterProcessorTest extends CamelDynamicRouterTestSupport {
         when(controlMessage.getPriority()).thenReturn(1);
         when(controlMessage.getPredicate()).thenReturn(e -> true);
         PrioritizedFilterProcessor result = processor.createFilter(controlMessage);
-        Assertions.assertEquals(filterProcessor, result);
+        assertEquals(filterProcessor, result);
     }
 
     @Test
     void addFilterAsControlMessage() {
         processor.addFilter(controlMessage);
-        Assertions.assertNotNull(processor.getFilter(MESSAGE_ID));
+        Assertions.assertNotNull(processor.getFilter(TEST_ID));
     }
 
     @Test
     void addFilterAsFilterProcessor() {
         processor.addFilter(filterProcessor);
-        PrioritizedFilterProcessor result = processor.getFilter(MESSAGE_ID);
-        Assertions.assertEquals(filterProcessor, result);
+        PrioritizedFilterProcessor result = processor.getFilter(TEST_ID);
+        assertEquals(filterProcessor, result);
     }
 
     @Test
     void removeFilter() {
         addFilterAsFilterProcessor();
-        processor.removeFilter(MESSAGE_ID);
-        PrioritizedFilterProcessor result = processor.getFilter(MESSAGE_ID);
+        processor.removeFilter(TEST_ID);
+        PrioritizedFilterProcessor result = processor.getFilter(TEST_ID);
         Assertions.assertNull(result);
     }
 
     @Test
     void matchFiltersMatches() {
         addFilterAsFilterProcessor();
-        PrioritizedFilterProcessor result = processor.matchFilters(exchange);
-        Assertions.assertEquals(MESSAGE_ID, result.getId());
+        PrioritizedFilterProcessor result = processor.matchFilters(exchange).get(0);
+        assertEquals(TEST_ID, result.getId());
     }
 
     @Test
     void matchFiltersDoesNotMatch() {
-        PrioritizedFilterProcessor result = processor.matchFilters(exchange);
-        Assertions.assertEquals(Integer.MAX_VALUE, result.getPriority());
+        PrioritizedFilterProcessor result = processor.matchFilters(exchange).get(0);
+        assertEquals(Integer.MAX_VALUE, result.getPriority());
     }
 
     @Test
@@ -82,7 +84,7 @@ class DynamicRouterProcessorTest extends CamelDynamicRouterTestSupport {
         addFilterAsFilterProcessor();
         when(filterProcessor.matches(exchange)).thenReturn(true);
         lenient().when(filterProcessor.process(any(Exchange.class), any(AsyncCallback.class))).thenReturn(true);
-        Assertions.assertTrue(processor.process(exchange, asyncCallback));
+        Assertions.assertFalse(processor.process(exchange, asyncCallback));
     }
 
     @Test
@@ -91,4 +93,14 @@ class DynamicRouterProcessorTest extends CamelDynamicRouterTestSupport {
         when(filterProcessor.matches(exchange)).thenReturn(false);
         Assertions.assertFalse(processor.process(exchange, asyncCallback));
     }
+
+    @Test
+    void testStringIsId() {
+        assertEquals(PROCESSOR_ID, processor.toString());
+    }
+
+    @Test
+    void testTraceLabelIsId() {
+        assertEquals(PROCESSOR_ID, processor.getTraceLabel());
+    }
 }
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProducerTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProducerTest.java
index 4c9c8eb..6d09db9 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProducerTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProducerTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.camel.component.dynamicrouter;
 
-import org.apache.camel.component.dynamicrouter.support.CamelDynamicRouterTestSupport;
+import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -24,7 +24,7 @@ import org.mockito.Mockito;
 
 import static org.mockito.Mockito.when;
 
-class DynamicRouterProducerTest extends CamelDynamicRouterTestSupport {
+class DynamicRouterProducerTest extends DynamicRouterTestSupport {
 
     @BeforeEach
     void localSetup() throws Exception {
@@ -36,60 +36,15 @@ class DynamicRouterProducerTest extends CamelDynamicRouterTestSupport {
     }
 
     @Test
-    void testCheckConsumer() throws Exception {
-        when(component.getConsumer(DYNAMIC_ROUTER_CHANNEL, true, TIMEOUT))
-                .thenReturn(consumer);
-        boolean result = producer.checkConsumer(exchange);
-        Assertions.assertTrue(result);
-    }
-
-    @Test
-    void testCheckConsumerNull() throws Exception {
-        when(component.getConsumer(DYNAMIC_ROUTER_CHANNEL, true, TIMEOUT))
-                .thenReturn(null);
-        boolean result = producer.checkConsumer(exchange);
-        Assertions.assertFalse(result);
-    }
-
-    @Test
-    void testCheckConsumerInitiallyNullAndFailIfNoConsumers() {
-        when(component.getConsumer(DYNAMIC_ROUTER_CHANNEL))
-                .thenReturn(null);
-        when(endpoint.isFailIfNoConsumers()).thenReturn(true);
-        Assertions.assertThrows(
-                DynamicRouterConsumerNotAvailableException.class,
-                () -> producer.checkConsumer(exchange));
-    }
-
-    @Test
-    void testCheckConsumerExceptionIgnored() throws Exception {
-        when(component.getConsumer(DYNAMIC_ROUTER_CHANNEL, true, TIMEOUT))
-                .thenThrow(new InterruptedException("test"));
-        boolean result = producer.checkConsumer(exchange);
-        Assertions.assertFalse(result);
-    }
-
-    @Test
-    void testProcess() throws Exception {
-        when(component.getConsumer(DYNAMIC_ROUTER_CHANNEL))
-                .thenReturn(consumer);
-        when(component.getConsumer(DYNAMIC_ROUTER_CHANNEL, true, TIMEOUT))
-                .thenReturn(consumer);
-        Assertions.assertDoesNotThrow(() -> producer.process(exchange));
-    }
-
-    @Test
-    void testProcessSynchronous() throws Exception {
-        when(component.getConsumer(DYNAMIC_ROUTER_CHANNEL, true, TIMEOUT))
-                .thenReturn(consumer);
-        when(endpoint.isSynchronous()).thenReturn(true);
+    void testProcessSynchronous() {
+        when(endpoint.getConfiguration().isSynchronous()).thenReturn(true);
         boolean result = producer.process(exchange, asyncCallback);
         Assertions.assertTrue(result);
     }
 
     @Test
     void testProcessAynchronous() {
-        when(endpoint.isSynchronous()).thenReturn(false);
+        when(endpoint.getConfiguration().isSynchronous()).thenReturn(false);
         boolean result = producer.process(exchange, asyncCallback);
         Assertions.assertTrue(result);
     }
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/PrioritizedFilterProcessorTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/PrioritizedFilterProcessorTest.java
new file mode 100644
index 0000000..cee989a
--- /dev/null
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/PrioritizedFilterProcessorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.dynamicrouter;
+
+import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+class PrioritizedFilterProcessorTest extends DynamicRouterTestSupport {
+
+    @Test
+    void testCompareToAndEqual() {
+        PrioritizedFilterProcessor testProcessor
+                = new PrioritizedFilterProcessor(TEST_ID, TEST_PRIORITY, context, predicate, processor);
+        assertEquals(0, testProcessor.compareTo(prioritizedFilterProcessor));
+    }
+
+    @Test
+    void testCompareToAndNotEqualById() {
+        PrioritizedFilterProcessor testProcessor
+                = new PrioritizedFilterProcessor("differentId", TEST_PRIORITY, context, predicate, processor);
+        assertNotEquals(0, testProcessor.compareTo(prioritizedFilterProcessor));
+    }
+
+    @Test
+    void testCompareToAndNotEqualByPriority() {
+        PrioritizedFilterProcessor testProcessor = new PrioritizedFilterProcessor(TEST_ID, 1, context, predicate, processor);
+        assertNotEquals(0, testProcessor.compareTo(prioritizedFilterProcessor));
+    }
+
+    @Test
+    void testToString() {
+        PrioritizedFilterProcessor testProcessor
+                = new PrioritizedFilterProcessor(TEST_ID, TEST_PRIORITY, context, predicate, processor);
+        String expected = String.format("PrioritizedFilterProcessor [id: %s, priority: %s, predicate: %s]",
+                TEST_ID, TEST_PRIORITY, TEST_PREDICATE);
+        String result = testProcessor.toString();
+        assertEquals(expected, result);
+    }
+}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterBasicSynchronousIT.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterBasicSynchronousIT.java
index 4958e5e..6e62daf 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterBasicSynchronousIT.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterBasicSynchronousIT.java
@@ -24,9 +24,9 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Predicate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.dynamicrouter.DynamicRouterComponent;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage.SubscribeMessageBuilder;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage.UnsubscribeMessageBuilder;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.SubscribeMessageBuilder;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.UnsubscribeMessageBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.Test;
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterMultipleRecipientModeIT.java
similarity index 72%
copy from components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
copy to components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterMultipleRecipientModeIT.java
index db45b5c..724c018 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterMultipleRecipientModeIT.java
@@ -23,12 +23,10 @@ import java.util.stream.IntStream;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.EndpointInject;
-import org.apache.camel.Predicate;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.Builder;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage.SubscribeMessageBuilder;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.SubscribeMessageBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.spring.junit5.CamelSpringTest;
 import org.junit.jupiter.api.BeforeEach;
@@ -47,7 +45,7 @@ import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CO
 @CamelSpringTest
 @ContextConfiguration
 @DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
-public class DynamicRouterSingleRouteTwoParticipantsIT {
+public class DynamicRouterMultipleRecipientModeIT {
 
     @Autowired
     CamelContext camelContext;
@@ -107,39 +105,20 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
     }
 
     /**
-     * This test demonstrates how a stream of incoming exchanges will be routed to different subscribers based on their
-     * rules. Notice the use of {@link Builder#body()} to create the participant routing rule {@link Predicate}s.
+     * This test shows what happens when there are multiple participants that might have overlapping rules. When the
+     * dynamic router is in "allMatch" mode, then every participant should receive all exchanges that match their filter
+     * predicate.
      *
      * @throws InterruptedException if interrupted while waiting for mocks to be satisfied
      */
     @Test
-    void testConsumersWithNonConflictingRules() throws InterruptedException {
+    void testMultipleMatchingParticipants() throws InterruptedException {
         mockOne.expectedBodiesReceived(0, 2, 4, 6, 8, 10);
         mockTwo.expectedBodiesReceived(1, 3, 5, 7, 9);
-        mockThree.setExpectedCount(0);
-
-        // Subscribe for even and odd numeric message content so that all messages
-        // are received, and neither participant has conflicting rules with each other
-        subscribe(Arrays.asList(evenSubscribeMsg, oddSubscribeMsg));
-        sendMessagesAndAssert();
-    }
-
-    /**
-     * This test shows what happens when there are conflicting rules. The first matching subscriber wins. When two
-     * subscribers have registered at the same priority level, and the predicates match for both, then it is not clearly
-     * determined which subscriber will receive the exchange.
-     *
-     * @throws InterruptedException if interrupted while waiting for mocks to be satisfied
-     */
-    @Test
-    void testConsumersWithConflictingRules() throws InterruptedException {
-        mockOne.setExpectedCount(0);
-        mockTwo.setExpectedCount(0);
         mockThree.expectedBodiesReceived(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 
-        // Subscribe for all numeric message content, and odd numeric message content so that
-        // the participant that wants all message content conflicts with everyone else.  Since
-        // that subscription has the highest priority (lowest number), it will receive all
+        // Subscribe participants for all, even, and odd numeric message content to verify
+        // that in "allMatch" mode every participant receives each message matching its rule
         subscribe(Arrays.asList(allSubscribeMsg, evenSubscribeMsg, oddSubscribeMsg));
         sendMessagesAndAssert();
     }
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
index db45b5c..60e0a1d 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
@@ -27,8 +27,8 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.Builder;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage.SubscribeMessageBuilder;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.SubscribeMessageBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.spring.junit5.CamelSpringTest;
 import org.junit.jupiter.api.BeforeEach;
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterTwoRoutesIT.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterTwoRoutesIT.java
index e87142d..192617b 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterTwoRoutesIT.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterTwoRoutesIT.java
@@ -23,8 +23,8 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage.SubscribeMessageBuilder;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage.SubscribeMessageBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.spring.junit5.CamelSpringTest;
 import org.junit.jupiter.api.Test;
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterUriControlIT.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterUriControlIT.java
new file mode 100644
index 0000000..63601e9
--- /dev/null
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterUriControlIT.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.dynamicrouter.integration;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.PredicateBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.dynamicrouter.DynamicRouterComponent;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.COMPONENT_SCHEME;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CONTROL_ACTION_SUBSCRIBE;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CONTROL_ACTION_UNSUBSCRIBE;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.CONTROL_CHANNEL_URI;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * This test verifies basic functionality with the Dynamic Router in synchronous mode. Instead of using
+ * {@link DynamicRouterControlMessage}s for subscribing and unsubscribing, this test uses the control channel URI.
+ */
+public class DynamicRouterUriControlIT extends CamelTestSupport {
+
+    /**
+     * Subscribes a participant through the control channel URI, using an explicit subscription ID and a predicate
+     * expression, and verifies that a message sent to the router arrives at the subscribed mock endpoint.
+     *
+     * @throws Exception if interrupted while waiting for mocks to be satisfied
+     */
+    @Test
+    public void testSubscribeWithUri() throws Exception {
+        final MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMinimumMessageCount(1);
+
+        // Every subscription detail is carried by the URI, so the control message body is left empty
+        final String destination = resultEndpoint.getEndpointUri();
+        template.sendBody(
+                createControlChannelUri(CONTROL_ACTION_SUBSCRIBE, "test", "testSubscriptionId",
+                        destination, 1, "${body} contains 'test'"),
+                "");
+
+        // Send a message through the route that feeds the router
+        template.sendBody("direct:start", "testMessage");
+
+        MockEndpoint.assertIsSatisfied(context, 5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Tests participant subscription when the subscription ID is omitted from the control channel URI. The control
+     * channel is called with the {@link ExchangePattern#InOut} pattern so that the reply body can be inspected; the
+     * reply is expected to be a generated, UUID-shaped subscription ID. The test also verifies that a message sent
+     * to the router is received at the registered destination endpoint.
+     *
+     * @throws Exception if interrupted while waiting for mocks to be satisfied
+     */
+    @Test
+    public void testSubscribeWithUriAndWithoutSubscriptionId() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(1);
+
+        String subscribeUri = createControlChannelUri(CONTROL_ACTION_SUBSCRIBE, "test", null,
+                mock.getEndpointUri(), 1, "${body} contains 'test'");
+
+        // Keep the reply as an Object: an unchecked cast would turn a wrong reply type into a
+        // ClassCastException instead of a readable assertion failure
+        Object reply = template.sendBody(subscribeUri, ExchangePattern.InOut, "");
+
+        // Trigger events to subscribers
+        template.sendBody("direct:start", "testMessage");
+
+        // A missing or non-String reply fails the assertion (with the actual value in the message)
+        // rather than raising a NullPointerException
+        assertTrue(reply instanceof String
+                && ((String) reply).matches("[a-f0-9]{8}(-[a-f0-9]{4}){4}[a-f0-9]{8}"),
+                "Expected a generated UUID subscription ID as the reply, but got: " + reply);
+
+        MockEndpoint.assertIsSatisfied(context, 5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Tests that unsubscribing through the control channel URI stops delivery to the participant. One message is
+     * sent while the subscription is active and one after it has been removed; the mock endpoint must receive
+     * exactly one message.
+     *
+     * @throws Exception if interrupted while waiting for mocks to be satisfied
+     */
+    @Test
+    void testUnsubscribe() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // An exact count is required: a minimum count of one would still be satisfied if the message
+        // sent after unsubscribing were delivered, so the test could never detect a failed unsubscribe
+        mock.expectedMessageCount(1);
+
+        // Initially subscribe
+        String subscribeUri = createControlChannelUri(CONTROL_ACTION_SUBSCRIBE, "test", "testId",
+                mock.getEndpointUri(), 1, "${body} contains 'test'");
+
+        template.sendBody(subscribeUri, "");
+        template.sendBody("direct:start", "testMessage");
+
+        // Subscription should lead to the mock endpoint receiving one message
+        MockEndpoint.assertIsSatisfied(context, 5, TimeUnit.SECONDS);
+
+        // Now unsubscribe
+        String unsubscribeUri = createControlChannelUri(CONTROL_ACTION_UNSUBSCRIBE, "test", "testId",
+                null, null, null);
+        template.sendBody(unsubscribeUri, "");
+
+        // Sends another message that should not be received
+        template.sendBody("direct:start", "testMessage");
+
+        // After unsubscribing, the received count must still be exactly one;
+        // a second delivery fails this assertion
+        MockEndpoint.assertIsSatisfied(context, 5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Subscribes with a control channel URI that carries no predicate parameter, passing a {@link Predicate} as the
+     * control message body instead, and verifies that a message sent to the router arrives at the mock endpoint.
+     *
+     * @throws InterruptedException if interrupted while waiting for mocks to be satisfied
+     */
+    @Test
+    void testSubscribeUriWithBodyPredicate() throws InterruptedException {
+        final MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMinimumMessageCount(1);
+
+        // The URI is built without a predicate; the constant-true predicate travels as the body
+        final String controlUri = createControlChannelUri(CONTROL_ACTION_SUBSCRIBE, "test", "testSubscriptionId",
+                resultEndpoint.getEndpointUri(), 1, null);
+        template.sendBody(controlUri, PredicateBuilder.constant(true));
+
+        // Send a message through the route that feeds the router
+        template.sendBody("direct:start", "testMessage");
+
+        MockEndpoint.assertIsSatisfied(context, 5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Binds a {@link Predicate} in the registry under the name {@code predicateBean}, subscribes with a control
+     * channel URI that refers to it as {@code #bean:predicateBean}, and verifies that a message sent to the router
+     * arrives at the mock endpoint.
+     *
+     * @throws InterruptedException if interrupted while waiting for mocks to be satisfied
+     */
+    @Test
+    void testSubscribeUriWithBeanPredicate() throws InterruptedException {
+        final MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMinimumMessageCount(1);
+
+        // Register the predicate first so the bean reference in the URI can be looked up
+        final Predicate alwaysTrue = PredicateBuilder.constant(true);
+        context.getRegistry().bind("predicateBean", Predicate.class, alwaysTrue);
+
+        final String controlUri = createControlChannelUri(CONTROL_ACTION_SUBSCRIBE, "test", "testSubscriptionId",
+                resultEndpoint.getEndpointUri(), 1, "#bean:predicateBean");
+
+        // The control message is sent with a null body
+        template.sendBody(controlUri, null);
+
+        // Send a message through the route that feeds the router
+        template.sendBody("direct:start", "testMessage");
+
+        MockEndpoint.assertIsSatisfied(context, 5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Builds a control channel URI of the form {@code <control-channel-uri>/<action>/<channel>}, followed by a
+     * query string that contains one parameter for each non-null argument. Parameter values are appended verbatim;
+     * no URL-encoding is applied.
+     *
+     * @param  action           the control action, used as a path segment
+     * @param  subscribeChannel the channel, used as a path segment
+     * @param  subscriptionId   value of the {@code subscriptionId} parameter, or {@code null} to omit it
+     * @param  endpointUri      value of the {@code destinationUri} parameter, or {@code null} to omit it
+     * @param  priority         value of the {@code priority} parameter, or {@code null} to omit it
+     * @param  predicate        predicate value, or {@code null} to omit it; a value that starts with
+     *                          {@code #bean:} is sent as {@code predicateBean}, any other value as
+     *                          {@code predicate}
+     * @return                  the assembled control channel URI
+     */
+    String createControlChannelUri(
+            final String action,
+            final String subscribeChannel,
+            final String subscriptionId,
+            final String endpointUri,
+            final Integer priority,
+            final String predicate) {
+        String baseUri = String.format("%s/%s/%s", CONTROL_CHANNEL_URI, action, subscribeChannel);
+
+        StringBuilder query = new StringBuilder();
+        appendQueryParameter(query, "subscriptionId", subscriptionId);
+        appendQueryParameter(query, "destinationUri", endpointUri);
+        appendQueryParameter(query, "priority", priority);
+        if (predicate != null) {
+            appendQueryParameter(query, predicate.startsWith("#bean:") ? "predicateBean" : "predicate", predicate);
+        }
+
+        if (query.length() == 0) {
+            return baseUri;
+        }
+        // The separator is chosen from the base URI alone. Inspecting the finished string instead would
+        // be misled by a '?' inside a parameter value (e.g. a destination URI that has its own query),
+        // which would leave the result without any query separator.
+        return baseUri + (baseUri.contains("?") ? "&" : "?") + query;
+    }
+
+    /**
+     * Appends {@code name=value} to the query string under construction, preceded by {@code &} when the query
+     * already holds a parameter. Does nothing when the value is {@code null}.
+     *
+     * @param query the query string being built (without the leading separator)
+     * @param name  the parameter name
+     * @param value the parameter value, or {@code null} to skip the parameter
+     */
+    private static void appendQueryParameter(StringBuilder query, String name, Object value) {
+        if (value == null) {
+            return;
+        }
+        if (query.length() > 0) {
+            query.append('&');
+        }
+        query.append(name).append('=').append(value);
+    }
+
+    /**
+     * Creates the Camel context for the tests and adds a new {@link DynamicRouterComponent} to it under the
+     * {@code COMPONENT_SCHEME} name.
+     *
+     * @return           the context with the Dynamic Router component added
+     * @throws Exception if the parent class cannot create the context
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        final CamelContext created = super.createCamelContext();
+        final ExtendedCamelContext extended = created.adapt(ExtendedCamelContext.class);
+        extended.addComponent(COMPONENT_SCHEME, new DynamicRouterComponent());
+        return extended;
+    }
+
+    /**
+     * Builds the single route used by the tests: messages sent to {@code direct:start} are forwarded to the
+     * {@code test} channel of the Dynamic Router, with {@code synchronous=true} set on the endpoint URI.
+     *
+     * @return the route builder that defines the test route
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                        .to("dynamic-router://test?synchronous=true");
+            }
+        };
+    }
+}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/CamelDynamicRouterTestSupport.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
similarity index 54%
rename from components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/CamelDynamicRouterTestSupport.java
rename to components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
index cf016a7..3deffb2 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/CamelDynamicRouterTestSupport.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.dynamicrouter.support;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.function.Supplier;
 
 import org.apache.camel.AsyncCallback;
@@ -26,14 +29,23 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.component.dynamicrouter.DynamicRouterComponent;
-import org.apache.camel.component.dynamicrouter.DynamicRouterConsumer;
+import org.apache.camel.component.dynamicrouter.DynamicRouterConfiguration;
 import org.apache.camel.component.dynamicrouter.DynamicRouterControlChannelProcessor;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlChannelProcessor.DynamicRouterControlChannelProcessorFactory;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlMessage;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlProducer;
+import org.apache.camel.component.dynamicrouter.DynamicRouterControlProducer.DynamicRouterControlProducerFactory;
 import org.apache.camel.component.dynamicrouter.DynamicRouterEndpoint;
+import org.apache.camel.component.dynamicrouter.DynamicRouterEndpoint.DynamicRouterEndpointFactory;
+import org.apache.camel.component.dynamicrouter.DynamicRouterProcessor;
+import org.apache.camel.component.dynamicrouter.DynamicRouterProcessor.DynamicRouterProcessorFactory;
 import org.apache.camel.component.dynamicrouter.DynamicRouterProducer;
-import org.apache.camel.component.dynamicrouter.message.DynamicRouterControlMessage;
-import org.apache.camel.component.dynamicrouter.processor.DynamicRouterProcessor;
-import org.apache.camel.component.dynamicrouter.processor.PrioritizedFilterProcessor;
+import org.apache.camel.component.dynamicrouter.DynamicRouterProducer.DynamicRouterProducerFactory;
+import org.apache.camel.component.dynamicrouter.PrioritizedFilterProcessor;
+import org.apache.camel.component.dynamicrouter.PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory;
+import org.apache.camel.language.simple.SimpleLanguage;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.support.builder.PredicateBuilder;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.BeforeEach;
@@ -52,13 +64,14 @@ import static org.mockito.Mockito.lenient;
  * make the tests cleaner and easier to follow.
  */
 @ExtendWith(MockitoExtension.class)
-public class CamelDynamicRouterTestSupport extends CamelTestSupport {
+public class DynamicRouterTestSupport extends CamelTestSupport {
 
     public static final String DYNAMIC_ROUTER_CHANNEL = "test";
     public static final String BASE_URI = String.format("%s:%s", COMPONENT_SCHEME, DYNAMIC_ROUTER_CHANNEL);
     public static final String PROCESSOR_ID = "testProcessorId";
-    public static final String MESSAGE_ID = "testId";
-    public static final long TIMEOUT = 30000L;
+    public static final String TEST_ID = "testId";
+    public static final int TEST_PRIORITY = 10;
+    public static final String TEST_PREDICATE = "testPredicate";
 
     @Mock
     protected ExtendedCamelContext context;
@@ -67,6 +80,18 @@ public class CamelDynamicRouterTestSupport extends CamelTestSupport {
     protected ExchangeFactory exchangeFactory;
 
     @Mock
+    protected ExecutorServiceManager executorServiceManager;
+
+    @Mock
+    protected ExecutorService executorService;
+
+    @Mock
+    protected Future<?> booleanFuture;
+
+    @Mock
+    protected DynamicRouterConfiguration configuration;
+
+    @Mock
     protected DynamicRouterComponent component;
 
     @Mock
@@ -79,10 +104,13 @@ public class CamelDynamicRouterTestSupport extends CamelTestSupport {
     protected DynamicRouterControlChannelProcessor controlChannelProcessor;
 
     @Mock
+    protected PrioritizedFilterProcessor prioritizedFilterProcessor;
+
+    @Mock
     protected DynamicRouterProducer producer;
 
     @Mock
-    protected DynamicRouterConsumer consumer;
+    protected DynamicRouterControlProducer controlProducer;
 
     @Mock
     protected PrioritizedFilterProcessor filterProcessor;
@@ -96,15 +124,22 @@ public class CamelDynamicRouterTestSupport extends CamelTestSupport {
     @Mock
     protected Exchange exchange;
 
+    @Mock
+    protected SimpleLanguage simpleLanguage;
+
+    @Mock
+    protected Predicate predicate;
+
     // Since most pieces of the Dynamic Router are instantiated by calling factories,
     // this provides greatly simplified testing of all components without extensive
     // mocking or entangling of external units
-    protected DynamicRouterEndpoint.DynamicRouterEndpointFactory endpointFactory;
-    protected DynamicRouterProcessor.DynamicRouterProcessorFactory processorFactory;
-    protected DynamicRouterControlChannelProcessor.DynamicRouterControlChannelProcessorFactory controlChannelProcessorFactory;
-    protected DynamicRouterProducer.DynamicRouterProducerFactory producerFactory;
-    protected DynamicRouterConsumer.DynamicRouterConsumerFactory consumerFactory;
-    protected PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory filterProcessorFactory;
+    protected DynamicRouterEndpointFactory endpointFactory;
+    protected DynamicRouterProcessorFactory processorFactory;
+    protected DynamicRouterControlChannelProcessorFactory controlChannelProcessorFactory;
+    protected PrioritizedFilterProcessorFactory prioritizedFilterProcessorFactory;
+    protected DynamicRouterProducerFactory producerFactory;
+    protected DynamicRouterControlProducerFactory controlProducerFactory;
+    protected PrioritizedFilterProcessorFactory filterProcessorFactory;
 
     /**
      * Sets up lenient mocking so that regular behavior "just happens" in tests, but each test can customize behavior
@@ -117,82 +152,100 @@ public class CamelDynamicRouterTestSupport extends CamelTestSupport {
     protected void setup() throws Exception {
         super.setUp();
 
+        lenient().when(configuration.getChannel()).thenReturn(DYNAMIC_ROUTER_CHANNEL);
+
         lenient().when(exchangeFactory.newExchangeFactory(any(Consumer.class))).thenReturn(exchangeFactory);
 
-        lenient().when(consumer.getProcessor()).thenReturn(processor);
+        lenient().when(prioritizedFilterProcessor.getId()).thenReturn("testId");
+        lenient().when(prioritizedFilterProcessor.getPriority()).thenReturn(TEST_PRIORITY);
 
         lenient().doNothing().when(processor).process(any(Exchange.class));
 
         lenient().when(component.getCamelContext()).thenReturn(context);
-        lenient().when(component.getConsumer(anyString())).thenReturn(consumer);
+        lenient().when(component.getRoutingProcessor(anyString())).thenReturn(processor);
 
         lenient().when(endpoint.getCamelContext()).thenReturn(context);
         lenient().when(endpoint.getComponent()).thenReturn(component);
         lenient().when(endpoint.getDynamicRouterComponent()).thenReturn(component);
-        lenient().when(endpoint.getChannel()).thenReturn(DYNAMIC_ROUTER_CHANNEL);
-        lenient().when(endpoint.getTimeout()).thenReturn(TIMEOUT);
-        lenient().when(endpoint.isFailIfNoConsumers()).thenReturn(false);
-        lenient().when(endpoint.isBlock()).thenReturn(true);
+        lenient().when(endpoint.getConfiguration()).thenReturn(configuration);
 
         lenient().when(context.adapt(ExtendedCamelContext.class)).thenReturn(context);
         lenient().when(context.getExchangeFactory()).thenReturn(exchangeFactory);
+        lenient().when(context.resolveLanguage("simple")).thenReturn(simpleLanguage);
+        lenient().when(context.getExecutorServiceManager()).thenReturn(executorServiceManager);
+
+        lenient().when(executorServiceManager.newDefaultThreadPool(any(DynamicRouterProcessor.class), anyString()))
+                .thenReturn(executorService);
+
+        lenient().when(executorService.submit(any(Callable.class))).thenReturn(booleanFuture);
+
+        lenient().when(predicate.toString()).thenReturn(TEST_PREDICATE);
 
-        lenient().when(filterProcessor.getId()).thenReturn(MESSAGE_ID);
+        lenient().when(simpleLanguage.createPredicate(anyString())).thenReturn(predicate);
+
+        lenient().when(filterProcessor.getId()).thenReturn(TEST_ID);
         lenient().when(filterProcessor.getPriority()).thenReturn(Integer.MAX_VALUE);
 
         lenient().doNothing().when(asyncCallback).done(anyBoolean());
 
-        lenient().when(controlMessage.getId()).thenReturn(MESSAGE_ID);
+        lenient().when(controlMessage.getId()).thenReturn(TEST_ID);
         lenient().when(controlMessage.getChannel()).thenReturn(DYNAMIC_ROUTER_CHANNEL);
         lenient().when(controlMessage.getPriority()).thenReturn(1);
         lenient().when(controlMessage.getPredicate()).thenReturn(PredicateBuilder.constant(true));
         lenient().when(controlMessage.getEndpoint()).thenReturn("test");
 
-        endpointFactory = new DynamicRouterEndpoint.DynamicRouterEndpointFactory() {
+        endpointFactory = new DynamicRouterEndpointFactory() {
             @Override
             public DynamicRouterEndpoint getInstance(
-                    String uri, String channel, DynamicRouterComponent component,
-                    Supplier<DynamicRouterProcessor.DynamicRouterProcessorFactory> processorFactorySupplier,
-                    Supplier<DynamicRouterProducer.DynamicRouterProducerFactory> producerFactorySupplier,
-                    Supplier<DynamicRouterConsumer.DynamicRouterConsumerFactory> consumerFactorySupplier,
-                    Supplier<PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
+                    String uri, DynamicRouterComponent component, DynamicRouterConfiguration configuration,
+                    Supplier<DynamicRouterProcessorFactory> processorFactorySupplier,
+                    Supplier<DynamicRouterProducerFactory> producerFactorySupplier,
+                    Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
                 return endpoint;
             }
         };
 
-        processorFactory = new DynamicRouterProcessor.DynamicRouterProcessorFactory() {
+        processorFactory = new DynamicRouterProcessorFactory() {
             @Override
             public DynamicRouterProcessor getInstance(
-                    String id, CamelContext camelContext, boolean warnDroppedMessage,
-                    Supplier<PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
+                    String id, CamelContext camelContext, String recipientMode, boolean warnDroppedMessage,
+                    Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
                 return processor;
             }
         };
 
         controlChannelProcessorFactory
-                = new DynamicRouterControlChannelProcessor.DynamicRouterControlChannelProcessorFactory() {
+                = new DynamicRouterControlChannelProcessorFactory() {
                     @Override
                     public DynamicRouterControlChannelProcessor getInstance(DynamicRouterComponent component) {
                         return controlChannelProcessor;
                     }
                 };
 
-        producerFactory = new DynamicRouterProducer.DynamicRouterProducerFactory() {
+        prioritizedFilterProcessorFactory = new PrioritizedFilterProcessorFactory() {
+            @Override
+            public PrioritizedFilterProcessor getInstance(
+                    String id, int priority, CamelContext context, Predicate predicate,
+                    Processor processor) {
+                return prioritizedFilterProcessor;
+            }
+        };
+
+        producerFactory = new DynamicRouterProducerFactory() {
             @Override
             public DynamicRouterProducer getInstance(DynamicRouterEndpoint endpoint) {
                 return producer;
             }
         };
 
-        consumerFactory = new DynamicRouterConsumer.DynamicRouterConsumerFactory() {
+        controlProducerFactory = new DynamicRouterControlProducerFactory() {
             @Override
-            public DynamicRouterConsumer getInstance(
-                    DynamicRouterEndpoint endpoint, Processor processor, String channel) {
-                return consumer;
+            public DynamicRouterControlProducer getInstance(DynamicRouterEndpoint endpoint) {
+                return controlProducer;
             }
         };
 
-        filterProcessorFactory = new PrioritizedFilterProcessor.PrioritizedFilterProcessorFactory() {
+        filterProcessorFactory = new PrioritizedFilterProcessorFactory() {
             @Override
             public PrioritizedFilterProcessor getInstance(
                     String id, int priority, CamelContext context, Predicate predicate, Processor processor) {
diff --git a/components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterTwoParticipantsIT-context.xml b/components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterMultipleRecipientModeIT-context.xml
similarity index 93%
copy from components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterTwoParticipantsIT-context.xml
copy to components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterMultipleRecipientModeIT-context.xml
index 97583ff..433a221 100644
--- a/components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterTwoParticipantsIT-context.xml
+++ b/components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterMultipleRecipientModeIT-context.xml
@@ -28,7 +28,7 @@
     <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
         <route>
             <from uri="direct:start"/>
-            <to uri="dynamic-router:test"/>
+            <to uri="dynamic-router:test?recipientMode=allMatch"/>
         </route>
     </camelContext>
 </beans>
diff --git a/components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterTwoParticipantsIT-context.xml b/components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT-context.xml
similarity index 100%
rename from components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterTwoParticipantsIT-context.xml
rename to components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT-context.xml