You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/26 06:11:34 UTC

[camel] branch camel-3.20.x updated: CAMEL-19198: Added sorting logic to ensure dynamic router eip component filters are evaluated in order of their priority property, and updated tests. (#9638)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.20.x by this push:
     new 2293d58120a CAMEL-19198: Added sorting logic to ensure dynamic router eip component filters are evaluated in order of their priority property, and updated tests. (#9638)
2293d58120a is described below

commit 2293d58120a7bcd955e893c106585976b49bd908
Author: Steve Storck <st...@gmail.com>
AuthorDate: Sun Mar 26 02:10:04 2023 -0400

    CAMEL-19198: Added sorting logic to ensure dynamic router eip component filters are evaluated in order of their priority property, and updated tests. (#9638)
---
 .../dynamicrouter/DynamicRouterProcessor.java      |  1 +
 .../dynamicrouter/DynamicRouterProcessorTest.java  | 33 ++++++++++++-------
 .../DynamicRouterSingleRouteTwoParticipantsIT.java | 37 +++++++++++++++++++++-
 .../support/DynamicRouterTestSupport.java          | 13 +++++---
 4 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
index c82e201d4ad..eaaa39f5843 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
@@ -237,6 +237,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
     List<PrioritizedFilterProcessor> matchFilters(final Exchange exchange) {
         return Optional.of(
                 filterMap.values().stream()
+                        .sorted()
                         .filter(f -> f.matches(exchange))
                         .limit(MODE_FIRST_MATCH.equals(recipientMode) ? 1 : Integer.MAX_VALUE)
                         .collect(Collectors.toList()))
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
index 8ecf81f0dcd..28a812cb999 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
@@ -45,7 +45,7 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
         when(controlMessage.getPriority()).thenReturn(1);
         when(controlMessage.getPredicate()).thenReturn(e -> true);
         PrioritizedFilterProcessor result = processor.createFilter(controlMessage);
-        assertEquals(filterProcessor, result);
+        assertEquals(filterProcessorLowPriority, result);
     }
 
     @Test
@@ -56,21 +56,32 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
 
     @Test
     void addFilterAsFilterProcessor() {
-        processor.addFilter(filterProcessor);
+        processor.addFilter(filterProcessorLowPriority);
         PrioritizedFilterProcessor result = processor.getFilter(TEST_ID);
-        assertEquals(filterProcessor, result);
+        assertEquals(filterProcessorLowPriority, result);
     }
 
     @Test
     void addMultipleFiltersWithSameId() {
-        processor.addFilter(filterProcessor);
-        processor.addFilter(filterProcessor);
-        processor.addFilter(filterProcessor);
-        processor.addFilter(filterProcessor);
+        processor.addFilter(filterProcessorLowPriority);
+        processor.addFilter(filterProcessorLowPriority);
+        processor.addFilter(filterProcessorLowPriority);
+        processor.addFilter(filterProcessorLowPriority);
         List<PrioritizedFilterProcessor> matchingFilters = processor.matchFilters(exchange);
         assertEquals(1, matchingFilters.size());
     }
 
+    @Test
+    void testMultipleFilterOrderByPriorityNotIdKey() {
+        when(filterProcessorLowestPriority.getId()).thenReturn("anIdThatComesLexicallyBeforeTestId");
+        processor.addFilter(filterProcessorLowestPriority);
+        processor.addFilter(filterProcessorLowPriority);
+        List<PrioritizedFilterProcessor> matchingFilters = processor.matchFilters(exchange);
+        assertEquals(1, matchingFilters.size());
+        PrioritizedFilterProcessor matchingFilter = matchingFilters.get(0);
+        assertEquals(matchingFilter.getId(), TEST_ID);
+    }
+
     @Test
     void removeFilter() {
         addFilterAsFilterProcessor();
@@ -89,21 +100,21 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
     @Test
     void matchFiltersDoesNotMatch() {
         PrioritizedFilterProcessor result = processor.matchFilters(exchange).get(0);
-        assertEquals(Integer.MAX_VALUE, result.getPriority());
+        assertEquals(Integer.MAX_VALUE - 1000, result.getPriority());
     }
 
     @Test
     void processMatching() {
         addFilterAsFilterProcessor();
-        when(filterProcessor.matches(exchange)).thenReturn(true);
-        lenient().when(filterProcessor.process(any(Exchange.class), any(AsyncCallback.class))).thenReturn(true);
+        when(filterProcessorLowPriority.matches(exchange)).thenReturn(true);
+        lenient().when(filterProcessorLowPriority.process(any(Exchange.class), any(AsyncCallback.class))).thenReturn(true);
         Assertions.assertFalse(processor.process(exchange, asyncCallback));
     }
 
     @Test
     void processNotMatching() {
         addFilterAsFilterProcessor();
-        when(filterProcessor.matches(exchange)).thenReturn(false);
+        when(filterProcessorLowPriority.matches(exchange)).thenReturn(false);
         Assertions.assertFalse(processor.process(exchange, asyncCallback));
     }
 
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
index 85d988dfd9e..a808dbb9ee3 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/integration/DynamicRouterSingleRouteTwoParticipantsIT.java
@@ -61,6 +61,9 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
     @EndpointInject("mock:three")
     MockEndpoint mockThree;
 
+    @EndpointInject("mock:four")
+    MockEndpoint mockFour;
+
     @Produce("direct:start")
     ProducerTemplate start;
 
@@ -73,6 +76,8 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
 
     DynamicRouterControlMessage allSubscribeMsg;
 
+    DynamicRouterControlMessage allSubscribeMsgLowerPriority;
+
     @BeforeEach
     void setup() {
         // Create a subscription that accepts an exchange when the message body contains an even number
@@ -98,12 +103,23 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
         // Create a subscription that accepts an exchange when the message body contains any number
         // The destination URI is for the endpoint "mockThree"
         allSubscribeMsg = new SubscribeMessageBuilder()
-                .id("allNumberSubscription")
+                .id("everyNumberSubscription")
                 .channel("test")
                 .priority(1)
                 .endpointUri(mockThree.getEndpointUri())
                 .predicate(body().regex("^\\d+$"))
                 .build();
+
+        // Create a subscription that has an id that is lexically smaller than the rest, but a
+        // priority number that is higher than the rest to test that priority is working properly.
+        // The destination URI is for the endpoint "mockFour"
+        allSubscribeMsgLowerPriority = new SubscribeMessageBuilder()
+                .id("aLowerPrioritySubscription")
+                .channel("test")
+                .priority(10)
+                .endpointUri(mockFour.getEndpointUri())
+                .predicate(body().regex("^\\d+$"))
+                .build();
     }
 
     /**
@@ -145,6 +161,25 @@ public class DynamicRouterSingleRouteTwoParticipantsIT {
         sendMessagesAndAssert();
     }
 
+    /**
+     * This test shows what happens when there are conflicting rules. The first matching subscriber wins. When two
+     * subscribers have registered at the same priority level, and the predicates match for both, then it is not clearly
+     * determined which subscriber will receive the exchange.
+     *
+     * @throws InterruptedException if interrupted while waiting for mocks to be satisfied
+     */
+    @Test
+    void testConsumersWithDifferentPriorities() throws InterruptedException {
+        mockThree.expectedBodiesReceivedInAnyOrder(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+        mockFour.setExpectedCount(0);
+
+        // Subscribe with filters that perform the same evaluation, but the difference
+        // is in message priority.
+        subscribe(List.of(allSubscribeMsg, allSubscribeMsgLowerPriority));
+
+        sendMessagesAndAssert();
+    }
+
     private void subscribe(List<DynamicRouterControlMessage> messages) {
         messages.forEach(message -> subscribe.sendBody(message));
     }
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
index 3deffb2b01c..d97e4c68a0a 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/support/DynamicRouterTestSupport.java
@@ -113,7 +113,10 @@ public class DynamicRouterTestSupport extends CamelTestSupport {
     protected DynamicRouterControlProducer controlProducer;
 
     @Mock
-    protected PrioritizedFilterProcessor filterProcessor;
+    protected PrioritizedFilterProcessor filterProcessorLowPriority;
+
+    @Mock
+    protected PrioritizedFilterProcessor filterProcessorLowestPriority;
 
     @Mock
     protected DynamicRouterControlMessage controlMessage;
@@ -183,8 +186,10 @@ public class DynamicRouterTestSupport extends CamelTestSupport {
 
         lenient().when(simpleLanguage.createPredicate(anyString())).thenReturn(predicate);
 
-        lenient().when(filterProcessor.getId()).thenReturn(TEST_ID);
-        lenient().when(filterProcessor.getPriority()).thenReturn(Integer.MAX_VALUE);
+        lenient().when(filterProcessorLowPriority.getId()).thenReturn(TEST_ID);
+        lenient().when(filterProcessorLowPriority.getPriority()).thenReturn(Integer.MAX_VALUE - 1000);
+
+        lenient().when(filterProcessorLowestPriority.getPriority()).thenReturn(Integer.MAX_VALUE);
 
         lenient().doNothing().when(asyncCallback).done(anyBoolean());
 
@@ -249,7 +254,7 @@ public class DynamicRouterTestSupport extends CamelTestSupport {
             @Override
             public PrioritizedFilterProcessor getInstance(
                     String id, int priority, CamelContext context, Predicate predicate, Processor processor) {
-                return filterProcessor;
+                return filterProcessorLowPriority;
             }
         };
     }