You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/25 21:10:16 UTC

[camel] branch main updated: CAMEL-19198: Added sorting logic to ensure dynamic router eip component filters are evaluated in order of their priority property, and updated tests. (#9637)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0541b03fdb5 CAMEL-19198: Added sorting logic to ensure dynamic router eip component filters are evaluated in order of their priority property, and updated tests. (#9637)
0541b03fdb5 is described below

commit 0541b03fdb5ebd538e5c3c21abe8f32e71bdcedd
Author: Steve Storck <st...@gmail.com>
AuthorDate: Sat Mar 25 17:10:10 2023 -0400

    CAMEL-19198: Added sorting logic to ensure dynamic router eip component filters are evaluated in order of their priority property, and updated tests. (#9637)
---
 .../apache/camel/catalog/schemas/camel-spring.xsd  |  2 +-
 .../dynamicrouter/DynamicRouterProcessor.java      |  1 +
 .../dynamicrouter/DynamicRouterProcessorTest.java  | 33 ++++++++++++-------
 .../DynamicRouterSingleRouteTwoParticipantsIT.java | 37 +++++++++++++++++++++-
 .../support/DynamicRouterTestSupport.java          | 13 +++++---
 5 files changed, 69 insertions(+), 17 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index d0d578bee11..22be641e2cb 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -384,7 +384,7 @@ Enriches a message with data from a secondary resource
   <xs:element name="errorHandler" nillable="true" type="xs:anyType">
     <xs:annotation>
       <xs:documentation xml:lang="en"><![CDATA[
-Camel error handling.
+Error handler settings
       ]]></xs:documentation>
     </xs:annotation>
   </xs:element>
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
index 1872b71ba7a..b8b41ed669c 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
@@ -237,6 +237,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
     List<PrioritizedFilterProcessor> matchFilters(final Exchange exchange) {
         return Optional.of(
                 filterMap.values().stream()
+                        .sorted()
                         .filter(f -> f.matches(exchange))
                         .limit(MODE_FIRST_MATCH.equals(recipientMode) ? 1 : Integer.MAX_VALUE)
                         .collect(Collectors.toList()))
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
index 8ecf81f0dcd..28a812cb999 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
@@ -45,7 +45,7 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
         when(controlMessage.getPriority()).thenReturn(1);
         when(controlMessage.getPredicate()).thenReturn(e -> true);
         PrioritizedFilterProcessor result = processor.createFilter(controlMessage);
-        assertEquals(filterProcessor, result);
+        assertEquals(filterProcessorLowPriority, result);
     }
 
     @Test
@@ -56,21 +56,32 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
 
     @Test
     void addFilterAsFilterProcessor() {
-        processor.addFilter(filterProcessor);
+        processor.addFilter(filterProcessorLowPriority);
         PrioritizedFilterProcessor result = processor.getFilter(TEST_ID);
-        assertEquals(filterProcessor, result);
+        assertEquals(filterProcessorLowPriority, result);
     }
 
     @Test
     void addMultipleFiltersWithSameId() {
-        processor.addFilter(filterProcessor);
-        processor.addFilter(filterProcessor);
-        processor.addFilter(filterProcessor);
-        processor.addFilter(filterProcessor);
+        processor.addFilter(filterProcessorLowPriority);
+        processor.addFilter(filterProcessorLowPriority);
+        processor.addFilter(filterProcessorLowPriority);
+        processor.addFilter(filterProcessorLowPriority);
         List<PrioritizedFilterProcessor> matchingFilters = processor.matchFilters(exchange);
         assertEquals(1, matchingFilters.size());
     }
 
+    @Test
+    void testMultipleFilterOrderByPriorityNotIdKey() {
+        when(filterProcessorLowestPriority.getId()).thenReturn("anIdThatComesLexicallyBeforeTestId");
+        processor.addFilter(filterProcessorLowestPriority);
+        processor.addFilter(filterProcessorLowPriority);
+        List<PrioritizedFilterProcessor> matchingFilters = processor.matchFilters(exchange);
+        assertEquals(1, matchingFilters.size());
+        PrioritizedFilterProcessor matchingFilter = matchingFilters.get(0);
+        assertEquals(matchingFilter.getId(), TEST_ID);
+    }
+
     @Test
     void removeFilter() {
         addFilterAsFilterProcessor();
@@ -89,21 +100,21 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
     @Test
     void matchFiltersDoesNotMatch() {
         PrioritizedFilterProcessor result = processor.matchFilters(exchange).get(0);
-        assertEquals(Integer.MAX_VALUE, result.getPriority());
+        assertEquals(Integer.MAX_VALUE - 1000, result.getPriority());
     }
 
     @Test
     void processMatching() {
         addFilterAsFilterProcessor();
-        when(filterProcessor.matches(exchange)).thenReturn(true);
-        lenient().when(filterProcessor.process(any(Exchange.class), any(AsyncCallback.class))).thenReturn(true);
+        when(filterProcessorLowPriority.matches(exchange)).thenReturn(true);
+        lenient().when(filterProcessorLowPriority.process(any(Exchange.class), any(AsyncCallback.class))).thenReturn(true);
         Assertions.assertFalse(processor.process(exchange, asyncCallback));
     }
 
     @Test
     void processNotMatching() {
         addFilterAsFilterProcessor();
-        when(filterProcessor.matches(exchange)).thenReturn(false);
+        when(filterProcessorLowPriority.matches(exchange)).thenReturn(false);
         Assertions.assertFalse(processor.process(exchange, asyncCallback));
     }
 
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
index 85d988dfd9e..a808dbb9ee3 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
@@ -61,6 +61,9 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
     @EndpointInject("mock:three")
     MockEndpoint mockThree;
 
+    @EndpointInject("mock:four")
+    MockEndpoint mockFour;
+
     @Produce("direct:start")
     ProducerTemplate start;
 
@@ -73,6 +76,8 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
 
     DynamicRouterControlMessage allSubscribeMsg;
 
+    DynamicRouterControlMessage allSubscribeMsgLowerPriority;
+
     @BeforeEach
     void setup() {
         // Create a subscription that accepts an exchange when the message body contains an even number
@@ -98,12 +103,23 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
         // Create a subscription that accepts an exchange when the message body contains any number
         // The destination URI is for the endpoint "mockThree"
         allSubscribeMsg = new SubscribeMessageBuilder()
-                .id("allNumberSubscription")
+                .id("everyNumberSubscription")
                 .channel("test")
                 .priority(1)
                 .endpointUri(mockThree.getEndpointUri())
                 .predicate(body().regex("^\\d+$"))
                 .build();
+
+        // Create a subscription that has an id that is lexically smaller than the rest, but a
+        // priority number that is higher than the rest to test that priority is working properly.
+        // The destination URI is for the endpoint "mockFour"
+        allSubscribeMsgLowerPriority = new SubscribeMessageBuilder()
+                .id("aLowerPrioritySubscription")
+                .channel("test")
+                .priority(10)
+                .endpointUri(mockFour.getEndpointUri())
+                .predicate(body().regex("^\\d+$"))
+                .build();
     }
 
     /**
@@ -145,6 +161,25 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
         sendMessagesAndAssert();
     }
 
+    /**
+     * This test shows what happens when there are conflicting rules. The first matching subscriber wins. When two
+     * subscribers have registered at the same priority level, and the predicates match for both, then it is not clearly
+     * determined which subscriber will receive the exchange.
+     *
+     * @throws InterruptedException if interrupted while waiting for mocks to be satisfied
+     */
+    @Test
+    void testConsumersWithDifferentPriorities() throws InterruptedException {
+        mockThree.expectedBodiesReceivedInAnyOrder(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+        mockFour.setExpectedCount(0);
+
+        // Subscribe with filters that perform the same evaluation, but the difference
+        // is in message priority.
+        subscribe(List.of(allSubscribeMsg, allSubscribeMsgLowerPriority));
+
+        sendMessagesAndAssert();
+    }
+
     private void subscribe(List<DynamicRouterControlMessage> messages) {
         messages.forEach(message -> subscribe.sendBody(message));
     }
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
index 8f20f14365c..14cc0125e67 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
@@ -116,7 +116,10 @@ public class DynamicRouterTestSupport extends CamelTestSupport {
     protected DynamicRouterControlProducer controlProducer;
 
     @Mock
-    protected PrioritizedFilterProcessor filterProcessor;
+    protected PrioritizedFilterProcessor filterProcessorLowPriority;
+
+    @Mock
+    protected PrioritizedFilterProcessor filterProcessorLowestPriority;
 
     @Mock
     protected DynamicRouterControlMessage controlMessage;
@@ -186,8 +189,10 @@ public class DynamicRouterTestSupport extends CamelTestSupport {
 
         lenient().when(simpleLanguage.createPredicate(anyString())).thenReturn(predicate);
 
-        lenient().when(filterProcessor.getId()).thenReturn(TEST_ID);
-        lenient().when(filterProcessor.getPriority()).thenReturn(Integer.MAX_VALUE);
+        lenient().when(filterProcessorLowPriority.getId()).thenReturn(TEST_ID);
+        lenient().when(filterProcessorLowPriority.getPriority()).thenReturn(Integer.MAX_VALUE - 1000);
+
+        lenient().when(filterProcessorLowestPriority.getPriority()).thenReturn(Integer.MAX_VALUE);
 
         lenient().doNothing().when(asyncCallback).done(anyBoolean());
 
@@ -252,7 +257,7 @@ public class DynamicRouterTestSupport extends CamelTestSupport {
             @Override
             public PrioritizedFilterProcessor getInstance(
                     String id, int priority, CamelContext context, Predicate predicate, Processor processor) {
-                return filterProcessor;
+                return filterProcessorLowPriority;
             }
         };
     }