You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/17 15:02:23 UTC
(camel) branch main updated: CAMEL-00000: Code quality improvements and more test coverage. (#12798)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2ff032238e6 CAMEL-00000: Code quality improvements and more test coverage. (#12798)
2ff032238e6 is described below
commit 2ff032238e611aec390d54199a30fc0eb1f4a0eb
Author: Steve Storck <st...@gmail.com>
AuthorDate: Wed Jan 17 10:02:16 2024 -0500
CAMEL-00000: Code quality improvements and more test coverage. (#12798)
---
...ynamicRouterControlChannelSendDynamicAware.java | 4 -
.../control/DynamicRouterControlComponent.java | 38 +-
.../control/DynamicRouterControlConfiguration.java | 5 +-
.../control/DynamicRouterControlConstants.java | 6 -
.../control/DynamicRouterControlEndpoint.java | 95 +++--
.../control/DynamicRouterControlMessage.java | 28 +-
.../control/DynamicRouterControlProducer.java | 3 +
.../control/DynamicRouterControlService.java | 4 +-
.../filter/DynamicRouterFilterService.java | 12 +-
.../dynamicrouter/filter/PrioritizedFilter.java | 5 +
.../filter/PrioritizedFilterStatistics.java | 55 +++
.../routing/DynamicRouterComponent.java | 18 +-
.../routing/DynamicRouterConfiguration.java | 160 ++++++++
.../routing/DynamicRouterConstants.java | 8 +
.../routing/DynamicRouterEndpoint.java | 20 +-
.../routing/DynamicRouterProcessor.java | 25 +-
.../routing/DynamicRouterRecipientListHelper.java | 94 +++--
...icRouterControlChannelSendDynamicAwareTest.java | 33 ++
.../control/DynamicRouterControlComponentTest.java | 8 +-
.../control/DynamicRouterControlEndpointTest.java | 8 +-
.../control/DynamicRouterControlProducerTest.java | 123 +++++-
.../control/DynamicRouterControlServiceTest.java | 11 +-
.../filter/DynamicRouterFilterServiceTest.java | 68 ++-
.../routing/DynamicRouterComponentTest.java | 14 +-
.../routing/DynamicRouterEndpointTest.java | 24 +-
.../routing/DynamicRouterProcessorTest.java | 11 +-
.../DynamicRouterRecipientListHelperTest.java | 457 +++++++++++++++++++++
.../integration/DynamicRouterJmxIT-context.xml | 6 +
28 files changed, 1172 insertions(+), 171 deletions(-)
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlChannelSendDynamicAware.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlChannelSendDynamicAware.java
index 2bb1d34794a..13662241b27 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlChannelSendDynamicAware.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlChannelSendDynamicAware.java
@@ -30,7 +30,6 @@ import org.apache.camel.util.URISupport;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.COMPONENT_SCHEME_CONTROL;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_PROPERTY;
-import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.PROTOCOL_SUFFIX;
/**
* A {@link SendDynamicAwareSupport} implementation to process control channel messages for the Dynamic Router.
@@ -78,9 +77,6 @@ public class DynamicRouterControlChannelSendDynamicAware extends SendDynamicAwar
Map<String, Object> properties = endpointProperties(exchange, uri);
URI normalizedUri = URISupport.normalizeUriAsURI(uri);
String controlAction = URISupport.extractRemainderPath(normalizedUri, false);
- if (controlAction.contains(PROTOCOL_SUFFIX)) {
- controlAction = controlAction.substring(controlAction.indexOf(PROTOCOL_SUFFIX) + 3);
- }
properties.put(CONTROL_ACTION_PROPERTY, controlAction);
return new DynamicAwareEntry(uri, originalUri, properties, null);
}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlComponent.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlComponent.java
index fc49915249b..8c79b6d09be 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlComponent.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlComponent.java
@@ -17,22 +17,16 @@
package org.apache.camel.component.dynamicrouter.control;
import java.util.Map;
-import java.util.Optional;
import java.util.function.Supplier;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
-import org.apache.camel.component.dynamicrouter.control.DynamicRouterControlService.DynamicRouterControlServiceFactory;
-import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService;
-import org.apache.camel.component.dynamicrouter.routing.DynamicRouterComponent;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.COMPONENT_SCHEME_CONTROL;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ENDPOINT_FACTORY_SUPPLIER;
-import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_SERVICE_FACTORY_SUPPLIER;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlEndpoint.DynamicRouterControlEndpointFactory;
-import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.COMPONENT_SCHEME_ROUTING;
/**
* The component for the Dynamic router control operations that allow routing participants to subscribe or unsubscribe
@@ -47,16 +41,11 @@ public class DynamicRouterControlComponent extends DefaultComponent {
*/
private final Supplier<DynamicRouterControlEndpointFactory> controlEndpointFactorySupplier;
- private final Supplier<DynamicRouterControlServiceFactory> controlServiceFactorySupplier;
-
- private DynamicRouterControlService controlService;
-
/**
* Default constructor to create the instance.
*/
public DynamicRouterControlComponent() {
this.controlEndpointFactorySupplier = CONTROL_ENDPOINT_FACTORY_SUPPLIER;
- this.controlServiceFactorySupplier = CONTROL_SERVICE_FACTORY_SUPPLIER;
}
/**
@@ -67,39 +56,18 @@ public class DynamicRouterControlComponent extends DefaultComponent {
public DynamicRouterControlComponent(CamelContext context) {
super(context);
this.controlEndpointFactorySupplier = CONTROL_ENDPOINT_FACTORY_SUPPLIER;
- this.controlServiceFactorySupplier = CONTROL_SERVICE_FACTORY_SUPPLIER;
}
/**
- * Create the instance.
+ * Create the instance, allowing the caller to specify the factory suppliers.
*
* @param context the {@link CamelContext}
* @param controlEndpointFactorySupplier the {@link Supplier<DynamicRouterControlEndpointFactory>}
- * @param controlServiceFactorySupplier the {@link Supplier<DynamicRouterControlServiceFactory>}
*/
public DynamicRouterControlComponent(CamelContext context,
- Supplier<DynamicRouterControlEndpointFactory> controlEndpointFactorySupplier,
- Supplier<DynamicRouterControlServiceFactory> controlServiceFactorySupplier) {
+ Supplier<DynamicRouterControlEndpointFactory> controlEndpointFactorySupplier) {
super(context);
this.controlEndpointFactorySupplier = controlEndpointFactorySupplier;
- this.controlServiceFactorySupplier = controlServiceFactorySupplier;
- }
-
- @Override
- protected void doStart() throws Exception {
- super.doStart();
- DynamicRouterFilterService filterService = Optional.ofNullable(getCamelContext()
- .getComponent(COMPONENT_SCHEME_ROUTING, DynamicRouterComponent.class))
- .map(DynamicRouterComponent::getFilterService)
- .orElseThrow(() -> new IllegalStateException("DynamicRouter component could not be found"));
- this.controlService = controlServiceFactorySupplier.get().getInstance(getCamelContext(), filterService);
- getCamelContext().addService(controlService);
- }
-
- @Override
- protected void doStop() throws Exception {
- getCamelContext().removeService(controlService);
- super.doStop();
}
/**
@@ -114,7 +82,7 @@ public class DynamicRouterControlComponent extends DefaultComponent {
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
DynamicRouterControlConfiguration configuration = new DynamicRouterControlConfiguration();
DynamicRouterControlEndpoint endpoint = controlEndpointFactorySupplier.get()
- .getInstance(uri, this, remaining, configuration, controlService);
+ .getInstance(uri, this, remaining, configuration);
setProperties(endpoint, parameters);
return endpoint;
}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlConfiguration.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlConfiguration.java
index 9bacd349565..27a02fbe168 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlConfiguration.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlConfiguration.java
@@ -22,6 +22,9 @@ import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
+/**
+ * Configuration for the {@link DynamicRouterControlEndpoint}.
+ */
@UriParams
public class DynamicRouterControlConfiguration {
@@ -87,7 +90,7 @@ public class DynamicRouterControlConfiguration {
}
/**
- * The control action (subscribe or unsubscribe).
+ * The control action (subscribe or unsubscribe) that returns the default value of "subscribe" if null.
*
* @return the control action
*/
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlConstants.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlConstants.java
index 1108cc22ff2..c5516973453 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlConstants.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlConstants.java
@@ -124,12 +124,6 @@ public final class DynamicRouterControlConstants {
*/
public static final String ERROR_NO_PREDICATE_BEAN_FOUND = "Predicate bean could not be found";
- /**
- * The characters immediately following a protocol (or camel component name) in a URI. For the latter, there is
- * always a colon, but the two slashes do not always follow.
- */
- public static final String PROTOCOL_SUFFIX = "://";
-
/**
* The configuration property for the control channel action.
*/
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlEndpoint.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlEndpoint.java
index 30daa03795d..6fb009cbcb0 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlEndpoint.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlEndpoint.java
@@ -16,13 +16,18 @@
*/
package org.apache.camel.component.dynamicrouter.control;
+import java.util.Optional;
import java.util.function.Supplier;
+import org.apache.camel.CamelContext;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.dynamicrouter.control.DynamicRouterControlProducer.DynamicRouterControlProducerFactory;
+import org.apache.camel.component.dynamicrouter.control.DynamicRouterControlService.DynamicRouterControlServiceFactory;
+import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService;
+import org.apache.camel.component.dynamicrouter.routing.DynamicRouterComponent;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
@@ -36,9 +41,11 @@ import static org.apache.camel.component.dynamicrouter.control.DynamicRouterCont
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_UNSUBSCRIBE;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_UPDATE;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_PRODUCER_FACTORY_SUPPLIER;
+import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_SERVICE_FACTORY_SUPPLIER;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.FIRST_VERSION_CONTROL;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.SYNTAX_CONTROL;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.TITLE_CONTROL;
+import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.COMPONENT_SCHEME_ROUTING;
/**
* The Dynamic Router control endpoint for operations that allow routing participants to subscribe or unsubscribe to
@@ -68,14 +75,21 @@ public class DynamicRouterControlEndpoint extends DefaultEndpoint {
private final DynamicRouterControlConfiguration configuration;
/**
- * Service that responds to control messages.
+ * Creates a {@link DynamicRouterControlProducer} instance.
*/
- private final DynamicRouterControlService controlService;
+ private final Supplier<DynamicRouterControlProducerFactory> controlProducerFactorySupplier;
/**
- * Creates a {@link DynamicRouterControlProducer} instance.
+ * The {@link Supplier<DynamicRouterControlServiceFactory>} to get an instance of the
+ * {@link DynamicRouterControlServiceFactory}. Using this supplier facilitates things like testing, where it
+ * simplifies swapping out another factory implementation.
*/
- private final Supplier<DynamicRouterControlProducerFactory> controlProducerFactorySupplier;
+ private final Supplier<DynamicRouterControlServiceFactory> controlServiceFactorySupplier;
+
+ /**
+ * Service that responds to control messages.
+ */
+ private DynamicRouterControlService controlService;
/**
* Creates the instance.
@@ -85,40 +99,62 @@ public class DynamicRouterControlEndpoint extends DefaultEndpoint {
* routing participants
* @param controlAction the control action of the endpoint
* @param configuration the component/endpoint configuration
- * @param controlService the {@link DynamicRouterControlService}
* @param controlProducerFactorySupplier the {@link DynamicRouterControlProducerFactory} supplier
+ * @param controlServiceFactorySupplier the {@link DynamicRouterControlServiceFactory} supplier
*/
public DynamicRouterControlEndpoint(String uri, DynamicRouterControlComponent component, String controlAction,
DynamicRouterControlConfiguration configuration,
- DynamicRouterControlService controlService,
- Supplier<DynamicRouterControlProducerFactory> controlProducerFactorySupplier) {
+ Supplier<DynamicRouterControlProducerFactory> controlProducerFactorySupplier,
+ Supplier<DynamicRouterControlServiceFactory> controlServiceFactorySupplier) {
super(uri, component);
this.controlAction = controlAction;
this.configuration = configuration;
this.configuration.setControlAction(controlAction);
- this.controlService = controlService;
this.controlProducerFactorySupplier = controlProducerFactorySupplier;
+ this.controlServiceFactorySupplier = controlServiceFactorySupplier;
}
/**
* Creates the instance.
*
- * @param uri the URI that was used to cause the endpoint creation
- * @param component the routing component to handle management of subscriber information from routing
- * participants
- * @param controlAction the control action of the endpoint
- * @param configuration the component/endpoint configuration
- * @param controlService the {@link DynamicRouterControlService}
+ * @param uri the URI that was used to cause the endpoint creation
+ * @param component the routing component to handle management of subscriber information from routing
+ * participants
+ * @param controlAction the control action of the endpoint
+ * @param configuration the component/endpoint configuration
*/
public DynamicRouterControlEndpoint(String uri, DynamicRouterControlComponent component, String controlAction,
- DynamicRouterControlConfiguration configuration,
- DynamicRouterControlService controlService) {
+ DynamicRouterControlConfiguration configuration) {
super(uri, component);
this.controlAction = controlAction;
this.configuration = configuration;
this.configuration.setControlAction(controlAction);
- this.controlService = controlService;
this.controlProducerFactorySupplier = CONTROL_PRODUCER_FACTORY_SUPPLIER;
+ this.controlServiceFactorySupplier = CONTROL_SERVICE_FACTORY_SUPPLIER;
+ }
+
+ /**
+ * Starts the component, and creates the {@link DynamicRouterControlService} and adds it to the
+ * {@link CamelContext}.
+ */
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ DynamicRouterFilterService filterService = Optional.ofNullable(getCamelContext()
+ .getComponent(COMPONENT_SCHEME_ROUTING, DynamicRouterComponent.class))
+ .map(DynamicRouterComponent::getFilterService)
+ .orElseThrow(() -> new IllegalStateException("DynamicRouter component could not be found"));
+ this.controlService = controlServiceFactorySupplier.get().getInstance(getCamelContext(), filterService);
+ getCamelContext().addService(controlService);
+ }
+
+ /**
+ * Stops the component, and removes the {@link DynamicRouterControlService} from the {@link CamelContext}.
+ */
+ @Override
+ protected void doStop() throws Exception {
+ getCamelContext().removeService(controlService);
+ super.doStop();
}
@Override
@@ -175,20 +211,18 @@ public class DynamicRouterControlEndpoint extends DefaultEndpoint {
/**
* Gets an instance of a {@link DynamicRouterControlEndpoint}.
*
- * @param uri the URI that was used to trigger the creation of the endpoint
- * @param component the {@link DynamicRouterControlComponent}
- * @param controlAction the control action of the endpoint
- * @param configuration the {@link DynamicRouterControlConfiguration}
- * @param controlService the {@link DynamicRouterControlService}
- * @return the {@link DynamicRouterControlEndpoint}
+ * @param uri the URI that was used to trigger the creation of the endpoint
+ * @param component the {@link DynamicRouterControlComponent}
+ * @param controlAction the control action of the endpoint
+ * @param configuration the {@link DynamicRouterControlConfiguration}
+ * @return the {@link DynamicRouterControlEndpoint}
*/
public DynamicRouterControlEndpoint getInstance(
final String uri,
final DynamicRouterControlComponent component,
final String controlAction,
- final DynamicRouterControlConfiguration configuration,
- final DynamicRouterControlService controlService) {
- return new DynamicRouterControlEndpoint(uri, component, controlAction, configuration, controlService);
+ final DynamicRouterControlConfiguration configuration) {
+ return new DynamicRouterControlEndpoint(uri, component, controlAction, configuration);
}
/**
@@ -198,8 +232,8 @@ public class DynamicRouterControlEndpoint extends DefaultEndpoint {
* @param component the {@link DynamicRouterControlComponent}
* @param controlAction the control action of the endpoint
* @param configuration the {@link DynamicRouterControlConfiguration}
- * @param controlService the {@link DynamicRouterControlService}
* @param controlProducerFactorySupplier the {@link DynamicRouterControlProducerFactory} supplier
+ * @param controlServiceFactorySupplier the {@link DynamicRouterControlServiceFactory} supplier
* @return the {@link DynamicRouterControlEndpoint}
*/
public DynamicRouterControlEndpoint getInstance(
@@ -207,10 +241,11 @@ public class DynamicRouterControlEndpoint extends DefaultEndpoint {
final DynamicRouterControlComponent component,
final String controlAction,
final DynamicRouterControlConfiguration configuration,
- final DynamicRouterControlService controlService,
- final Supplier<DynamicRouterControlProducerFactory> controlProducerFactorySupplier) {
+ final Supplier<DynamicRouterControlProducerFactory> controlProducerFactorySupplier,
+ final Supplier<DynamicRouterControlServiceFactory> controlServiceFactorySupplier) {
return new DynamicRouterControlEndpoint(
- uri, component, controlAction, configuration, controlService, controlProducerFactorySupplier);
+ uri, component, controlAction, configuration, controlProducerFactorySupplier,
+ controlServiceFactorySupplier);
}
}
}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlMessage.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlMessage.java
index 46836fcb379..55c801a0d7d 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlMessage.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlMessage.java
@@ -60,12 +60,23 @@ public final class DynamicRouterControlMessage {
*/
private String expressionLanguage;
+ /**
+ * Default constructor for a new Dynamic Router control message without any values.
+ */
public DynamicRouterControlMessage() {
// Default constructor
}
/**
- * All-argument constructor. At a minimum, this must include
+ * Constructor for a new Dynamic Router control message with the given values.
+ *
+ * @param subscribeChannel the channel to subscribe to
+ * @param subscriptionId the subscription ID
+ * @param destinationUri the destination URI
+ * @param priority the subscription priority
+ * @param predicateBean the name of a predicate bean in the registry
+ * @param predicate the predicate expression
+ * @param expressionLanguage the subscription predicate language
*/
public DynamicRouterControlMessage(String subscribeChannel, String subscriptionId,
String destinationUri, int priority, String predicateBean, String predicate,
@@ -80,6 +91,11 @@ public final class DynamicRouterControlMessage {
? "simple" : expressionLanguage;
}
+ /**
+ * Constructor for a new Dynamic Router control message with the given values.
+ *
+ * @param builder the {@link Builder} to construct the new {@link DynamicRouterControlMessage}.
+ */
private DynamicRouterControlMessage(Builder builder) {
subscribeChannel = builder.subscribeChannel;
subscriptionId = builder.subscriptionId;
@@ -174,9 +190,19 @@ public final class DynamicRouterControlMessage {
private String expressionLanguage;
+ /**
+ * Instantiates a new {@code DynamicRouterControlMessage.Builder}.
+ */
private Builder() {
}
+ /**
+ * Returns a {@code DynamicRouterControlMessage.Builder} object that can be used to create a new
+ * {@code DynamicRouterControlMessage}.
+ *
+ * @return a {@code DynamicRouterControlMessage.Builder} object that can be used to create a new
+ * {@code DynamicRouterControlMessage}
+ */
public static Builder newBuilder() {
return new Builder();
}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
index 35c6a7f8087..797052f56a1 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
@@ -49,6 +49,9 @@ import static org.apache.camel.component.dynamicrouter.control.DynamicRouterCont
*/
public class DynamicRouterControlProducer extends HeaderSelectorProducer {
+ /**
+ * The {@link DynamicRouterControlService} for the Dynamic Router.
+ */
private final DynamicRouterControlService dynamicRouterControlService;
/**
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlService.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlService.java
index 504b885e797..230084caafc 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlService.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlService.java
@@ -31,7 +31,6 @@ import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService;
import org.apache.camel.component.dynamicrouter.filter.PrioritizedFilter;
import org.apache.camel.component.dynamicrouter.filter.PrioritizedFilterStatistics;
-import org.apache.camel.spi.Language;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.service.ServiceSupport;
@@ -78,8 +77,7 @@ public class DynamicRouterControlService extends ServiceSupport {
final String predExpression,
final String expressionLanguage) {
try {
- Language language = camelContext.resolveLanguage(expressionLanguage);
- return language.createPredicate(predExpression);
+ return camelContext.resolveLanguage(expressionLanguage).createPredicate(predExpression);
} catch (Exception e) {
String message = String.format(ERROR_INVALID_PREDICATE_EXPRESSION, expressionLanguage, predExpression);
throw new IllegalArgumentException(message, e);
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/DynamicRouterFilterService.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/DynamicRouterFilterService.java
index 2102ebcc805..8664959d3a5 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/DynamicRouterFilterService.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/DynamicRouterFilterService.java
@@ -62,7 +62,7 @@ public class DynamicRouterFilterService {
private final Map<String, List<PrioritizedFilterStatistics>> filterStatisticsMap = new ConcurrentHashMap<>();
/**
- * Creates a {@link PrioritizedFilter} instance.
+ * Supplier for the {@link PrioritizedFilterFactory} instance.
*/
private final Supplier<PrioritizedFilterFactory> filterFactorySupplier;
@@ -71,11 +71,21 @@ public class DynamicRouterFilterService {
LOG.debug("Created Dynamic Router component");
}
+ /**
+ * Constructor that allows the {@link PrioritizedFilterFactory} supplier to be specified.
+ *
+ * @param filterFactorySupplier the {@link PrioritizedFilterFactory} supplier
+ */
public DynamicRouterFilterService(final Supplier<PrioritizedFilterFactory> filterFactorySupplier) {
this.filterFactorySupplier = filterFactorySupplier;
LOG.debug("Created Dynamic Router component");
}
+ /**
+ * Initialize the filter list for the specified channel.
+ *
+ * @param channel channel to initialize filter list for
+ */
public void initializeChannelFilters(final String channel) {
filterMap.computeIfAbsent(channel, c -> new ConcurrentSkipListSet<>(DynamicRouterConstants.FILTER_COMPARATOR));
filterStatisticsMap.computeIfAbsent(channel, c -> Collections.synchronizedList(new ArrayList<>()));
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/PrioritizedFilter.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/PrioritizedFilter.java
index 111ed5529fc..2d2f17ba888 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/PrioritizedFilter.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/PrioritizedFilter.java
@@ -46,6 +46,11 @@ public record PrioritizedFilter(String id, int priority, Predicate predicate, St
return DynamicRouterConstants.FILTER_COMPARATOR.compare(this, other);
}
+ /**
+ * Create a string representation of this processor.
+ *
+ * @return the string representation of this processor
+ */
@Override
public String toString() {
return String.format("PrioritizedFilterProcessor [id: %s, priority: %s, predicate: %s, endpoint: %s]",
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/PrioritizedFilterStatistics.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/PrioritizedFilterStatistics.java
index f1de9df1875..d8192deccd4 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/PrioritizedFilterStatistics.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/filter/PrioritizedFilterStatistics.java
@@ -18,16 +18,42 @@ package org.apache.camel.component.dynamicrouter.filter;
import java.util.concurrent.atomic.AtomicLong;
+/**
+ * Statistics for a {@link PrioritizedFilter} that tracks details that include:
+ * <ul>
+ * <li>the id of the filter</li>
+ * <li>the number of times the filter was invoked</li>
+ * <li>the first time the filter was invoked (as epoch milliseconds)</li>
+ * <li>the last time the filter was invoked (as epoch milliseconds)</li>
+ * </ul>
+ */
public class PrioritizedFilterStatistics {
+ /**
+ * The id of the filter.
+ */
private final String filterId;
+ /**
+ * The number of times the filter was invoked.
+ */
private final AtomicLong count;
+ /**
+ * The first time the filter was invoked (as epoch milliseconds).
+ */
private final AtomicLong first;
+ /**
+ * The last time the filter was invoked (as epoch milliseconds).
+ */
private final AtomicLong last;
+ /**
+ * Creates a new {@link PrioritizedFilterStatistics} instance with the given filter id.
+ *
+ * @param filterId the id of the filter that these statistics represent
+ */
public PrioritizedFilterStatistics(String filterId) {
this.filterId = filterId;
this.count = new AtomicLong(0);
@@ -35,10 +61,19 @@ public class PrioritizedFilterStatistics {
this.last = new AtomicLong(0);
}
+ /**
+ * Returns the id of the filter that these statistics represent.
+ *
+ * @return the id of the filter that these statistics represent
+ */
public String getFilterId() {
return filterId;
}
+ /**
+ * Increments the number of times the filter was invoked, and updates the first and last times the filter was
+ * invoked.
+ */
public void incrementCount() {
long now = System.currentTimeMillis();
if (count.incrementAndGet() == 1) {
@@ -47,18 +82,38 @@ public class PrioritizedFilterStatistics {
last.updateAndGet(v -> Math.max(v, now));
}
+ /**
+ * Returns the number of times the filter was invoked.
+ *
+ * @return the number of times the filter was invoked
+ */
public long getCount() {
return count.get();
}
+ /**
+ * Returns the first time the filter was invoked (as epoch milliseconds).
+ *
+ * @return the first time the filter was invoked (as epoch milliseconds)
+ */
public long getFirst() {
return first.get();
}
+ /**
+ * Returns the last time the filter was invoked (as epoch milliseconds).
+ *
+ * @return the last time the filter was invoked (as epoch milliseconds)
+ */
public long getLast() {
return last.get();
}
+ /**
+ * Returns a string representation of this statistics object.
+ *
+ * @return a string representation of this statistics object
+ */
@Override
public String toString() {
return String.format("PrioritizedFilterStatistics [id: %s, count: %d, first: %d, last: %d]",
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterComponent.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterComponent.java
index 6850be9155e..09113ba59a2 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterComponent.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterComponent.java
@@ -18,15 +18,19 @@ package org.apache.camel.component.dynamicrouter.routing;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
+import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
+import org.apache.camel.Expression;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService.DynamicRouterFilterServiceFactory;
import org.apache.camel.component.dynamicrouter.filter.PrioritizedFilter;
import org.apache.camel.component.dynamicrouter.filter.PrioritizedFilter.PrioritizedFilterFactory;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProcessor.DynamicRouterProcessorFactory;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProducer.DynamicRouterProducerFactory;
+import org.apache.camel.processor.RecipientList;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.service.ServiceHelper;
@@ -40,6 +44,7 @@ import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterCons
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.FILTER_SERVICE_FACTORY_SUPPLIER;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.PROCESSOR_FACTORY_SUPPLIER;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.PRODUCER_FACTORY_SUPPLIER;
+import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.RECIPIENT_LIST_SUPPLIER;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterEndpoint.DynamicRouterEndpointFactory;
/**
@@ -75,6 +80,11 @@ public class DynamicRouterComponent extends DefaultComponent {
*/
private final Supplier<DynamicRouterProducerFactory> producerFactorySupplier;
+ /**
+ * Creates a {@link RecipientList} instance.
+ */
+ private final BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier;
+
/**
* Service that manages {@link PrioritizedFilter}s for the Dynamic Router channels.
*/
@@ -87,6 +97,7 @@ public class DynamicRouterComponent extends DefaultComponent {
this.endpointFactorySupplier = ENDPOINT_FACTORY_SUPPLIER;
this.processorFactorySupplier = PROCESSOR_FACTORY_SUPPLIER;
this.producerFactorySupplier = PRODUCER_FACTORY_SUPPLIER;
+ this.recipientListSupplier = RECIPIENT_LIST_SUPPLIER;
this.filterService = FILTER_SERVICE_FACTORY_SUPPLIER.get().getInstance(FILTER_FACTORY_SUPPLIER);
LOG.debug("Created Dynamic Router component");
}
@@ -97,6 +108,7 @@ public class DynamicRouterComponent extends DefaultComponent {
* @param endpointFactorySupplier creates the {@link DynamicRouterEndpoint}
* @param processorFactorySupplier creates the {@link DynamicRouterProcessor}
* @param producerFactorySupplier creates the {@link DynamicRouterProducer}
+ * @param recipientListSupplier creates the {@link RecipientList}
* @param filterFactorySupplier creates the {@link PrioritizedFilter}
* @param filterServiceFactorySupplier creates the {@link DynamicRouterFilterService}
*/
@@ -104,11 +116,13 @@ public class DynamicRouterComponent extends DefaultComponent {
final Supplier<DynamicRouterEndpointFactory> endpointFactorySupplier,
final Supplier<DynamicRouterProcessorFactory> processorFactorySupplier,
final Supplier<DynamicRouterProducerFactory> producerFactorySupplier,
+ final BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier,
final Supplier<PrioritizedFilterFactory> filterFactorySupplier,
final Supplier<DynamicRouterFilterServiceFactory> filterServiceFactorySupplier) {
this.endpointFactorySupplier = endpointFactorySupplier;
this.processorFactorySupplier = processorFactorySupplier;
this.producerFactorySupplier = producerFactorySupplier;
+ this.recipientListSupplier = recipientListSupplier;
this.filterService = filterServiceFactorySupplier.get().getInstance(filterFactorySupplier);
LOG.debug("Created Dynamic Router component");
}
@@ -131,8 +145,8 @@ public class DynamicRouterComponent extends DefaultComponent {
configuration.setChannel(remaining);
filterService.initializeChannelFilters(configuration.getChannel());
DynamicRouterEndpoint endpoint = endpointFactorySupplier.get()
- .getInstance(uri, this, configuration, processorFactorySupplier,
- producerFactorySupplier, filterService);
+ .getInstance(uri, this, configuration, processorFactorySupplier, producerFactorySupplier, recipientListSupplier,
+ filterService);
setProperties(endpoint, parameters);
return endpoint;
}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterConfiguration.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterConfiguration.java
index 22132b9cd9e..f4c77b94f21 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterConfiguration.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterConfiguration.java
@@ -280,130 +280,290 @@ public class DynamicRouterConfiguration {
this.warnDroppedMessage = warnDroppedMessage;
}
+ /**
+ * Gets whether the multicast should be streaming or not.
+ *
+ * @return true if streaming, false otherwise
+ */
public boolean isStreaming() {
return streaming;
}
+ /**
+ * Sets whether the multicast should be streaming or not.
+ *
+ * @param streaming flag to set whether the multicast should be streaming or not
+ */
public void setStreaming(boolean streaming) {
this.streaming = streaming;
}
+ /**
+ * Gets whether invalid endpoints should be ignored.
+ *
+ * @return true if invalid endpoints should be ignored, false otherwise
+ */
public boolean isIgnoreInvalidEndpoints() {
return ignoreInvalidEndpoints;
}
+ /**
+ * Sets whether invalid endpoints should be ignored.
+ *
+ * @param ignoreInvalidEndpoints flag to set whether invalid endpoints should be ignored
+ */
public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
}
+ /**
+ * Gets whether parallel processing is enabled.
+ *
+ * @return true if parallel processing is enabled, false otherwise
+ */
public boolean isParallelProcessing() {
return parallelProcessing;
}
+ /**
+ * Sets whether parallel processing is enabled.
+ *
+ * @param parallelProcessing flag to set whether parallel processing is enabled
+ */
public void setParallelProcessing(boolean parallelProcessing) {
this.parallelProcessing = parallelProcessing;
}
+ /**
+ * Gets whether parallel aggregation is enabled.
+ *
+ * @return true if parallel aggregation is enabled, false otherwise
+ */
public boolean isParallelAggregate() {
return parallelAggregate;
}
+ /**
+ * Sets whether parallel aggregation is enabled.
+ *
+ * @param parallelAggregate flag to set whether parallel aggregation is enabled
+ */
public void setParallelAggregate(boolean parallelAggregate) {
this.parallelAggregate = parallelAggregate;
}
+ /**
+ * Gets whether the multicast should stop on exception.
+ *
+ * @return true if the multicast should stop on exception, false otherwise
+ */
public boolean isStopOnException() {
return stopOnException;
}
+ /**
+ * Sets whether the multicast should stop on exception.
+ *
+ * @param stopOnException flag to set whether the multicast should stop on exception
+ */
public void setStopOnException(boolean stopOnException) {
this.stopOnException = stopOnException;
}
+ /**
+ * Gets the executor service.
+ *
+ * @return the executor service
+ */
public String getExecutorService() {
return executorService;
}
+ /**
+ * Sets the executor service.
+ *
+ * @param executorService the executor service
+ */
public void setExecutorService(String executorService) {
this.executorService = executorService;
}
+ /**
+ * Gets the executor service bean.
+ *
+ * @return the executor service bean
+ */
public ExecutorService getExecutorServiceBean() {
return executorServiceBean;
}
+ /**
+ * Sets the executor service bean.
+ *
+ * @param executorServiceBean the executor service bean
+ */
public void setExecutorServiceBean(ExecutorService executorServiceBean) {
this.executorServiceBean = executorServiceBean;
}
+ /**
+ * Gets the aggregation strategy.
+ *
+ * @return the aggregation strategy
+ */
public String getAggregationStrategy() {
return aggregationStrategy;
}
+ /**
+ * Sets the aggregation strategy.
+ *
+ * @param aggregationStrategy the aggregation strategy
+ */
public void setAggregationStrategy(String aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
+ /**
+ * Gets the timeout.
+ *
+ * @return the timeout
+ */
public long getTimeout() {
return timeout;
}
+ /**
+ * Sets the timeout.
+ *
+ * @param timeout the timeout
+ */
public void setTimeout(long timeout) {
this.timeout = timeout;
}
+ /**
+ * Gets the cache size.
+ *
+ * @return the cache size
+ */
public int getCacheSize() {
return cacheSize;
}
+ /**
+ * Sets the cache size.
+ *
+ * @param cacheSize the cache size
+ */
public void setCacheSize(int cacheSize) {
this.cacheSize = cacheSize;
}
+ /**
+ * Gets the on prepare processor reference.
+ *
+ * @return the on prepare processor reference
+ */
public String getOnPrepare() {
return onPrepare;
}
+ /**
+ * Sets the on prepare processor reference.
+ *
+ * @param onPrepare the on prepare processor reference
+ */
public void setOnPrepare(String onPrepare) {
this.onPrepare = onPrepare;
}
+ /**
+ * Gets the on prepare processor.
+ *
+ * @return the on prepare processor
+ */
public Processor getOnPrepareProcessor() {
return onPrepareProcessor;
}
+ /**
+ * Sets the on prepare processor.
+ *
+ * @param onPrepare the on prepare processor
+ */
public void setOnPrepareProcessor(Processor onPrepare) {
this.onPrepareProcessor = onPrepare;
}
+ /**
+ * Gets the share unit of work flag.
+ *
+ * @return the share unit of work flag
+ */
public boolean isShareUnitOfWork() {
return shareUnitOfWork;
}
+ /**
+ * Sets the share unit of work flag.
+ *
+ * @param shareUnitOfWork the share unit of work flag
+ */
public void setShareUnitOfWork(boolean shareUnitOfWork) {
this.shareUnitOfWork = shareUnitOfWork;
}
+ /**
+ * Gets the aggregation strategy method name.
+ *
+ * @return the aggregation strategy method name
+ */
public String getAggregationStrategyMethodName() {
return aggregationStrategyMethodName;
}
+ /**
+ * Sets the aggregation strategy method name.
+ *
+ * @param aggregationStrategyMethodName the aggregation strategy method name
+ */
public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
this.aggregationStrategyMethodName = aggregationStrategyMethodName;
}
+ /**
+ * Gets whether the aggregation strategy method allows null.
+ *
+ * @return true if the aggregation strategy method allows null, false otherwise
+ */
public boolean isAggregationStrategyMethodAllowNull() {
return aggregationStrategyMethodAllowNull;
}
+ /**
+ * Sets whether the aggregation strategy method allows null.
+ *
+ * @param aggregationStrategyMethodAllowNull flag to set whether the aggregation strategy method allows null
+ */
public void setAggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) {
this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull;
}
+ /**
+ * Gets the aggregation strategy bean.
+ *
+ * @return the aggregation strategy bean
+ */
public AggregationStrategy getAggregationStrategyBean() {
return aggregationStrategyBean;
}
+ /**
+ * Sets the aggregation strategy bean.
+ *
+ * @param aggregationStrategyBean the aggregation strategy bean
+ */
public void setAggregationStrategyBean(AggregationStrategy aggregationStrategyBean) {
this.aggregationStrategyBean = aggregationStrategyBean;
}
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterConstants.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterConstants.java
index 2c21105b90c..e9d21d87d80 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterConstants.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterConstants.java
@@ -18,8 +18,10 @@
package org.apache.camel.component.dynamicrouter.routing;
import java.util.Comparator;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
+import org.apache.camel.CamelContext;
import org.apache.camel.Expression;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService.DynamicRouterFilterServiceFactory;
@@ -28,6 +30,7 @@ import org.apache.camel.component.dynamicrouter.filter.PrioritizedFilter.Priorit
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterEndpoint.DynamicRouterEndpointFactory;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProcessor.DynamicRouterProcessorFactory;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProducer.DynamicRouterProducerFactory;
+import org.apache.camel.processor.RecipientList;
import org.apache.camel.support.builder.ExpressionBuilder;
/**
@@ -117,6 +120,11 @@ public final class DynamicRouterConstants {
.comparingInt(PrioritizedFilter::priority)
.thenComparing(PrioritizedFilter::id);
+ /**
+ * Creates a {@link RecipientList} instance.
+ */
+ public static final BiFunction<CamelContext, Expression, RecipientList> RECIPIENT_LIST_SUPPLIER = RecipientList::new;
+
/**
* Template for a logging endpoint, showing all, and multiline.
*/
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterEndpoint.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterEndpoint.java
index c637bc3f453..202ac9fef77 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterEndpoint.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterEndpoint.java
@@ -16,16 +16,19 @@
*/
package org.apache.camel.component.dynamicrouter.routing;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.camel.CamelContext;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
+import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProcessor.DynamicRouterProcessorFactory;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProducer.DynamicRouterProducerFactory;
+import org.apache.camel.processor.RecipientList;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
@@ -37,6 +40,7 @@ import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterCons
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.FIRST_VERSION;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.PROCESSOR_FACTORY_SUPPLIER;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.PRODUCER_FACTORY_SUPPLIER;
+import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.RECIPIENT_LIST_SUPPLIER;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.SYNTAX;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.TITLE;
@@ -65,6 +69,11 @@ public class DynamicRouterEndpoint extends DefaultEndpoint {
*/
private final Supplier<DynamicRouterProducerFactory> producerFactorySupplier;
+ /**
+ * Creates a {@link RecipientList} instance.
+ */
+ private final BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier;
+
/**
* Channel for the Dynamic Router. For example, if the Dynamic Router URI is "dynamic-router://test", then the
* channel is "test". Channels are a way of keeping routing participants, their rules, and exchanges logically
@@ -94,18 +103,21 @@ public class DynamicRouterEndpoint extends DefaultEndpoint {
* @param configuration the {@link DynamicRouterConfiguration}
* @param processorFactorySupplier creates the {@link DynamicRouterProcessor}
* @param producerFactorySupplier creates the {@link DynamicRouterProcessor}
+ * @param recipientListSupplier creates the {@link RecipientList}
* @param filterService the {@link DynamicRouterFilterService}
*/
public DynamicRouterEndpoint(final String uri, final DynamicRouterComponent component,
final DynamicRouterConfiguration configuration,
final Supplier<DynamicRouterProcessorFactory> processorFactorySupplier,
final Supplier<DynamicRouterProducerFactory> producerFactorySupplier,
+ final BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier,
final DynamicRouterFilterService filterService) {
super(uri, component);
this.channel = configuration.getChannel();
this.configuration = configuration;
this.processorFactorySupplier = processorFactorySupplier;
this.producerFactorySupplier = producerFactorySupplier;
+ this.recipientListSupplier = recipientListSupplier;
this.configuration.setChannel(channel);
this.filterService = filterService;
LOG.debug("Created Dynamic Router endpoint URI: {}", uri);
@@ -125,6 +137,7 @@ public class DynamicRouterEndpoint extends DefaultEndpoint {
super(uri, component);
this.processorFactorySupplier = PROCESSOR_FACTORY_SUPPLIER;
this.producerFactorySupplier = PRODUCER_FACTORY_SUPPLIER;
+ this.recipientListSupplier = RECIPIENT_LIST_SUPPLIER;
this.channel = configuration.getChannel();
this.configuration = configuration;
this.filterService = filterService;
@@ -143,7 +156,7 @@ public class DynamicRouterEndpoint extends DefaultEndpoint {
DynamicRouterComponent component = getDynamicRouterComponent();
CamelContext camelContext = getCamelContext();
DynamicRouterProcessor processor = processorFactorySupplier.get()
- .getInstance(camelContext, configuration, filterService);
+ .getInstance(camelContext, configuration, filterService, recipientListSupplier);
component.addRoutingProcessor(configuration.getChannel(), processor);
}
@@ -210,10 +223,11 @@ public class DynamicRouterEndpoint extends DefaultEndpoint {
final DynamicRouterConfiguration configuration,
final Supplier<DynamicRouterProcessorFactory> processorFactorySupplier,
final Supplier<DynamicRouterProducerFactory> producerFactorySupplier,
+ final BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier,
final DynamicRouterFilterService filterService) {
return new DynamicRouterEndpoint(
- uri, component, configuration, processorFactorySupplier,
- producerFactorySupplier, filterService);
+ uri, component, configuration, processorFactorySupplier, producerFactorySupplier,
+ recipientListSupplier, filterService);
}
/**
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterProcessor.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterProcessor.java
index 16810e5d871..51ab3afc550 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterProcessor.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterProcessor.java
@@ -16,9 +16,12 @@
*/
package org.apache.camel.component.dynamicrouter.routing;
+import java.util.function.BiFunction;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService;
import org.apache.camel.component.dynamicrouter.filter.PrioritizedFilter;
@@ -28,6 +31,12 @@ import org.apache.camel.support.AsyncProcessorSupport;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.MODE_FIRST_MATCH;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.RECIPIENT_LIST_HEADER;
+/**
+ * The {@link DynamicRouterProcessor} is responsible for routing an exchange to the appropriate recipients. It uses the
+ * {@link DynamicRouterFilterService} to determine which filters match the exchange, and then sets the recipient list
+ * header on the exchange to the list of matching filters. The {@link RecipientList} processor is then used to route the
+ * exchange to the matching recipients.
+ */
public class DynamicRouterProcessor extends AsyncProcessorSupport {
/**
@@ -137,16 +146,18 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport {
* create a {@link RecipientList} instance for the {@link DynamicRouterProcessor} to use for routing the
* exchange to matching recipients.
*
- * @param camelContext the CamelContext
- * @param configuration the configuration
- * @param filterService service that manages {@link PrioritizedFilter}s for dynamic router channels
- * @return the {@link DynamicRouterProcessor} instance
+ * @param camelContext the CamelContext
+ * @param configuration the configuration
+ * @param filterService service that manages {@link PrioritizedFilter}s for dynamic router channels
+ * @param recipientListSupplier the supplier for the {@link RecipientList} instance
+ * @return the {@link DynamicRouterProcessor} instance
*/
public DynamicRouterProcessor getInstance(
CamelContext camelContext, DynamicRouterConfiguration configuration,
- DynamicRouterFilterService filterService) {
- RecipientList recipientList
- = (RecipientList) DynamicRouterRecipientListHelper.createProcessor(camelContext, configuration);
+ DynamicRouterFilterService filterService,
+ BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier) {
+ RecipientList recipientList = (RecipientList) DynamicRouterRecipientListHelper
+ .createProcessor(camelContext, configuration, recipientListSupplier);
return new DynamicRouterProcessor(
configuration.getRecipientMode(), configuration.isWarnDroppedMessage(),
configuration.getChannel(), recipientList, filterService);
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterRecipientListHelper.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterRecipientListHelper.java
index 975cd2ae799..4d1df1c681f 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterRecipientListHelper.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterRecipientListHelper.java
@@ -24,6 +24,7 @@ import org.apache.camel.AggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
@@ -47,27 +48,43 @@ public final class DynamicRouterRecipientListHelper {
private DynamicRouterRecipientListHelper() {
}
+ /**
+ * Creates an {@link AggregationStrategyBiFunctionAdapter} from a {@link BiFunction} and a
+ * {@link DynamicRouterConfiguration}.
+ */
+ static BiFunction<BiFunction<Exchange, Exchange, Object>, DynamicRouterConfiguration, AggregationStrategyBiFunctionAdapter> createBiFunctionAdapter
+ = (bf, cfg) -> {
+ AggregationStrategyBiFunctionAdapter adapter = new AggregationStrategyBiFunctionAdapter(bf);
+ adapter.setAllowNullNewExchange(cfg.isAggregationStrategyMethodAllowNull());
+ adapter.setAllowNullOldExchange(cfg.isAggregationStrategyMethodAllowNull());
+ return adapter;
+ };
+
+ /**
+ * Creates an {@link AggregationStrategyBeanAdapter} from an object and a {@link DynamicRouterConfiguration}.
+ */
+ static BiFunction<Object, DynamicRouterConfiguration, AggregationStrategyBeanAdapter> createBeanAdapter = (obj, cfg) -> {
+ AggregationStrategyBeanAdapter adapter
+ = new AggregationStrategyBeanAdapter(obj, cfg.getAggregationStrategyMethodName());
+ adapter.setAllowNullNewExchange(cfg.isAggregationStrategyMethodAllowNull());
+ adapter.setAllowNullOldExchange(cfg.isAggregationStrategyMethodAllowNull());
+ return adapter;
+ };
+
/**
* Given an object, convert it to an {@link AggregationStrategy} based on its class.
*/
@SuppressWarnings({ "unchecked" })
- static BiFunction<Object, DynamicRouterConfiguration, AggregationStrategy> convertAggregationStrategy = (aggStr, cfg) -> {
- if (aggStr instanceof AggregationStrategy as) {
- return as;
- } else if (aggStr instanceof BiFunction<?, ?, ?> bf) {
- AggregationStrategyBiFunctionAdapter adapter
- = new AggregationStrategyBiFunctionAdapter((BiFunction<Exchange, Exchange, Object>) bf);
- adapter.setAllowNullNewExchange(cfg.isAggregationStrategyMethodAllowNull());
- adapter.setAllowNullOldExchange(cfg.isAggregationStrategyMethodAllowNull());
- return adapter;
- } else {
- AggregationStrategyBeanAdapter adapter
- = new AggregationStrategyBeanAdapter(aggStr, cfg.getAggregationStrategyMethodName());
- adapter.setAllowNullNewExchange(cfg.isAggregationStrategyMethodAllowNull());
- adapter.setAllowNullOldExchange(cfg.isAggregationStrategyMethodAllowNull());
- return adapter;
- }
- };
+ static BiFunction<Object, DynamicRouterConfiguration, AggregationStrategy> convertAggregationStrategy
+ = (aggStr, cfg) -> Optional.ofNullable(aggStr)
+ .filter(AggregationStrategy.class::isInstance)
+ .map(s -> (AggregationStrategy) s)
+ .or(() -> Optional.ofNullable(aggStr)
+ .filter(BiFunction.class::isInstance)
+ .map(s -> createBiFunctionAdapter.apply((BiFunction<Exchange, Exchange, Object>) s, cfg)))
+ .or(() -> Optional.ofNullable(aggStr)
+ .map(s -> createBeanAdapter.apply(s, cfg)))
+ .orElseThrow(() -> new IllegalArgumentException("Cannot convert AggregationStrategy from: " + aggStr));
/**
* Create and configure the {@link RecipientList} {@link Processor}. The returned recipient list will be started.
@@ -76,8 +93,10 @@ public final class DynamicRouterRecipientListHelper {
* @param cfg the {@link DynamicRouterConfiguration}
* @return the configured {@link RecipientList} {@link Processor}
*/
- public static Processor createProcessor(CamelContext camelContext, DynamicRouterConfiguration cfg) {
- RecipientList recipientList = new RecipientList(camelContext, RECIPIENT_LIST_EXPRESSION, ",");
+ public static Processor createProcessor(
+ CamelContext camelContext, DynamicRouterConfiguration cfg,
+ BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier) {
+ RecipientList recipientList = recipientListSupplier.apply(camelContext, RECIPIENT_LIST_EXPRESSION);
setPropertiesForRecipientList(recipientList, camelContext, cfg);
ExecutorService threadPool
= getConfiguredExecutorService(camelContext, "RecipientList", cfg, cfg.isParallelProcessing());
@@ -128,7 +147,7 @@ public final class DynamicRouterRecipientListHelper {
static AggregationStrategy createAggregationStrategy(CamelContext camelContext, DynamicRouterConfiguration cfg) {
AggregationStrategy strategy = Optional.ofNullable(cfg.getAggregationStrategyBean())
.or(() -> Optional.ofNullable(cfg.getAggregationStrategy())
- .map(ref -> Optional.ofNullable(lookupByNameAndType(camelContext, ref, Object.class))
+ .map(ref -> lookupByNameAndType(camelContext, ref, Object.class)
.map(aggStr -> convertAggregationStrategy.apply(aggStr, cfg))
.orElseThrow(() -> new IllegalArgumentException(
"Cannot find AggregationStrategy in Registry with name: " +
@@ -138,12 +157,11 @@ public final class DynamicRouterRecipientListHelper {
return cfg.isShareUnitOfWork() ? new ShareUnitOfWorkAggregationStrategy(strategy) : strategy;
}
- static <T> T lookupByNameAndType(CamelContext camelContext, String name, Class<T> type) {
+ static <T> Optional<T> lookupByNameAndType(CamelContext camelContext, String name, Class<T> type) {
return Optional.ofNullable(ObjectHelper.isEmpty(name) ? null : name)
.map(n -> EndpointHelper.isReferenceParameter(n)
? EndpointHelper.resolveReferenceParameter(camelContext, n, type, false)
- : camelContext.getRegistry().lookupByNameAndType(n, type))
- .orElse(null);
+ : camelContext.getRegistry().lookupByNameAndType(n, type));
}
/**
@@ -162,7 +180,7 @@ public final class DynamicRouterRecipientListHelper {
return Optional.ofNullable(cfg.getExecutorServiceBean())
.map(esb -> false)
.or(() -> Optional.ofNullable(cfg.getExecutorService())
- .map(es -> lookupByNameAndType(camelContext, es, ExecutorService.class) == null))
+ .map(es -> lookupByNameAndType(camelContext, es, ExecutorService.class).isEmpty()))
.orElse(useDefault);
}
@@ -183,16 +201,15 @@ public final class DynamicRouterRecipientListHelper {
* @param executorServiceRef reference name of the thread pool
* @return the executor service, or <tt>null</tt> if none was found.
*/
- static ExecutorService lookupExecutorServiceRef(
- CamelContext camelContext, String name, Object source,
- String executorServiceRef) {
+ static Optional<ExecutorService> lookupExecutorServiceRef(
+ CamelContext camelContext, String name, Object source, String executorServiceRef) {
ExecutorServiceManager manager = camelContext.getExecutorServiceManager();
- ObjectHelper.notNull(manager, ESM_NAME, camelContext);
+ ObjectHelper.notNull(manager, ESM_NAME);
ObjectHelper.notNull(executorServiceRef, "executorServiceRef");
// lookup in registry first and use existing thread pool if exists,
// or create a new thread pool, assuming that the executor service ref is a thread pool ID
- return Optional.ofNullable(lookupByNameAndType(camelContext, executorServiceRef, ExecutorService.class))
- .orElse(manager.newThreadPool(source, name, executorServiceRef));
+ return lookupByNameAndType(camelContext, executorServiceRef, ExecutorService.class)
+ .or(() -> Optional.ofNullable(manager.newThreadPool(source, name, executorServiceRef)));
}
/**
@@ -215,22 +232,23 @@ public final class DynamicRouterRecipientListHelper {
* was not found
*/
static ExecutorService getConfiguredExecutorService(
- CamelContext camelContext, String name,
- DynamicRouterConfiguration cfg, boolean useDefault)
+ CamelContext camelContext, String name, DynamicRouterConfiguration cfg, boolean useDefault)
throws IllegalArgumentException {
ExecutorServiceManager manager = camelContext.getExecutorServiceManager();
ObjectHelper.notNull(manager, ESM_NAME, camelContext);
+ String exSvcRef = cfg.getExecutorService();
+ ExecutorService exSvcBean = cfg.getExecutorServiceBean();
+ String errorMessage = "ExecutorServiceRef '" + exSvcRef + "' not found in registry as an ExecutorService " +
+ "instance or as a thread pool profile";
// The first (preferred) option is to use an explicitly-configured executor if the configuration has it
- return Optional.ofNullable(cfg.getExecutorServiceBean())
+ return Optional.ofNullable(exSvcBean)
// The second preference is to check for an executor service reference
- .or(() -> Optional.ofNullable(cfg.getExecutorService())
+ .or(() -> Optional.ofNullable(exSvcRef)
// Try to get the referenced executor service
- .map(r -> Optional.ofNullable(lookupExecutorServiceRef(camelContext, name, cfg, r))
+ .map(r -> lookupExecutorServiceRef(camelContext, name, cfg, r)
// But, if the reference is specified in the config,
// and could not be obtained, this is an error
- .orElseThrow(() -> new IllegalArgumentException(
- "ExecutorServiceRef %s not found in registry ".formatted(cfg.getExecutorService()) +
- "(as an ExecutorService instance) or as a thread pool profile."))))
+ .orElseThrow(() -> new IllegalArgumentException(errorMessage))))
// The third and final option is to create a new "default" thread pool if the parameter
// specifies to that the default thread pool should be used as a fallback
.or(() -> useDefault ? Optional.of(manager.newDefaultThreadPool(cfg, name)) : Optional.empty())
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlChannelSendDynamicAwareTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlChannelSendDynamicAwareTest.java
index 313c303c2aa..634d942297b 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlChannelSendDynamicAwareTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlChannelSendDynamicAwareTest.java
@@ -68,6 +68,17 @@ class DynamicRouterControlChannelSendDynamicAwareTest {
}
}
+ @Test
+ void resolveStaticUriShouldNotOptimize() throws Exception {
+ String originalUri = "dynamic-router-ctrl:subscribe?subscriptionId=testSub1";
+ String uri = "dynamic-router-ctrl://subscribe?subscriptionId=testSub1";
+ try (DynamicRouterControlChannelSendDynamicAware testSubject = new DynamicRouterControlChannelSendDynamicAware()) {
+ SendDynamicAware.DynamicAwareEntry entry = testSubject.prepare(exchange, uri, originalUri);
+ String result = testSubject.resolveStaticUri(exchange, entry);
+ assertEquals(null, result);
+ }
+ }
+
@Test
void createPreProcessor() throws Exception {
Mockito.when(exchange.getMessage()).thenReturn(message);
@@ -83,6 +94,17 @@ class DynamicRouterControlChannelSendDynamicAwareTest {
}
}
+ @Test
+ void createPreProcessorShouldNotOptimize() throws Exception {
+ String originalUri = "dynamic-router-ctrl:subscribe?subscriptionId=testSub1";
+ String uri = "dynamic-router-ctrl://subscribe?subscriptionId=testSub1";
+ try (DynamicRouterControlChannelSendDynamicAware testSubject = new DynamicRouterControlChannelSendDynamicAware()) {
+ SendDynamicAware.DynamicAwareEntry entry = testSubject.prepare(exchange, uri, originalUri);
+ Processor preProcessor = testSubject.createPreProcessor(exchange, entry);
+ Assertions.assertNull(preProcessor);
+ }
+ }
+
@Test
void createPostProcessor() throws Exception {
Mockito.when(exchange.getMessage()).thenReturn(message);
@@ -96,4 +118,15 @@ class DynamicRouterControlChannelSendDynamicAwareTest {
}
Mockito.verify(message, Mockito.times(URI_PARAMS_TO_HEADER_NAMES.size())).removeHeader(any());
}
+
+ @Test
+ void createPostProcessorShouldNotOptimize() throws Exception {
+ String originalUri = "dynamic-router-ctrl:subscribe?subscriptionId=testSub1";
+ String uri = "dynamic-router-ctrl://subscribe?subscriptionId=testSub1";
+ try (DynamicRouterControlChannelSendDynamicAware testSubject = new DynamicRouterControlChannelSendDynamicAware()) {
+ SendDynamicAware.DynamicAwareEntry entry = testSubject.prepare(exchange, uri, originalUri);
+ Processor postProcessor = testSubject.createPostProcessor(exchange, entry);
+ assertNull(postProcessor);
+ }
+ }
}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlComponentTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlComponentTest.java
index b0be1ad91c8..c69f6eb6070 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlComponentTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlComponentTest.java
@@ -64,8 +64,7 @@ class DynamicRouterControlComponentTest {
final String uri,
final DynamicRouterControlComponent component,
final String controlAction,
- final DynamicRouterControlConfiguration configuration,
- final DynamicRouterControlService controlService) {
+ final DynamicRouterControlConfiguration configuration) {
return endpoint;
}
};
@@ -76,7 +75,7 @@ class DynamicRouterControlComponentTest {
return controlService;
}
};
- component = new DynamicRouterControlComponent(context, () -> endpointFactory, () -> controlServiceFactory);
+ component = new DynamicRouterControlComponent(context, () -> endpointFactory);
}
@Test
@@ -93,8 +92,7 @@ class DynamicRouterControlComponentTest {
@Test
void testCreateInstanceAllArgs() {
- DynamicRouterControlComponent instance = new DynamicRouterControlComponent(
- context, () -> endpointFactory, () -> controlServiceFactory);
+ DynamicRouterControlComponent instance = new DynamicRouterControlComponent(context, () -> endpointFactory);
Assertions.assertNotNull(instance);
}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlEndpointTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlEndpointTest.java
index 4739e8958e4..b66576c1ff5 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlEndpointTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlEndpointTest.java
@@ -73,6 +73,8 @@ class DynamicRouterControlEndpointTest {
DynamicRouterControlProducerFactory producerFactory;
+ DynamicRouterControlService.DynamicRouterControlServiceFactory controlServiceFactory;
+
@BeforeEach
void setup() {
context = contextExtension.getContext();
@@ -86,8 +88,8 @@ class DynamicRouterControlEndpointTest {
}
};
endpoint = new DynamicRouterControlEndpoint(
- CONTROL_ENDPOINT_URI, component, CONTROL_ACTION_SUBSCRIBE,
- configuration, controlService, () -> producerFactory);
+ CONTROL_ENDPOINT_URI, component, CONTROL_ACTION_SUBSCRIBE, configuration, () -> producerFactory,
+ () -> controlServiceFactory);
endpoint.setCamelContext(context);
}
@@ -99,7 +101,7 @@ class DynamicRouterControlEndpointTest {
@Test
void testConstruction() {
DynamicRouterControlEndpoint instance = new DynamicRouterControlEndpoint(
- CONTROL_ENDPOINT_URI, component, CONTROL_ACTION_SUBSCRIBE, configuration, controlService);
+ CONTROL_ENDPOINT_URI, component, CONTROL_ACTION_SUBSCRIBE, configuration);
Assertions.assertNotNull(instance);
}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
index 45589dad438..ea65164dfe5 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
@@ -49,6 +49,9 @@ import static org.apache.camel.component.dynamicrouter.control.DynamicRouterCont
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_PRIORITY;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_SUBSCRIBE_CHANNEL;
import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_SUBSCRIPTION_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class DynamicRouterControlProducerTest {
@@ -96,13 +99,31 @@ class DynamicRouterControlProducerTest {
CONTROL_PREDICATE, "true",
CONTROL_EXPRESSION_LANGUAGE, "simple",
CONTROL_PRIORITY, 10);
- Mockito.when(message.getHeaders()).thenReturn(headers);
+ when(message.getHeaders()).thenReturn(headers);
Mockito.doNothing().when(callback).done(false);
producer.performSubscribe(message, callback);
Mockito.verify(controlService, Mockito.times(1)).subscribeWithPredicateExpression(
subscribeChannel, "testId", "mock://test", 10, "true", "simple", false);
}
+ @Test
+ void performSubscribeActionWithEmptyExpressionLanguage() {
+ String subscribeChannel = "testChannel";
+ Map<String, Object> headers = Map.of(
+ CONTROL_ACTION_HEADER, CONTROL_ACTION_SUBSCRIBE,
+ CONTROL_SUBSCRIBE_CHANNEL, subscribeChannel,
+ CONTROL_SUBSCRIPTION_ID, "testId",
+ CONTROL_DESTINATION_URI, "mock://test",
+ CONTROL_PREDICATE, "true",
+ CONTROL_EXPRESSION_LANGUAGE, "",
+ CONTROL_PRIORITY, 10);
+ when(message.getHeaders()).thenReturn(headers);
+ Mockito.doNothing().when(callback).done(false);
+ producer.performSubscribe(message, callback);
+ Mockito.verify(controlService, Mockito.times(1)).subscribeWithPredicateInstance(
+ subscribeChannel, "testId", "mock://test", 10, null, false);
+ }
+
@Test
void performUnsubscribeAction() {
String subscriptionId = "testId";
@@ -111,7 +132,7 @@ class DynamicRouterControlProducerTest {
CONTROL_ACTION_HEADER, CONTROL_ACTION_UNSUBSCRIBE,
CONTROL_SUBSCRIBE_CHANNEL, subscribeChannel,
CONTROL_SUBSCRIPTION_ID, subscriptionId);
- Mockito.when(message.getHeaders()).thenReturn(headers);
+ when(message.getHeaders()).thenReturn(headers);
Mockito.doNothing().when(callback).done(false);
producer.performUnsubscribe(message, callback);
Mockito.verify(controlService, Mockito.times(1))
@@ -130,7 +151,7 @@ class DynamicRouterControlProducerTest {
CONTROL_PREDICATE, "true",
CONTROL_EXPRESSION_LANGUAGE, "simple",
CONTROL_PRIORITY, 10);
- Mockito.when(message.getHeaders()).thenReturn(headers);
+ when(message.getHeaders()).thenReturn(headers);
Mockito.doNothing().when(callback).done(false);
producer.performSubscribe(message, callback);
Mockito.verify(controlService, Mockito.times(1))
@@ -146,7 +167,7 @@ class DynamicRouterControlProducerTest {
CONTROL_PREDICATE, "true",
CONTROL_EXPRESSION_LANGUAGE, "simple",
CONTROL_PRIORITY, 100);
- Mockito.when(message.getHeaders()).thenReturn(headers);
+ when(message.getHeaders()).thenReturn(headers);
Mockito.doNothing().when(callback).done(false);
producer.performUpdate(message, callback);
Mockito.verify(controlService, Mockito.times(1))
@@ -165,8 +186,8 @@ class DynamicRouterControlProducerTest {
CONTROL_PRIORITY, 10);
Language language = context.resolveLanguage("simple");
Predicate predicate = language.createPredicate("true");
- Mockito.when(message.getBody()).thenReturn(predicate);
- Mockito.when(message.getHeaders()).thenReturn(headers);
+ when(message.getBody()).thenReturn(predicate);
+ when(message.getHeaders()).thenReturn(headers);
Mockito.doNothing().when(callback).done(false);
producer.performSubscribe(message, callback);
Mockito.verify(controlService, Mockito.times(1))
@@ -184,7 +205,7 @@ class DynamicRouterControlProducerTest {
CONTROL_DESTINATION_URI, "mock://test",
CONTROL_PRIORITY, 10,
CONTROL_PREDICATE_BEAN, "testPredicate");
- Mockito.when(message.getHeaders()).thenReturn(headers);
+ when(message.getHeaders()).thenReturn(headers);
Mockito.doNothing().when(callback).done(false);
producer.performSubscribe(message, callback);
Mockito.verify(controlService, Mockito.times(1))
@@ -203,8 +224,8 @@ class DynamicRouterControlProducerTest {
.predicate("true")
.expressionLanguage("simple")
.build();
- Mockito.when(message.getBody()).thenReturn(subMsg);
- Mockito.when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(subMsg);
+ when(message.getBody()).thenReturn(subMsg);
+ when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(subMsg);
Mockito.doNothing().when(callback).done(false);
producer.performSubscribe(message, callback);
Mockito.verify(controlService, Mockito.times(1))
@@ -212,6 +233,40 @@ class DynamicRouterControlProducerTest {
subscribeChannel, "testId", "mock://test", 10, "true", "simple", false);
}
+ @Test
+ void performSubscribeActionWithMessageInBodyWithEmptyExpressionLanguage() {
+ String subscribeChannel = "testChannel";
+ DynamicRouterControlMessage subMsg = DynamicRouterControlMessage.Builder.newBuilder()
+ .subscribeChannel(subscribeChannel)
+ .subscriptionId("testId")
+ .destinationUri("mock://test")
+ .priority(10)
+ .predicate("true")
+ .expressionLanguage("")
+ .build();
+ when(message.getBody()).thenReturn(subMsg);
+ when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(subMsg);
+ Exception ex = assertThrows(IllegalStateException.class, () -> producer.performSubscribe(message, callback));
+ assertEquals("Predicate bean could not be found", ex.getMessage());
+ }
+
+ @Test
+ void performSubscribeActionWithMessageInBodyWithEmptyExpression() {
+ String subscribeChannel = "testChannel";
+ DynamicRouterControlMessage subMsg = DynamicRouterControlMessage.Builder.newBuilder()
+ .subscribeChannel(subscribeChannel)
+ .subscriptionId("testId")
+ .destinationUri("mock://test")
+ .priority(10)
+ .predicate("")
+ .expressionLanguage("simple")
+ .build();
+ when(message.getBody()).thenReturn(subMsg);
+ when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(subMsg);
+ Exception ex = assertThrows(IllegalStateException.class, () -> producer.performSubscribe(message, callback));
+ assertEquals("Predicate bean could not be found", ex.getMessage());
+ }
+
@Test
void performSubscribeActionWithMessageInBodyAndPredicateBean() {
String subscribeChannel = "testChannel";
@@ -222,8 +277,8 @@ class DynamicRouterControlProducerTest {
.priority(10)
.predicateBean("testPredicate")
.build();
- Mockito.when(message.getBody()).thenReturn(subMsg);
- Mockito.when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(subMsg);
+ when(message.getBody()).thenReturn(subMsg);
+ when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(subMsg);
Mockito.doNothing().when(callback).done(false);
producer.performSubscribe(message, callback);
Mockito.verify(controlService, Mockito.times(1))
@@ -242,8 +297,8 @@ class DynamicRouterControlProducerTest {
.predicate("true")
.expressionLanguage("simple")
.build();
- Mockito.when(message.getBody()).thenReturn(subMsg);
- Mockito.when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(subMsg);
+ when(message.getBody()).thenReturn(subMsg);
+ when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(subMsg);
Mockito.doNothing().when(callback).done(false);
producer.performUpdate(message, callback);
Mockito.verify(controlService, Mockito.times(1))
@@ -258,14 +313,29 @@ class DynamicRouterControlProducerTest {
Map<String, Object> headers = Map.of(
CONTROL_ACTION_HEADER, CONTROL_ACTION_LIST,
CONTROL_SUBSCRIBE_CHANNEL, subscribeChannel);
- Mockito.when(exchange.getMessage()).thenReturn(message);
- Mockito.when(message.getHeaders()).thenReturn(headers);
- Mockito.when(controlService.getSubscriptionsForChannel(subscribeChannel)).thenReturn("[" + filterString + "]");
+ when(exchange.getMessage()).thenReturn(message);
+ when(message.getHeaders()).thenReturn(headers);
+ when(controlService.getSubscriptionsForChannel(subscribeChannel)).thenReturn("[" + filterString + "]");
Mockito.doNothing().when(callback).done(false);
producer.performList(exchange, callback);
Mockito.verify(message, Mockito.times(1)).setBody("[" + filterString + "]", String.class);
}
+ @Test
+ void testPerformListActionWithException() {
+ String subscribeChannel = "testChannel";
+ Map<String, Object> headers = Map.of(
+ CONTROL_ACTION_HEADER, CONTROL_ACTION_LIST,
+ CONTROL_SUBSCRIBE_CHANNEL, subscribeChannel);
+ when(exchange.getMessage()).thenReturn(message);
+ when(message.getHeaders()).thenReturn(headers);
+ Mockito.doNothing().when(callback).done(false);
+ Exception ex = new IllegalArgumentException("test exception");
+ Mockito.doThrow(ex).when(controlService).getSubscriptionsForChannel(subscribeChannel);
+ producer.performList(exchange, callback);
+ Mockito.verify(exchange, Mockito.times(1)).setException(ex);
+ }
+
@Test
void testPerformStatsAction() {
String subscribeChannel = "testChannel";
@@ -273,14 +343,29 @@ class DynamicRouterControlProducerTest {
Map<String, Object> headers = Map.of(
CONTROL_ACTION_HEADER, CONTROL_ACTION_STATS,
CONTROL_SUBSCRIBE_CHANNEL, subscribeChannel);
- Mockito.when(exchange.getMessage()).thenReturn(message);
- Mockito.when(message.getHeaders()).thenReturn(headers);
- Mockito.when(controlService.getStatisticsForChannel(subscribeChannel)).thenReturn("[" + statString + "]");
+ when(exchange.getMessage()).thenReturn(message);
+ when(message.getHeaders()).thenReturn(headers);
+ when(controlService.getStatisticsForChannel(subscribeChannel)).thenReturn("[" + statString + "]");
Mockito.doNothing().when(callback).done(false);
producer.performStats(exchange, callback);
Mockito.verify(message, Mockito.times(1)).setBody("[" + statString + "]", String.class);
}
+ @Test
+ void testPerformStatsActionWithException() {
+ String subscribeChannel = "testChannel";
+ Map<String, Object> headers = Map.of(
+ CONTROL_ACTION_HEADER, CONTROL_ACTION_STATS,
+ CONTROL_SUBSCRIBE_CHANNEL, subscribeChannel);
+ when(exchange.getMessage()).thenReturn(message);
+ when(message.getHeaders()).thenReturn(headers);
+ Exception ex = new IllegalArgumentException("test exception");
+ Mockito.doThrow(ex).when(controlService).getStatisticsForChannel(subscribeChannel);
+ Mockito.doNothing().when(callback).done(false);
+ producer.performStats(exchange, callback);
+ Mockito.verify(exchange, Mockito.times(1)).setException(ex);
+ }
+
@Test
void testGetInstance() {
DynamicRouterControlProducer instance = new DynamicRouterControlProducerFactory()
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlServiceTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlServiceTest.java
index 809838ef671..7119ba215aa 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlServiceTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlServiceTest.java
@@ -32,6 +32,7 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.apache.camel.builder.PredicateBuilder.constant;
+import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.ERROR_PREDICATE_CLASS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
@@ -73,7 +74,7 @@ class DynamicRouterControlServiceTest {
}
@Test
- void obtainPredicateFromBean() {
+ void obtainPredicateFromBeanName() {
context.getRegistry().bind(predicateBeanName, Predicate.class, predicateInstance);
Predicate actualPredicate = DynamicRouterControlService.obtainPredicateFromBeanName(predicateBeanName, context);
assertEquals(predicateInstance, actualPredicate);
@@ -96,6 +97,14 @@ class DynamicRouterControlServiceTest {
assertEquals(expectedPredicate, actualPredicate);
}
+ @Test
+ void obtainPredicateFromInstanceWhenNotPredicateInstance() {
+ String expectedPredicate = "thisMightHurt";
+ Exception ex = assertThrows(IllegalArgumentException.class,
+ () -> DynamicRouterControlService.obtainPredicateFromInstance(expectedPredicate));
+ assertEquals(ERROR_PREDICATE_CLASS, ex.getMessage());
+ }
+
@Test
void obtainPredicateFromExpressionWithError() {
String expression = "not a valid expression";
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/filter/DynamicRouterFilterServiceTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/filter/DynamicRouterFilterServiceTest.java
index e627bd0b3a1..bb9e7e7a308 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/filter/DynamicRouterFilterServiceTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/filter/DynamicRouterFilterServiceTest.java
@@ -34,6 +34,7 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -76,7 +77,7 @@ class DynamicRouterFilterServiceTest {
@Test
void testDefaultConstruct() {
- assertNotNull(new DynamicRouterFilterService());
+ assertDoesNotThrow(() -> new DynamicRouterFilterService());
}
@Test
@@ -119,6 +120,47 @@ class DynamicRouterFilterServiceTest {
assertEquals(1, filters.size());
}
+ @Test
+ void testAddFilterInstanceAlreadyExists() {
+ Mockito.when(prioritizedFilter.id()).thenReturn("id");
+ Mockito.when(prioritizedFilter.priority()).thenReturn(1);
+ filterService.addFilterForChannel(prioritizedFilter, DYNAMIC_ROUTER_CHANNEL, false);
+ Collection<PrioritizedFilter> filters = filterService.getFiltersForChannel(DYNAMIC_ROUTER_CHANNEL);
+ assertEquals(1, filters.size());
+ String result = filterService.addFilterForChannel(prioritizedFilter, DYNAMIC_ROUTER_CHANNEL, false);
+ assertEquals("Error: Filter could not be added -- existing filter found with matching ID: true", result);
+ }
+
+ @Test
+ void testUpdateFilter() {
+ Mockito.when(prioritizedFilter.id()).thenReturn("id");
+ Mockito.when(prioritizedFilter.priority()).thenReturn(1);
+ filterService.addFilterForChannel(prioritizedFilter, DYNAMIC_ROUTER_CHANNEL, false);
+ Collection<PrioritizedFilter> filters = filterService.getFiltersForChannel(DYNAMIC_ROUTER_CHANNEL);
+ assertEquals(1, filters.size());
+ PrioritizedFilter filter = filters.stream()
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Could not get added filter"));
+ // Verify filter priority is (originally) 1
+ assertEquals(1, filter.priority());
+ // Update filter (change priority from 1 to 10)
+ Mockito.when(prioritizedFilter.priority()).thenReturn(10);
+ filterService.addFilterForChannel(prioritizedFilter, DYNAMIC_ROUTER_CHANNEL, true);
+ filters = filterService.getFiltersForChannel(DYNAMIC_ROUTER_CHANNEL);
+ assertEquals(1, filters.size());
+ filter = filters.stream()
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Could not get added filter"));
+ // Verify filter priority is now 10
+ assertEquals(10, filter.priority());
+ }
+
+ @Test
+ void testUpdateFilterDoesNotExist() {
+ String result = filterService.addFilterForChannel(prioritizedFilter, DYNAMIC_ROUTER_CHANNEL, true);
+ assertEquals("Error: Filter could not be updated -- existing filter found with matching ID: false", result);
+ }
+
@Test
void testGetFilter() {
Mockito.when(prioritizedFilter.id()).thenReturn("id");
@@ -228,6 +270,28 @@ class DynamicRouterFilterServiceTest {
Mockito.when(predicate.matches(exchange)).thenReturn(false);
filterService.addFilterForChannel(prioritizedFilter, DYNAMIC_ROUTER_CHANNEL, false);
String result = filterService.getMatchingEndpointsForExchangeByChannel(exchange, channel, false, false);
- Assertions.assertTrue(result.startsWith("log:"));
+ assertEquals("log:org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService.test" +
+ "?level=DEBUG" +
+ "&showAll=true" +
+ "&multiline=true",
+ result);
+ }
+
+ @Test
+ void testGetMatchingEndpointsForExchangeByChannelWithNoMatchingRecipientsWithWarnDroppedMessage() {
+ String channel = "test";
+ Mockito.when(exchange.getMessage()).thenReturn(message);
+ Mockito.when(prioritizedFilter.id()).thenReturn("id");
+ Mockito.when(prioritizedFilter.priority()).thenReturn(1);
+ Mockito.when(prioritizedFilter.predicate()).thenReturn(predicate);
+ Mockito.when(prioritizedFilter.statistics()).thenReturn(prioritizedFilterStatistics);
+ Mockito.when(predicate.matches(exchange)).thenReturn(false);
+ filterService.addFilterForChannel(prioritizedFilter, DYNAMIC_ROUTER_CHANNEL, false);
+ String result = filterService.getMatchingEndpointsForExchangeByChannel(exchange, channel, false, true);
+ assertEquals("log:org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService.test" +
+ "?level=WARN" +
+ "&showAll=true" +
+ "&multiline=true",
+ result);
}
}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterComponentTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterComponentTest.java
index fc0c009c1f5..d4f6438ba74 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterComponentTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterComponentTest.java
@@ -17,10 +17,12 @@
package org.apache.camel.component.dynamicrouter.routing;
import java.util.Collections;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
+import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService.DynamicRouterFilterServiceFactory;
@@ -30,6 +32,7 @@ import org.apache.camel.component.dynamicrouter.filter.PrioritizedFilterStatisti
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterEndpoint.DynamicRouterEndpointFactory;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProcessor.DynamicRouterProcessorFactory;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProducer.DynamicRouterProducerFactory;
+import org.apache.camel.processor.RecipientList;
import org.apache.camel.test.infra.core.CamelContextExtension;
import org.apache.camel.test.infra.core.DefaultCamelContextExtension;
import org.junit.jupiter.api.Assertions;
@@ -76,6 +79,8 @@ class DynamicRouterComponentTest {
DynamicRouterProducerFactory producerFactory;
+ BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier;
+
PrioritizedFilterFactory prioritizedFilterFactory;
DynamicRouterFilterServiceFactory filterServiceFactory;
@@ -91,6 +96,7 @@ class DynamicRouterComponentTest {
final DynamicRouterConfiguration configuration,
final Supplier<DynamicRouterProcessorFactory> processorFactorySupplier,
final Supplier<DynamicRouterProducerFactory> producerFactorySupplier,
+ final BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier,
final DynamicRouterFilterService filterService) {
return endpoint;
}
@@ -99,7 +105,8 @@ class DynamicRouterComponentTest {
@Override
public DynamicRouterProcessor getInstance(
CamelContext camelContext, DynamicRouterConfiguration configuration,
- DynamicRouterFilterService filterService) {
+ DynamicRouterFilterService filterService,
+ final BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier) {
return processor;
}
};
@@ -125,9 +132,8 @@ class DynamicRouterComponentTest {
}
};
component = new DynamicRouterComponent(
- () -> endpointFactory, () -> processorFactory,
- () -> producerFactory, () -> prioritizedFilterFactory,
- () -> filterServiceFactory);
+ () -> endpointFactory, () -> processorFactory, () -> producerFactory, recipientListSupplier,
+ () -> prioritizedFilterFactory, () -> filterServiceFactory);
}
@Test
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterEndpointTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterEndpointTest.java
index 50263c1ba17..47deb67d4cb 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterEndpointTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterEndpointTest.java
@@ -16,15 +16,20 @@
*/
package org.apache.camel.component.dynamicrouter.routing;
+import java.util.function.BiFunction;
+
import org.apache.camel.CamelContext;
+import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.Producer;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService;
import org.apache.camel.component.dynamicrouter.filter.PrioritizedFilter;
import org.apache.camel.component.dynamicrouter.filter.PrioritizedFilter.PrioritizedFilterFactory;
import org.apache.camel.component.dynamicrouter.filter.PrioritizedFilterStatistics;
+import org.apache.camel.component.dynamicrouter.routing.DynamicRouterEndpoint.DynamicRouterEndpointFactory;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProcessor.DynamicRouterProcessorFactory;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProducer.DynamicRouterProducerFactory;
+import org.apache.camel.processor.RecipientList;
import org.apache.camel.test.infra.core.CamelContextExtension;
import org.apache.camel.test.infra.core.DefaultCamelContextExtension;
import org.junit.jupiter.api.BeforeEach;
@@ -67,6 +72,12 @@ class DynamicRouterEndpointTest {
@Mock
DynamicRouterFilterService filterService;
+ @Mock
+ RecipientList recipientList;
+
+ @Mock
+ BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier;
+
DynamicRouterEndpoint endpoint;
CamelContext context;
@@ -84,7 +95,8 @@ class DynamicRouterEndpointTest {
@Override
public DynamicRouterProcessor getInstance(
CamelContext camelContext, DynamicRouterConfiguration configuration,
- DynamicRouterFilterService filterService) {
+ DynamicRouterFilterService filterService,
+ BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier) {
return processor;
}
};
@@ -104,7 +116,8 @@ class DynamicRouterEndpointTest {
}
};
endpoint = new DynamicRouterEndpoint(
- BASE_URI, component, configuration, () -> processorFactory, () -> producerFactory, filterService);
+ BASE_URI, component, configuration, () -> processorFactory, () -> producerFactory, recipientListSupplier,
+ filterService);
}
@Test
@@ -120,15 +133,16 @@ class DynamicRouterEndpointTest {
@Test
void testGetInstanceWithDefaults() {
- DynamicRouterEndpoint endpoint = new DynamicRouterEndpoint.DynamicRouterEndpointFactory()
+ DynamicRouterEndpoint endpoint = new DynamicRouterEndpointFactory()
.getInstance(BASE_URI, component, configuration, filterService);
assertNotNull(endpoint);
}
@Test
void testGetInstance() {
- DynamicRouterEndpoint endpoint = new DynamicRouterEndpoint.DynamicRouterEndpointFactory()
- .getInstance(BASE_URI, component, configuration, () -> processorFactory, () -> producerFactory, filterService);
+ DynamicRouterEndpoint endpoint = new DynamicRouterEndpointFactory()
+ .getInstance(BASE_URI, component, configuration, () -> processorFactory, () -> producerFactory,
+ recipientListSupplier, filterService);
assertNotNull(endpoint);
}
}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterProcessorTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterProcessorTest.java
index a351bc9a7a2..43d0c25732f 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterProcessorTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterProcessorTest.java
@@ -16,9 +16,12 @@
*/
package org.apache.camel.component.dynamicrouter.routing;
+import java.util.function.BiFunction;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.component.dynamicrouter.filter.DynamicRouterFilterService;
import org.apache.camel.component.dynamicrouter.routing.DynamicRouterProcessor.DynamicRouterProcessorFactory;
@@ -37,6 +40,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.MODE_FIRST_MATCH;
import static org.apache.camel.component.dynamicrouter.routing.DynamicRouterConstants.RECIPIENT_LIST_HEADER;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -63,6 +68,9 @@ class DynamicRouterProcessorTest {
@Mock
DynamicRouterFilterService filterService;
+ @Mock
+ BiFunction<CamelContext, Expression, RecipientList> recipientListSupplier;
+
@Mock
Exchange exchange;
@@ -116,8 +124,9 @@ class DynamicRouterProcessorTest {
@Test
void testGetInstance() {
+ when(recipientListSupplier.apply(eq(context), any(Expression.class))).thenReturn(recipientList);
DynamicRouterProcessor instance = new DynamicRouterProcessorFactory()
- .getInstance(context, configuration, filterService);
+ .getInstance(context, configuration, filterService, recipientListSupplier);
Assertions.assertNotNull(instance);
}
}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterRecipientListHelperTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterRecipientListHelperTest.java
new file mode 100644
index 00000000000..09062722e5c
--- /dev/null
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/routing/DynamicRouterRecipientListHelperTest.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.dynamicrouter.routing;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.RecipientList;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.processor.aggregate.AggregationStrategyBiFunctionAdapter;
+import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.Registry;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class DynamicRouterRecipientListHelperTest {
+
+ @Mock
+ CamelContext camelContext;
+
+ @Mock
+ Registry mockRegistry;
+
+ @Mock
+ BiFunction<Exchange, Exchange, Object> mockBiFunction;
+
+ @Mock
+ Object mockBean;
+
+ @Mock
+ AggregationStrategy mockStrategy;
+
+ @Mock
+ DynamicRouterConfiguration mockConfig;
+
+ @Mock
+ RecipientList recipientList;
+
+ @Mock
+ ExecutorService newThreadPool;
+
+ @Mock
+ ExecutorService existingThreadPool;
+
+ @Mock
+ ExecutorServiceManager manager;
+
+ @Mock
+ BiFunction<CamelContext, Expression, RecipientList> mockRecipientListSupplier;
+
+ @Mock
+ Processor mockProcessor;
+
+ @Mock
+ Exchange oldExchange;
+
+ @Mock
+ Exchange newExchange;
+
+ @Test
+ void testCreateBiFunctionAdapter() {
+ when(mockConfig.isAggregationStrategyMethodAllowNull()).thenReturn(true);
+ AggregationStrategyBiFunctionAdapter result = DynamicRouterRecipientListHelper.createBiFunctionAdapter.apply(mockBiFunction, mockConfig);
+ assertNotNull(result);
+ assertTrue(result.isAllowNullNewExchange());
+ assertTrue(result.isAllowNullOldExchange());
+ }
+
+ @Test
+ void testCreateBeanAdapter() {
+ when(mockConfig.isAggregationStrategyMethodAllowNull()).thenReturn(true);
+ AggregationStrategyBeanAdapter result = DynamicRouterRecipientListHelper.createBeanAdapter.apply(mockBean, mockConfig);
+ assertNotNull(result);
+ assertTrue(result.isAllowNullNewExchange());
+ assertTrue(result.isAllowNullOldExchange());
+ }
+
+ @Test
+ void testConvertAggregationStrategyWithAggregationStrategyClass() {
+ AggregationStrategy result
+ = DynamicRouterRecipientListHelper.convertAggregationStrategy.apply(mockStrategy, mockConfig);
+ assertSame(mockStrategy, result);
+ }
+
+ @Test
+ void testConvertAggregationStrategyWithBiFunctionClass() {
+ AggregationStrategy result
+ = DynamicRouterRecipientListHelper.convertAggregationStrategy.apply(mockBiFunction, mockConfig);
+ assertEquals(AggregationStrategyBiFunctionAdapter.class, result.getClass());
+ }
+
+ @Test
+ void testConvertAggregationStrategyWithBean() {
+ AggregationStrategy result = DynamicRouterRecipientListHelper.convertAggregationStrategy.apply(mockBean, mockConfig);
+ assertEquals(AggregationStrategyBeanAdapter.class, result.getClass());
+ }
+
+ @Test
+ void testConvertAggregationStrategyWithNull() {
+ assertThrows(IllegalArgumentException.class,
+ () -> DynamicRouterRecipientListHelper.convertAggregationStrategy.apply(null, mockConfig));
+ }
+
+ @Test
+ void testCreateProcessor() {
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ when(mockRecipientListSupplier.apply(eq(camelContext), any(Expression.class))).thenReturn(recipientList);
+ Processor processor
+ = DynamicRouterRecipientListHelper.createProcessor(camelContext, mockConfig, mockRecipientListSupplier);
+ Assertions.assertNotNull(processor);
+ }
+
+ @Test
+ void testSetPropertiesForRecipientList() {
+ // Set up mocking
+ when(mockConfig.isParallelProcessing()).thenReturn(true);
+ when(mockConfig.isParallelAggregate()).thenReturn(true);
+ when(mockConfig.isSynchronous()).thenReturn(true);
+ when(mockConfig.isStreaming()).thenReturn(true);
+ when(mockConfig.isShareUnitOfWork()).thenReturn(true);
+ when(mockConfig.isStopOnException()).thenReturn(true);
+ when(mockConfig.isIgnoreInvalidEndpoints()).thenReturn(true);
+ when(mockConfig.getCacheSize()).thenReturn(10);
+ // Invoke the method under test
+ DynamicRouterRecipientListHelper.setPropertiesForRecipientList(recipientList, camelContext, mockConfig);
+ // Verify results
+ verify(recipientList, times(1)).setParallelProcessing(true);
+ verify(recipientList, times(1)).setParallelAggregate(true);
+ verify(recipientList, times(1)).setSynchronous(true);
+ verify(recipientList, times(1)).setStreaming(true);
+ verify(recipientList, times(1)).setShareUnitOfWork(true);
+ verify(recipientList, times(1)).setStopOnException(true);
+ verify(recipientList, times(1)).setIgnoreInvalidEndpoints(true);
+ verify(recipientList, times(1)).setCacheSize(10);
+ }
+
+ @Test
+ void testSetPropertiesForRecipientListWithGetOnPrepare() {
+ // Set up mocking
+ when(mockConfig.isParallelProcessing()).thenReturn(true);
+ when(mockConfig.getTimeout()).thenReturn(1000L);
+ when(mockConfig.getOnPrepare()).thenReturn("onPrepareRef");
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ when(camelContext.getRegistry().lookupByNameAndType("onPrepareRef", Processor.class))
+ .thenReturn(mockProcessor);
+ // Invoke the method under test
+ DynamicRouterRecipientListHelper.setPropertiesForRecipientList(recipientList, camelContext, mockConfig);
+ // Verify results
+ verify(recipientList, times(1)).setOnPrepare(mockProcessor);
+ }
+
+ @Test
+ void testSetPropertiesForRecipientListWithTimeoutAndNotParallelProcessing() {
+ // Set up mocking
+ when(mockConfig.isParallelProcessing()).thenReturn(false);
+ when(mockConfig.getTimeout()).thenReturn(1000L);
+ // Invoke the method under test
+ Exception ex = assertThrows(IllegalArgumentException.class,
+ () ->DynamicRouterRecipientListHelper.setPropertiesForRecipientList(recipientList, camelContext, mockConfig));
+ assertEquals("Timeout is used but ParallelProcessing has not been enabled.", ex.getMessage());
+ }
+
+ @Test
+ void testCreateAggregationStrategyWithInstance() {
+ when(mockConfig.getAggregationStrategyBean()).thenReturn(mockStrategy);
+ AggregationStrategy strategy = DynamicRouterRecipientListHelper.createAggregationStrategy(camelContext, mockConfig);
+ Assertions.assertNotNull(strategy);
+ }
+
+ @Test
+ void testCreateAggregationStrategyWithRef() {
+ when(mockConfig.getAggregationStrategy()).thenReturn("ref");
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ when(mockRegistry.lookupByNameAndType("ref", Object.class)).thenReturn(mockStrategy);
+ AggregationStrategy strategy = DynamicRouterRecipientListHelper.createAggregationStrategy(camelContext, mockConfig);
+ Assertions.assertNotNull(strategy);
+ }
+
+ @Test
+ void testCreateAggregationStrategyWithRefNotFound() {
+ when(mockConfig.getAggregationStrategy()).thenReturn("ref");
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ when(mockRegistry.lookupByNameAndType("ref", Object.class)).thenReturn(null);
+ Exception ex = assertThrows(IllegalArgumentException.class,
+ () -> DynamicRouterRecipientListHelper.createAggregationStrategy(camelContext, mockConfig));
+ assertEquals("Cannot find AggregationStrategy in Registry with name: ref", ex.getMessage());
+ }
+
+ @Test
+ void testCreateAggregationStrategyNoOp() {
+ when(mockConfig.getAggregationStrategyBean()).thenReturn(null);
+ when(mockConfig.getAggregationStrategy()).thenReturn(null);
+ AggregationStrategy strategy = DynamicRouterRecipientListHelper.createAggregationStrategy(camelContext, mockConfig);
+ Assertions.assertInstanceOf(DynamicRouterRecipientListHelper.NoopAggregationStrategy.class, strategy);
+ }
+
+ @Test
+ void testCreateAggregationStrategyWithShareUnitOfWorkStrategy() {
+ when(mockConfig.isShareUnitOfWork()).thenReturn(true);
+ AggregationStrategy strategy = DynamicRouterRecipientListHelper.createAggregationStrategy(camelContext, mockConfig);
+ Assertions.assertNotNull(strategy);
+ }
+
+ @Test
+ void testLookupExecutorServiceRef() {
+ String name = "ThreadPool";
+ Object source = new Object();
+ String executorServiceRef = "ThreadPoolRef";
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ when(mockRegistry.lookupByNameAndType(executorServiceRef, ExecutorService.class)).thenReturn(existingThreadPool);
+ Optional<ExecutorService> executorService
+ = DynamicRouterRecipientListHelper.lookupExecutorServiceRef(camelContext, name, source, executorServiceRef);
+ Assertions.assertTrue(executorService.isPresent());
+ }
+
+ @Test
+ void testLookupExecutorServiceRefWithNullManager() {
+ String name = "ThreadPool";
+ Object source = new Object();
+ String executorServiceRef = "ThreadPoolRef";
+ when(camelContext.getExecutorServiceManager()).thenReturn(null);
+ Exception ex = assertThrows(IllegalArgumentException.class,
+ () -> DynamicRouterRecipientListHelper.lookupExecutorServiceRef(camelContext, name, source,
+ executorServiceRef));
+ assertEquals("ExecutorServiceManager must be specified", ex.getMessage());
+ }
+
+ @Test
+ void testLookupExecutorServiceRefWithNullRef() {
+ String name = "ThreadPool";
+ Object source = new Object();
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ Exception ex = assertThrows(IllegalArgumentException.class,
+ () -> DynamicRouterRecipientListHelper.lookupExecutorServiceRef(camelContext, name, source, null));
+ assertEquals("executorServiceRef must be specified", ex.getMessage());
+ }
+
+ @Test
+ void testLookupExecutorServiceRefWithInvalidRef() {
+ String name = "ThreadPool";
+ Object source = new Object();
+ String executorServiceRef = "InvalidRef";
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ Optional<ExecutorService> executorService
+ = DynamicRouterRecipientListHelper.lookupExecutorServiceRef(camelContext, name, source, executorServiceRef);
+ Assertions.assertFalse(executorService.isPresent());
+ }
+
+ @Test
+ void testLookupExecutorServiceRefWithExistingThreadPool() {
+ String name = "ThreadPool";
+ Object source = new Object();
+ String executorServiceRef = "ExistingThreadPool";
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ when(camelContext.getRegistry().lookupByNameAndType(executorServiceRef, ExecutorService.class))
+ .thenReturn(existingThreadPool);
+ Optional<ExecutorService> executorService
+ = DynamicRouterRecipientListHelper.lookupExecutorServiceRef(camelContext, name, source, executorServiceRef);
+ Assertions.assertTrue(executorService.isPresent());
+ Assertions.assertEquals(existingThreadPool, executorService.get());
+ }
+
+ @Test
+ void testLookupExecutorServiceRefWithNewThreadPool() {
+ String name = "ThreadPool";
+ Object source = new Object();
+ String executorServiceRef = "NewThreadPool";
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ when(manager.newThreadPool(source, name, executorServiceRef)).thenReturn(newThreadPool);
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ Optional<ExecutorService> executorService
+ = DynamicRouterRecipientListHelper.lookupExecutorServiceRef(camelContext, name, source, executorServiceRef);
+ Assertions.assertTrue(executorService.isPresent());
+ Assertions.assertEquals(newThreadPool, executorService.get());
+ }
+
+ @Test
+ void testLookupByNameAndTypeWithExistingObject() {
+ String name = "ExistingObject";
+ Object existingObject = new Object();
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ when(camelContext.getRegistry().lookupByNameAndType(name, Object.class))
+ .thenReturn(existingObject);
+ Optional<Object> object = DynamicRouterRecipientListHelper.lookupByNameAndType(camelContext, name, Object.class);
+ Assertions.assertTrue(object.isPresent());
+ Assertions.assertEquals(existingObject, object.get());
+ }
+
+ @Test
+ void testLookupByNameAndTypeWithNullName() {
+ Optional<Object> object = DynamicRouterRecipientListHelper.lookupByNameAndType(camelContext, null, Object.class);
+ Assertions.assertFalse(object.isPresent());
+ }
+
+ @Test
+ void testLookupByNameAndTypeWithReferenceParameter() {
+ String name = "#referenceParameter";
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ Optional<Object> object = DynamicRouterRecipientListHelper.lookupByNameAndType(camelContext, name, Object.class);
+ Assertions.assertFalse(object.isPresent());
+ }
+
+ @Test
+ void testLookupByNameAndTypeWithEmptyName() {
+ String name = "";
+ Optional<Object> object = DynamicRouterRecipientListHelper.lookupByNameAndType(camelContext, name, Object.class);
+ Assertions.assertFalse(object.isPresent());
+ }
+
+ @Test
+ void testWillCreateNewThreadPoolWithExecutorServiceBean() {
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ when(mockConfig.getExecutorServiceBean()).thenReturn(existingThreadPool);
+ assertFalse(DynamicRouterRecipientListHelper.willCreateNewThreadPool(camelContext, mockConfig, true));
+ }
+
+ @Test
+ void testWillCreateNewThreadPoolWithExecutorServiceRef() {
+ String ref = "executorServiceRef";
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ when(mockConfig.getExecutorService()).thenReturn(ref);
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ when(camelContext.getRegistry().lookupByNameAndType(ref, ExecutorService.class))
+ .thenReturn(existingThreadPool);
+ assertFalse(DynamicRouterRecipientListHelper.willCreateNewThreadPool(camelContext, mockConfig, true));
+ }
+
+ @Test
+ void testWillCreateNewThreadPoolWithDefault() {
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ Assertions.assertTrue(DynamicRouterRecipientListHelper.willCreateNewThreadPool(camelContext, mockConfig, true));
+ }
+
+ @Test
+ void testGetConfiguredExecutorServiceWithExecutorServiceBean() {
+ when(mockConfig.getExecutorServiceBean()).thenReturn(existingThreadPool);
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ ExecutorService result = DynamicRouterRecipientListHelper.getConfiguredExecutorService(camelContext, "someName", mockConfig, true);
+ assertEquals(existingThreadPool, result);
+ }
+
+ @Test
+ void testGetConfiguredExecutorServiceWithExecutorServiceRef() {
+ when(mockConfig.getExecutorServiceBean()).thenReturn(null);
+ when(mockConfig.getExecutorService()).thenReturn("existingThreadPool");
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ when(mockRegistry.lookupByNameAndType("existingThreadPool", ExecutorService.class)).thenReturn(existingThreadPool);
+ ExecutorService result = DynamicRouterRecipientListHelper.getConfiguredExecutorService(camelContext, "someName", mockConfig, true);
+ assertEquals(existingThreadPool, result);
+ }
+
+ @Test
+ void testGetConfiguredExecutorServiceWithInvalidExecutorServiceRef() {
+ assertThrows(IllegalArgumentException.class,
+ () -> DynamicRouterRecipientListHelper.getConfiguredExecutorService(
+ camelContext, "someName", mockConfig, true));
+ }
+
+ @Test
+ void testGetConfiguredExecutorServiceWithDefault() {
+ when(mockConfig.getExecutorServiceBean()).thenReturn(null);
+ when(mockConfig.getExecutorService()).thenReturn(null);
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ when(manager.newDefaultThreadPool(mockConfig, "someName")).thenReturn(newThreadPool);
+ ExecutorService result = DynamicRouterRecipientListHelper.getConfiguredExecutorService(camelContext, "someName", mockConfig, true);
+ assertEquals(newThreadPool, result);
+ }
+
+ @Test
+ void testGetConfiguredExecutorServiceWithoutBeanAndServiceRefAndUseDefaultFalse() {
+ when(mockConfig.getExecutorServiceBean()).thenReturn(null);
+ when(mockConfig.getExecutorService()).thenReturn(null);
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ ExecutorService result = DynamicRouterRecipientListHelper.getConfiguredExecutorService(camelContext, "someName", mockConfig, false);
+ assertNull(result);
+ }
+
+ @Test
+ void testGetConfiguredExecutorServiceWithReferenceNotFound() {
+ String ref = "executorServiceRef";
+ when(camelContext.getExecutorServiceManager()).thenReturn(manager);
+ when(mockConfig.getExecutorService()).thenReturn(ref);
+ when(camelContext.getRegistry()).thenReturn(mockRegistry);
+ when(camelContext.getRegistry().lookupByNameAndType(ref, ExecutorService.class))
+ .thenReturn(null);
+ Exception ex = assertThrows(IllegalArgumentException.class,
+ () -> DynamicRouterRecipientListHelper.getConfiguredExecutorService(camelContext, ref, mockConfig, false));
+ assertEquals(
+ "ExecutorServiceRef 'executorServiceRef' not found in registry as an ExecutorService instance or as a thread pool profile",
+ ex.getMessage());
+ }
+
+ @Test
+ void testNoOpAggregationStrategy() {
+ DynamicRouterRecipientListHelper.NoopAggregationStrategy strategy
+ = new DynamicRouterRecipientListHelper.NoopAggregationStrategy();
+ Exchange result = strategy.aggregate(oldExchange, newExchange);
+ assertEquals(oldExchange, result);
+ }
+
+ @Test
+ void testNoOpAggregationStrategyWithNullOldExchange() {
+ DynamicRouterRecipientListHelper.NoopAggregationStrategy strategy
+ = new DynamicRouterRecipientListHelper.NoopAggregationStrategy();
+ Exchange result = strategy.aggregate(null, newExchange);
+ assertEquals(newExchange, result);
+ }
+
+ @Test
+ void testNoOpAggregationStrategyWithNullNewExchange() {
+ DynamicRouterRecipientListHelper.NoopAggregationStrategy strategy
+ = new DynamicRouterRecipientListHelper.NoopAggregationStrategy();
+ Exchange result = strategy.aggregate(oldExchange, null);
+ assertEquals(oldExchange, result);
+ }
+}
diff --git a/components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterJmxIT-context.xml b/components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterJmxIT-context.xml
index 85b6392df8d..3423809c649 100644
--- a/components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterJmxIT-context.xml
+++ b/components/camel-dynamic-router/src/test/resources/org/apache/camel/component/dynamicrouter/integration/DynamicRouterJmxIT-context.xml
@@ -40,5 +40,11 @@
&destinationUri=${headers.destinationUri}
&priority=${headers.priority}"/>
</route>
+ <!-- This route is not used, but it uses a non-dynamic "to" so that the control component and endpoint are
+ instantiated right away, which causes the control service to be registered with the context. -->
+ <route id="directControlToSubscribeNonDynamic">
+ <from uri="direct:control"/>
+ <to uri="dynamic-router-control://subscribe?subscribeChannel=test"/>
+ </route>
</camelContext>
</beans>