You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/25 21:10:16 UTC
[camel] branch main updated: CAMEL-19198: Added sorting logic to ensure dynamic router eip component filters are evaluated in order of their priority property, and updated tests. (#9637)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 0541b03fdb5 CAMEL-19198: Added sorting logic to ensure dynamic router eip component filters are evaluated in order of their priority property, and updated tests. (#9637)
0541b03fdb5 is described below
commit 0541b03fdb5ebd538e5c3c21abe8f32e71bdcedd
Author: Steve Storck <st...@gmail.com>
AuthorDate: Sat Mar 25 17:10:10 2023 -0400
CAMEL-19198: Added sorting logic to ensure dynamic router eip component filters are evaluated in order of their priority property, and updated tests. (#9637)
---
.../apache/camel/catalog/schemas/camel-spring.xsd | 2 +-
.../dynamicrouter/DynamicRouterProcessor.java | 1 +
.../dynamicrouter/DynamicRouterProcessorTest.java | 33 ++++++++++++-------
.../DynamicRouterSingleRouteTwoParticipantsIT.java | 37 +++++++++++++++++++++-
.../support/DynamicRouterTestSupport.java | 13 +++++---
5 files changed, 69 insertions(+), 17 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index d0d578bee11..22be641e2cb 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -384,7 +384,7 @@ Enriches a message with data from a secondary resource
<xs:element name="errorHandler" nillable="true" type="xs:anyType">
<xs:annotation>
<xs:documentation xml:lang="en"><![CDATA[
-Camel error handling.
+Error handler settings
]]></xs:documentation>
</xs:annotation>
</xs:element>
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
index 1872b71ba7a..b8b41ed669c 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
@@ -237,6 +237,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
List<PrioritizedFilterProcessor> matchFilters(final Exchange exchange) {
return Optional.of(
filterMap.values().stream()
+ .sorted()
.filter(f -> f.matches(exchange))
.limit(MODE_FIRST_MATCH.equals(recipientMode) ? 1 : Integer.MAX_VALUE)
.collect(Collectors.toList()))
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
index 8ecf81f0dcd..28a812cb999 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
@@ -45,7 +45,7 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
when(controlMessage.getPriority()).thenReturn(1);
when(controlMessage.getPredicate()).thenReturn(e -> true);
PrioritizedFilterProcessor result = processor.createFilter(controlMessage);
- assertEquals(filterProcessor, result);
+ assertEquals(filterProcessorLowPriority, result);
}
@Test
@@ -56,21 +56,32 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
@Test
void addFilterAsFilterProcessor() {
- processor.addFilter(filterProcessor);
+ processor.addFilter(filterProcessorLowPriority);
PrioritizedFilterProcessor result = processor.getFilter(TEST_ID);
- assertEquals(filterProcessor, result);
+ assertEquals(filterProcessorLowPriority, result);
}
@Test
void addMultipleFiltersWithSameId() {
- processor.addFilter(filterProcessor);
- processor.addFilter(filterProcessor);
- processor.addFilter(filterProcessor);
- processor.addFilter(filterProcessor);
+ processor.addFilter(filterProcessorLowPriority);
+ processor.addFilter(filterProcessorLowPriority);
+ processor.addFilter(filterProcessorLowPriority);
+ processor.addFilter(filterProcessorLowPriority);
List<PrioritizedFilterProcessor> matchingFilters = processor.matchFilters(exchange);
assertEquals(1, matchingFilters.size());
}
+ @Test
+ void testMultipleFilterOrderByPriorityNotIdKey() {
+ when(filterProcessorLowestPriority.getId()).thenReturn("anIdThatComesLexicallyBeforeTestId");
+ processor.addFilter(filterProcessorLowestPriority);
+ processor.addFilter(filterProcessorLowPriority);
+ List<PrioritizedFilterProcessor> matchingFilters = processor.matchFilters(exchange);
+ assertEquals(1, matchingFilters.size());
+ PrioritizedFilterProcessor matchingFilter = matchingFilters.get(0);
+ assertEquals(matchingFilter.getId(), TEST_ID);
+ }
+
@Test
void removeFilter() {
addFilterAsFilterProcessor();
@@ -89,21 +100,21 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
@Test
void matchFiltersDoesNotMatch() {
PrioritizedFilterProcessor result = processor.matchFilters(exchange).get(0);
- assertEquals(Integer.MAX_VALUE, result.getPriority());
+ assertEquals(Integer.MAX_VALUE - 1000, result.getPriority());
}
@Test
void processMatching() {
addFilterAsFilterProcessor();
- when(filterProcessor.matches(exchange)).thenReturn(true);
- lenient().when(filterProcessor.process(any(Exchange.class), any(AsyncCallback.class))).thenReturn(true);
+ when(filterProcessorLowPriority.matches(exchange)).thenReturn(true);
+ lenient().when(filterProcessorLowPriority.process(any(Exchange.class), any(AsyncCallback.class))).thenReturn(true);
Assertions.assertFalse(processor.process(exchange, asyncCallback));
}
@Test
void processNotMatching() {
addFilterAsFilterProcessor();
- when(filterProcessor.matches(exchange)).thenReturn(false);
+ when(filterProcessorLowPriority.matches(exchange)).thenReturn(false);
Assertions.assertFalse(processor.process(exchange, asyncCallback));
}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
index 85d988dfd9e..a808dbb9ee3 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
@@ -61,6 +61,9 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
@EndpointInject("mock:three")
MockEndpoint mockThree;
+ @EndpointInject("mock:four")
+ MockEndpoint mockFour;
+
@Produce("direct:start")
ProducerTemplate start;
@@ -73,6 +76,8 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
DynamicRouterControlMessage allSubscribeMsg;
+ DynamicRouterControlMessage allSubscribeMsgLowerPriority;
+
@BeforeEach
void setup() {
// Create a subscription that accepts an exchange when the message body contains an even number
@@ -98,12 +103,23 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
// Create a subscription that accepts an exchange when the message body contains any number
// The destination URI is for the endpoint "mockThree"
allSubscribeMsg = new SubscribeMessageBuilder()
- .id("allNumberSubscription")
+ .id("everyNumberSubscription")
.channel("test")
.priority(1)
.endpointUri(mockThree.getEndpointUri())
.predicate(body().regex("^\\d+$"))
.build();
+
+ // Create a subscription that has an id that is lexically smaller than the rest, but a
+ // priority number that is higher than the rest to test that priority is working properly.
+ // The destination URI is for the endpoint "mockFour"
+ allSubscribeMsgLowerPriority = new SubscribeMessageBuilder()
+ .id("aLowerPrioritySubscription")
+ .channel("test")
+ .priority(10)
+ .endpointUri(mockFour.getEndpointUri())
+ .predicate(body().regex("^\\d+$"))
+ .build();
}
/**
@@ -145,6 +161,25 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
sendMessagesAndAssert();
}
+ /**
+ * This test shows what happens when there are conflicting rules. The first matching subscriber wins. When two
+ * subscribers have registered at the same priority level, and the predicates match for both, then it is not clearly
+ * determined which subscriber will receive the exchange.
+ *
+ * @throws InterruptedException if interrupted while waiting for mocks to be satisfied
+ */
+ @Test
+ void testConsumersWithDifferentPriorities() throws InterruptedException {
+ mockThree.expectedBodiesReceivedInAnyOrder(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ mockFour.setExpectedCount(0);
+
+ // Subscribe with filters that perform the same evaluation, but the difference
+ // is in message priority.
+ subscribe(List.of(allSubscribeMsg, allSubscribeMsgLowerPriority));
+
+ sendMessagesAndAssert();
+ }
+
private void subscribe(List<DynamicRouterControlMessage> messages) {
messages.forEach(message -> subscribe.sendBody(message));
}
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
index 8f20f14365c..14cc0125e67 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
@@ -116,7 +116,10 @@ public class DynamicRouterTestSupport extends CamelTestSupport {
protected DynamicRouterControlProducer controlProducer;
@Mock
- protected PrioritizedFilterProcessor filterProcessor;
+ protected PrioritizedFilterProcessor filterProcessorLowPriority;
+
+ @Mock
+ protected PrioritizedFilterProcessor filterProcessorLowestPriority;
@Mock
protected DynamicRouterControlMessage controlMessage;
@@ -186,8 +189,10 @@ public class DynamicRouterTestSupport extends CamelTestSupport {
lenient().when(simpleLanguage.createPredicate(anyString())).thenReturn(predicate);
- lenient().when(filterProcessor.getId()).thenReturn(TEST_ID);
- lenient().when(filterProcessor.getPriority()).thenReturn(Integer.MAX_VALUE);
+ lenient().when(filterProcessorLowPriority.getId()).thenReturn(TEST_ID);
+ lenient().when(filterProcessorLowPriority.getPriority()).thenReturn(Integer.MAX_VALUE - 1000);
+
+ lenient().when(filterProcessorLowestPriority.getPriority()).thenReturn(Integer.MAX_VALUE);
lenient().doNothing().when(asyncCallback).done(anyBoolean());
@@ -252,7 +257,7 @@ public class DynamicRouterTestSupport extends CamelTestSupport {
@Override
public PrioritizedFilterProcessor getInstance(
String id, int priority, CamelContext context, Predicate predicate, Processor processor) {
- return filterProcessor;
+ return filterProcessorLowPriority;
}
};
}