You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/10/17 12:43:57 UTC
[camel] 01/02: CAMEL-18549: Changed filter list in the DynamicRouterProcessor to a map (#8431)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9c7908132ac73e6b2ed097c6d55b705524dd3c88
Author: Steve Storck <st...@gmail.com>
AuthorDate: Mon Sep 26 04:37:35 2022 -0400
CAMEL-18549: Changed filter list in the DynamicRouterProcessor to a map (#8431)
* CAMEL-18549: Changed filter list in the DynamicRouterProcessor to a map, where filters are mapped by filter ID.
* CAMEL-18549: Fixed checkstyle error for import out of order.
* CAMEL-18549: Attempting to fix build errors.
---
.../dynamicrouter/DynamicRouterProcessor.java | 47 +++++++++-------------
.../dynamicrouter/DynamicRouterProcessorTest.java | 16 +++++++-
2 files changed, 32 insertions(+), 31 deletions(-)
diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
index 5509a2f30d3..c82e201d4ad 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java
@@ -16,10 +16,7 @@
*/
package org.apache.camel.component.dynamicrouter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -65,9 +62,10 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
private static final String LOG_ENDPOINT = "log:%s.%s?level=%s&showAll=true&multiline=true";
/**
- * {@link FilterProcessor}s to determine if the incoming exchange should be routed, based on the content.
+ * {@link FilterProcessor}s, mapped by subscription ID, to determine if the incoming exchange should be routed based
+ * on the content.
*/
- private final ArrayList<PrioritizedFilterProcessor> filters;
+ private final TreeMap<String, PrioritizedFilterProcessor> filterMap;
/**
* The camel context.
@@ -130,7 +128,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
final boolean warnDroppedMessage,
final Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) {
this.id = id;
- this.filters = new ArrayList<>();
+ this.filterMap = new TreeMap<>();
this.camelContext = camelContext;
this.recipientMode = recipientMode;
this.producerTemplate = camelContext.createProducerTemplate();
@@ -196,10 +194,9 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
* @param filter the filter to add
*/
public void addFilter(final PrioritizedFilterProcessor filter) {
- synchronized (filters) {
+ synchronized (filterMap) {
if (filter != null) {
- filters.add(filter);
- filters.sort(PrioritizedFilterProcessor.COMPARATOR);
+ filterMap.put(filter.getId(), filter);
LOG.debug("Added subscription: {}", filter);
}
}
@@ -212,10 +209,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
* @return the filter with the supplied ID, or null
*/
public PrioritizedFilterProcessor getFilter(final String filterId) {
- return filters.stream()
- .filter(f -> filterId.equals(f.getId()))
- .findFirst()
- .orElse(null);
+ return filterMap.get(filterId);
}
/**
@@ -224,22 +218,17 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
* @param filterId the ID of the filter to remove
*/
public void removeFilter(final String filterId) {
- synchronized (filters) {
- PrioritizedFilterProcessor toRemove = filters.stream()
- .filter(f -> filterId.equals(f.getId()))
- .findFirst()
- .orElse(null);
- Optional.ofNullable(toRemove)
- .ifPresent(f -> {
- if (filters.remove(f)) {
- LOG.debug("Removed subscription: {}", f);
- }
- });
+ synchronized (filterMap) {
+ Optional.ofNullable(filterMap.remove(filterId))
+ .ifPresentOrElse(
+ f -> LOG.debug("Removed subscription: {}", f),
+ () -> LOG.debug("No subscription exists with ID: {}", filterId));
}
}
/**
- * Match the exchange against all {@link #filters} to determine if any of them are suitable to handle the exchange.
+ * Match the exchange against all {@link #filterMap} to determine if any of them are suitable to handle the
+ * exchange.
*
* @param exchange the message exchange
* @return list of filters that match for the exchange; if "firstMatch" mode, it is a singleton list of
@@ -247,7 +236,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
*/
List<PrioritizedFilterProcessor> matchFilters(final Exchange exchange) {
return Optional.of(
- filters.stream()
+ filterMap.values().stream()
.filter(f -> f.matches(exchange))
.limit(MODE_FIRST_MATCH.equals(recipientMode) ? 1 : Integer.MAX_VALUE)
.collect(Collectors.toList()))
@@ -257,8 +246,8 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra
/**
* Processes the message exchange, where the caller supports having the exchange asynchronously processed. The
- * exchange is matched against all {@link #filters} to determine if any of them are suitable to handle the exchange.
- * When the first suitable filter is found, it processes the exchange.
+ * exchange is matched against all {@link #filterMap} to determine if any of them are suitable to handle the
+ * exchange. When the first suitable filter is found, it processes the exchange.
* <p/>
* If there was any failure in processing, then the caused {@link Exception} would be set on the {@link Exchange}.
*
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
index d7e0ba15bd8..8ecf81f0dcd 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.dynamicrouter;
+import java.util.List;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport;
@@ -23,7 +25,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.MODE_FIRST_MATCH;
+import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.MODE_ALL_MATCH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
@@ -34,7 +36,7 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
@BeforeEach
void localSetup() throws Exception {
super.setup();
- processor = new DynamicRouterProcessor(PROCESSOR_ID, context, MODE_FIRST_MATCH, false, () -> filterProcessorFactory);
+ processor = new DynamicRouterProcessor(PROCESSOR_ID, context, MODE_ALL_MATCH, false, () -> filterProcessorFactory);
processor.doInit();
}
@@ -59,6 +61,16 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport {
assertEquals(filterProcessor, result);
}
+ @Test
+ void addMultipleFiltersWithSameId() {
+ processor.addFilter(filterProcessor);
+ processor.addFilter(filterProcessor);
+ processor.addFilter(filterProcessor);
+ processor.addFilter(filterProcessor);
+ List<PrioritizedFilterProcessor> matchingFilters = processor.matchFilters(exchange);
+ assertEquals(1, matchingFilters.size());
+ }
+
@Test
void removeFilter() {
addFilterAsFilterProcessor();