You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/10/17 12:43:57 UTC

[camel] 01/02: CAMEL-18549: Changed filter list in the DynamicRouterProcessor to a map (#8431)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9c7908132ac73e6b2ed097c6d55b705524dd3c88
Author: Steve Storck <st...@gmail.com>
AuthorDate: Mon Sep 26 04:37:35 2022 -0400

    CAMEL-18549: Changed filter list in the DynamicRouterProcessor to a map (#8431)
    
    * CAMEL-18549: Changed filter list in the DynamicRouterProcessor to a map, where filters are mapped by filter ID.
    
    * CAMEL-18549: Fixed checkstyle error for import out of order.
    
    * CAMEL-18549: Attempting to fix build errors.
---
 .../dynamicrouter/DynamicRouterProcessor.java      | 47 +++++++++-------------
 .../dynamicrouter/DynamicRouterProcessorTest.java  | 16 +++++++-
 2 files changed, 32 insertions(+), 31 deletions(-)

diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
index 5509a2f30d3..c82e201d4ad 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
@@ -16,10 +16,7 @@
  */
 package org.apache.camel.component.dynamicrouter;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -65,9 +62,10 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
     private static final String LOG_ENDPOINT = "log:%s.%s?level=%s&showAll=true&multiline=true";
 
     /**
-     * {@link FilterProcessor}s to determine if the incoming exchange should be routed, based on the content.
+     * {@link FilterProcessor}s, mapped by subscription ID, to determine if the incoming exchange should be routed based
+     * on the content.
      */
-    private final ArrayList<PrioritizedFilterProcessor> filters;
+    private final TreeMap<String, PrioritizedFilterProcessor> filterMap;
 
     /**
      * The camel context.
@@ -130,7 +128,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
                                   final boolean warnDroppedMessage,
                                   final Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
         this.id = id;
-        this.filters = new ArrayList<>();
+        this.filterMap = new TreeMap<>();
         this.camelContext = camelContext;
         this.recipientMode = recipientMode;
         this.producerTemplate = camelContext.createProducerTemplate();
@@ -196,10 +194,9 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
      * @param filter the filter to add
      */
     public void addFilter(final PrioritizedFilterProcessor filter) {
-        synchronized (filters) {
+        synchronized (filterMap) {
             if (filter != null) {
-                filters.add(filter);
-                filters.sort(PrioritizedFilterProcessor.COMPARATOR);
+                filterMap.put(filter.getId(), filter);
                 LOG.debug("Added subscription: {}", filter);
             }
         }
@@ -212,10 +209,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
      * @return          the filter with the supplied ID, or null
      */
     public PrioritizedFilterProcessor getFilter(final String filterId) {
-        return filters.stream()
-                .filter(f -> filterId.equals(f.getId()))
-                .findFirst()
-                .orElse(null);
+        return filterMap.get(filterId);
     }
 
     /**
@@ -224,22 +218,17 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
      * @param filterId the ID of the filter to remove
      */
     public void removeFilter(final String filterId) {
-        synchronized (filters) {
-            PrioritizedFilterProcessor toRemove = filters.stream()
-                    .filter(f -> filterId.equals(f.getId()))
-                    .findFirst()
-                    .orElse(null);
-            Optional.ofNullable(toRemove)
-                    .ifPresent(f -> {
-                        if (filters.remove(f)) {
-                            LOG.debug("Removed subscription: {}", f);
-                        }
-                    });
+        synchronized (filterMap) {
+            Optional.ofNullable(filterMap.remove(filterId))
+                    .ifPresentOrElse(
+                            f -> LOG.debug("Removed subscription: {}", f),
+                            () -> LOG.debug("No subscription exists with ID: {}", filterId));
         }
     }
 
     /**
-     * Match the exchange against all {@link #filters} to determine if any of them are suitable to handle the exchange.
+     * Match the exchange against all {@link #filterMap} to determine if any of them are suitable to handle the
+     * exchange.
      *
      * @param  exchange the message exchange
      * @return          list of filters that match for the exchange; if "firstMatch" mode, it is a singleton list of
@@ -247,7 +236,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
      */
     List<PrioritizedFilterProcessor> matchFilters(final Exchange exchange) {
         return Optional.of(
-                filters.stream()
+                filterMap.values().stream()
                         .filter(f -> f.matches(exchange))
                         .limit(MODE_FIRST_MATCH.equals(recipientMode) ? 1 : Integer.MAX_VALUE)
                         .collect(Collectors.toList()))
@@ -257,8 +246,8 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
 
     /**
      * Processes the message exchange, where the caller supports having the exchange asynchronously processed. The
-     * exchange is matched against all {@link #filters} to determine if any of them are suitable to handle the exchange.
-     * When the first suitable filter is found, it processes the exchange.
+     * exchange is matched against all {@link #filterMap} to determine if any of them are suitable to handle the
+     * exchange. When the first suitable filter is found, it processes the exchange.
      * <p/>
      * If there was any failure in processing, then the caused {@link Exception} would be set on the {@link Exchange}.
      *
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
index d7e0ba15bd8..8ecf81f0dcd 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.dynamicrouter;
 
+import java.util.List;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport;
@@ -23,7 +25,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.MODE_FIRST_MATCH;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.MODE_ALL_MATCH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.lenient;
@@ -34,7 +36,7 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
     @BeforeEach
     void localSetup() throws Exception {
         super.setup();
-        processor = new DynamicRouterProcessor(PROCESSOR_ID, context, MODE_FIRST_MATCH, false, () -> filterProcessorFactory);
+        processor = new DynamicRouterProcessor(PROCESSOR_ID, context, MODE_ALL_MATCH, false, () -> filterProcessorFactory);
         processor.doInit();
     }
 
@@ -59,6 +61,16 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
         assertEquals(filterProcessor, result);
     }
 
+    @Test
+    void addMultipleFiltersWithSameId() {
+        processor.addFilter(filterProcessor);
+        processor.addFilter(filterProcessor);
+        processor.addFilter(filterProcessor);
+        processor.addFilter(filterProcessor);
+        List<PrioritizedFilterProcessor> matchingFilters = processor.matchFilters(exchange);
+        assertEquals(1, matchingFilters.size());
+    }
+
     @Test
     void removeFilter() {
         addFilterAsFilterProcessor();