You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/19 08:07:02 UTC
[camel] branch master updated: CAMEL-16449: Introduce Exchange in
ServiceFilter for filtering on exchange content (#5332)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new d37e3e1 CAMEL-16449: Introduce Exchange in ServiceFilter for filtering on exchange content (#5332)
d37e3e1 is described below
commit d37e3e14fc3a401d5c5263c63479cb5920d361f0
Author: Dietrich Schulten <ds...@escalon.de>
AuthorDate: Mon Apr 19 10:06:27 2021 +0200
CAMEL-16449: Introduce Exchange in ServiceFilter for filtering on exchange content (#5332)
---
components/camel-ribbon/pom.xml | 10 ++++
.../ribbon/cloud/RibbonServiceLoadBalancer.java | 15 ++++--
.../ribbon/cloud/RibbonServerListTest.java | 2 +
.../java/org/apache/camel/cloud/ServiceFilter.java | 8 +++-
.../apache/camel/cloud/ServiceLoadBalancer.java | 4 +-
.../camel/impl/cloud/BlacklistServiceFilter.java | 3 +-
.../camel/impl/cloud/CombinedServiceFilter.java | 5 +-
.../impl/cloud/DefaultServiceCallProcessor.java | 2 +-
.../camel/impl/cloud/DefaultServiceFilter.java | 3 +-
.../impl/cloud/DefaultServiceLoadBalancer.java | 5 +-
.../camel/impl/cloud/HealthyServiceFilter.java | 3 +-
.../camel/impl/cloud/PassThroughServiceFilter.java | 3 +-
.../impl/cloud/CombinedServiceFilterTest.java | 36 ++++++++++++--
.../apache/camel/impl/cloud/LoadBalancerTest.java | 47 +++++++++++++++---
.../impl/cloud/ServiceCallConfigurationTest.java | 7 ++-
.../docs/modules/eips/pages/serviceCall-eip.adoc | 56 +++++++++++++++++++---
...binedServiceCallServiceFilterConfiguration.java | 2 +-
.../ROOT/pages/camel-3x-upgrade-guide-3_10.adoc | 5 ++
parent/pom.xml | 1 +
19 files changed, 180 insertions(+), 37 deletions(-)
diff --git a/components/camel-ribbon/pom.xml b/components/camel-ribbon/pom.xml
index 1435391..d5ff555 100644
--- a/components/camel-ribbon/pom.xml
+++ b/components/camel-ribbon/pom.xml
@@ -45,6 +45,10 @@
<artifactId>camel-cloud</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-support</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon-core</artifactId>
<version>${ribbon-version}</version>
@@ -54,6 +58,12 @@
<artifactId>ribbon-loadbalancer</artifactId>
<version>${ribbon-version}</version>
</dependency>
+ <dependency>
+ <groupId>com.netflix.servo</groupId>
+ <artifactId>servo-core</artifactId>
+ <scope>provided</scope>
+ <version>${servo-version}</version>
+ </dependency>
<!-- testing -->
<dependency>
diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/cloud/RibbonServiceLoadBalancer.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/cloud/RibbonServiceLoadBalancer.java
index 3cfd56b..e2e1620 100644
--- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/cloud/RibbonServiceLoadBalancer.java
+++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/cloud/RibbonServiceLoadBalancer.java
@@ -35,6 +35,7 @@ import com.netflix.loadbalancer.ServerList;
import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
import org.apache.camel.cloud.ServiceDefinition;
import org.apache.camel.cloud.ServiceDiscovery;
import org.apache.camel.cloud.ServiceDiscoveryAware;
@@ -43,6 +44,7 @@ import org.apache.camel.cloud.ServiceFilterAware;
import org.apache.camel.cloud.ServiceLoadBalancer;
import org.apache.camel.cloud.ServiceLoadBalancerFunction;
import org.apache.camel.component.ribbon.RibbonConfiguration;
+import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
@@ -128,7 +130,7 @@ public class RibbonServiceLoadBalancer
// ************************
@Override
- public <T> T process(String serviceName, ServiceLoadBalancerFunction<T> request) throws Exception {
+ public <T> T process(Exchange exchange, String serviceName, ServiceLoadBalancerFunction<T> request) throws Exception {
ILoadBalancer loadBalancer = loadBalancers.computeIfAbsent(serviceName, key -> createLoadBalancer(key));
Server server = loadBalancer.chooseServer(serviceName);
@@ -187,7 +189,7 @@ public class RibbonServiceLoadBalancer
config,
configuration.getRuleOrDefault(RoundRobinRule::new),
configuration.getPingOrDefault(DummyPing::new),
- new RibbonServerList(serviceName, serviceDiscovery, serviceFilter),
+ new RibbonServerList(camelContext, serviceName, serviceDiscovery, serviceFilter),
null,
new PollingServerListUpdater(config));
} else {
@@ -201,18 +203,21 @@ public class RibbonServiceLoadBalancer
private final String serviceName;
private final ServiceDiscovery serviceDiscovery;
private final ServiceFilter serviceFilter;
+ private final DefaultExchange dummyExchange;
- RibbonServerList(String serviceName, ServiceDiscovery serviceDiscovery, ServiceFilter serviceFilter) {
+ RibbonServerList(CamelContext camelContext, String serviceName, ServiceDiscovery serviceDiscovery,
+ ServiceFilter serviceFilter) {
this.serviceName = serviceName;
this.serviceDiscovery = serviceDiscovery;
this.serviceFilter = serviceFilter;
+ this.dummyExchange = new DefaultExchange(camelContext); // ServerList doesn't support current exchange
}
@Override
public List<RibbonServiceDefinition> getInitialListOfServers() {
List<ServiceDefinition> services = serviceDiscovery.getServices(serviceName);
if (serviceFilter != null) {
- services = serviceFilter.apply(services);
+ services = serviceFilter.apply(dummyExchange, services);
}
return asRibbonServerList(services);
@@ -222,7 +227,7 @@ public class RibbonServiceLoadBalancer
public List<RibbonServiceDefinition> getUpdatedListOfServers() {
List<ServiceDefinition> services = serviceDiscovery.getServices(serviceName);
if (serviceFilter != null) {
- services = serviceFilter.apply(services);
+ services = serviceFilter.apply(dummyExchange, services);
}
return asRibbonServerList(services);
diff --git a/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/cloud/RibbonServerListTest.java b/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/cloud/RibbonServerListTest.java
index f154584..0b39ad7 100644
--- a/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/cloud/RibbonServerListTest.java
+++ b/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/cloud/RibbonServerListTest.java
@@ -20,6 +20,7 @@ import com.netflix.loadbalancer.LoadBalancerBuilder;
import com.netflix.loadbalancer.RoundRobinRule;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.cloud.PassThroughServiceFilter;
import org.apache.camel.impl.cloud.StaticServiceDiscovery;
import org.junit.jupiter.api.Test;
@@ -31,6 +32,7 @@ public class RibbonServerListTest {
public void testFixedServerList() throws Exception {
ZoneAwareLoadBalancer<RibbonServiceDefinition> lb = LoadBalancerBuilder.<RibbonServiceDefinition> newBuilder()
.withDynamicServerList(new RibbonServiceLoadBalancer.RibbonServerList(
+ new DefaultCamelContext(),
"unknown",
StaticServiceDiscovery.forServices(
new RibbonServiceDefinition("unknown", "localhost", 9090),
diff --git a/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceFilter.java b/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceFilter.java
index 8c79a4f..dafe09d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceFilter.java
+++ b/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceFilter.java
@@ -18,6 +18,8 @@ package org.apache.camel.cloud;
import java.util.List;
+import org.apache.camel.Exchange;
+
/**
* Allows SPIs to implement custom Service Filter.
*
@@ -27,10 +29,12 @@ import java.util.List;
public interface ServiceFilter {
/**
- * Chooses one of the service to use
+ * Chooses service candidates to use
*
+ * @param exchange for content-based filtering
* @param services list of services
* @return the chosen service to use.
*/
- List<ServiceDefinition> apply(List<ServiceDefinition> services);
+ List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services);
+
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceLoadBalancer.java b/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceLoadBalancer.java
index 75f6cd6..28c029b 100644
--- a/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceLoadBalancer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceLoadBalancer.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.cloud;
+import org.apache.camel.Exchange;
+
/**
* Represents a Load Balancer.
*
@@ -25,5 +27,5 @@ package org.apache.camel.cloud;
*/
@FunctionalInterface
public interface ServiceLoadBalancer {
- <T> T process(String serviceName, ServiceLoadBalancerFunction<T> function) throws Exception;
+ <T> T process(Exchange exchange, String serviceName, ServiceLoadBalancerFunction<T> function) throws Exception;
}
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/BlacklistServiceFilter.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/BlacklistServiceFilter.java
index 91ac209..08f8f63 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/BlacklistServiceFilter.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/BlacklistServiceFilter.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.camel.Exchange;
import org.apache.camel.cloud.ServiceDefinition;
import org.apache.camel.cloud.ServiceFilter;
@@ -83,7 +84,7 @@ public class BlacklistServiceFilter implements ServiceFilter {
}
@Override
- public List<ServiceDefinition> apply(List<ServiceDefinition> services) {
+ public List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services) {
return services.stream().filter(
s -> this.services.stream().noneMatch(b -> b.matches(s))).collect(
Collectors.toList());
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/CombinedServiceFilter.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/CombinedServiceFilter.java
index 3468bc3..7697d6b 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/CombinedServiceFilter.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/CombinedServiceFilter.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import org.apache.camel.Exchange;
import org.apache.camel.cloud.ServiceDefinition;
import org.apache.camel.cloud.ServiceFilter;
@@ -38,9 +39,9 @@ public class CombinedServiceFilter implements ServiceFilter {
}
@Override
- public List<ServiceDefinition> apply(List<ServiceDefinition> services) {
+ public List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services) {
for (int i = 0; i < delegatesSize; i++) {
- services = delegates.get(i).apply(services);
+ services = delegates.get(i).apply(exchange, services);
}
return services;
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java
index 637d5b3..c909347 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java
@@ -184,7 +184,7 @@ public class DefaultServiceCallProcessor extends AsyncProcessorSupport {
message.setHeader(ServiceCallConstants.SERVICE_NAME, serviceName);
try {
- return loadBalancer.process(serviceName, server -> execute(server, exchange, callback));
+ return loadBalancer.process(exchange, serviceName, server -> execute(server, exchange, callback));
} catch (Exception e) {
exchange.setException(e);
callback.done(true);
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceFilter.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceFilter.java
index 90c7ef5..100c626 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceFilter.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceFilter.java
@@ -18,13 +18,14 @@ package org.apache.camel.impl.cloud;
import java.util.List;
+import org.apache.camel.Exchange;
import org.apache.camel.cloud.ServiceDefinition;
import org.apache.camel.cloud.ServiceFilter;
public class DefaultServiceFilter implements ServiceFilter {
@Override
- public List<ServiceDefinition> apply(List<ServiceDefinition> services) {
+ public List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services) {
return services;
}
}
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceLoadBalancer.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceLoadBalancer.java
index 5af196d..eee9fac 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceLoadBalancer.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceLoadBalancer.java
@@ -21,6 +21,7 @@ import java.util.concurrent.RejectedExecutionException;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
import org.apache.camel.cloud.ServiceChooser;
import org.apache.camel.cloud.ServiceChooserAware;
import org.apache.camel.cloud.ServiceDefinition;
@@ -125,7 +126,7 @@ public class DefaultServiceLoadBalancer
// *************************************
@Override
- public <T> T process(String serviceName, ServiceLoadBalancerFunction<T> function) throws Exception {
+ public <T> T process(Exchange exchange, String serviceName, ServiceLoadBalancerFunction<T> function) throws Exception {
ServiceDefinition service;
List<ServiceDefinition> services = serviceDiscovery.getServices(serviceName);
@@ -133,7 +134,7 @@ public class DefaultServiceLoadBalancer
throw new RejectedExecutionException("No active services with name " + serviceName);
} else {
// filter services
- services = serviceFilter.apply(services);
+ services = serviceFilter.apply(exchange, services);
// let the client service chooser find which server to use
service = services.isEmpty() ? null : services.size() > 1 ? serviceChooser.choose(services) : services.get(0);
if (service == null) {
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/HealthyServiceFilter.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/HealthyServiceFilter.java
index a3a1840..19c9c15 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/HealthyServiceFilter.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/HealthyServiceFilter.java
@@ -19,12 +19,13 @@ package org.apache.camel.impl.cloud;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.camel.Exchange;
import org.apache.camel.cloud.ServiceDefinition;
import org.apache.camel.cloud.ServiceFilter;
public class HealthyServiceFilter implements ServiceFilter {
@Override
- public List<ServiceDefinition> apply(List<ServiceDefinition> services) {
+ public List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services) {
return services.stream().filter(s -> s.getHealth().isHealthy()).collect(Collectors.toList());
}
}
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/PassThroughServiceFilter.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/PassThroughServiceFilter.java
index 75791fa..79d6b64 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/PassThroughServiceFilter.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/PassThroughServiceFilter.java
@@ -18,12 +18,13 @@ package org.apache.camel.impl.cloud;
import java.util.List;
+import org.apache.camel.Exchange;
import org.apache.camel.cloud.ServiceDefinition;
import org.apache.camel.cloud.ServiceFilter;
public class PassThroughServiceFilter implements ServiceFilter {
@Override
- public List<ServiceDefinition> apply(List<ServiceDefinition> services) {
+ public List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services) {
return services;
}
}
diff --git a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/CombinedServiceFilterTest.java b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/CombinedServiceFilterTest.java
index 56ac312..44678cc 100644
--- a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/CombinedServiceFilterTest.java
+++ b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/CombinedServiceFilterTest.java
@@ -16,15 +16,17 @@
*/
package org.apache.camel.impl.cloud;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
import java.util.stream.Collectors;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
import org.apache.camel.cloud.ServiceDefinition;
import org.apache.camel.model.cloud.CombinedServiceCallServiceFilterConfiguration;
+import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.Test;
+import static java.util.Optional.ofNullable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -46,9 +48,10 @@ public class CombinedServiceFilterTest extends ContextTestSupport {
public void testMultiServiceFilter() throws Exception {
CombinedServiceCallServiceFilterConfiguration conf = new CombinedServiceCallServiceFilterConfiguration()
.healthy()
- .custom(services -> services.stream().filter(s -> s.getPort() < 2000).collect(Collectors.toList()));
+ .custom((exchange, services) -> services.stream().filter(s -> s.getPort() < 2000).collect(Collectors.toList()));
- List<ServiceDefinition> services = conf.newInstance(context).apply(Arrays.asList(
+ Exchange exchange = new DefaultExchange(context);
+ List<ServiceDefinition> services = conf.newInstance(context).apply(exchange, Arrays.asList(
new DefaultServiceDefinition("no-name", "127.0.0.1", 1000),
new DefaultServiceDefinition("no-name", "127.0.0.1", 1001, new DefaultServiceHealth(false)),
new DefaultServiceDefinition("no-name", "127.0.0.1", 1002, new DefaultServiceHealth(true)),
@@ -61,4 +64,29 @@ public class CombinedServiceFilterTest extends ContextTestSupport {
assertTrue(services.stream().anyMatch(s -> s.getPort() == 1000));
assertTrue(services.stream().anyMatch(s -> s.getPort() == 1002));
}
+
+ @Test
+ public void testContentBasedServiceFilterCombinedWithServiceFilter() throws Exception {
+ CombinedServiceCallServiceFilterConfiguration conf = new CombinedServiceCallServiceFilterConfiguration()
+ .healthy()
+ .custom((exchange, services) -> services.stream()
+ .filter(serviceDefinition -> ofNullable(serviceDefinition.getMetadata()
+ .get("supports"))
+ .orElse("")
+ .contains(exchange.getProperty("needs", String.class)))
+ .collect(Collectors.toList()));
+
+ Map<String, String> metadata = Collections.singletonMap("supports", "foo,bar");
+
+ Exchange exchange = new DefaultExchange(context);
+ exchange.setProperty("needs", "foo");
+
+ List<ServiceDefinition> services = conf.newInstance(context).apply(exchange, Arrays.asList(
+ new DefaultServiceDefinition("no-name", "127.0.0.1", 2001, metadata, new DefaultServiceHealth(true)),
+ new DefaultServiceDefinition("no-name", "127.0.0.1", 2002, metadata, new DefaultServiceHealth(false))));
+
+ assertEquals(1, services.size());
+ assertFalse(services.stream().anyMatch(s -> !s.getHealth().isHealthy()));
+ assertTrue(services.stream().anyMatch(s -> s.getPort() == 2001));
+ }
}
diff --git a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/LoadBalancerTest.java b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/LoadBalancerTest.java
index 6bde39b..6d4ebad 100644
--- a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/LoadBalancerTest.java
+++ b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/LoadBalancerTest.java
@@ -16,13 +16,18 @@
*/
package org.apache.camel.impl.cloud;
+import java.util.Collections;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import static java.util.Optional.ofNullable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -36,36 +41,64 @@ public class LoadBalancerTest {
serviceDiscovery.addServer("no-name@127.0.0.1:2002");
serviceDiscovery.addServer("no-name@127.0.0.1:1001");
serviceDiscovery.addServer("no-name@127.0.0.1:1002");
+ serviceDiscovery.addServer(
+ new DefaultServiceDefinition("no-name", "127.0.0.1", 1003, Collections.singletonMap("supports", "foo,bar")));
}
@Test
public void testLoadBalancer() throws Exception {
DefaultServiceLoadBalancer loadBalancer = new DefaultServiceLoadBalancer();
- loadBalancer.setCamelContext(new DefaultCamelContext());
+ CamelContext camelContext = new DefaultCamelContext();
+ loadBalancer.setCamelContext(camelContext);
loadBalancer.setServiceDiscovery(serviceDiscovery);
loadBalancer
- .setServiceFilter(services -> services.stream().filter(s -> s.getPort() < 2000).collect(Collectors.toList()));
+ .setServiceFilter(
+ (exchange, services) -> services.stream().filter(s -> s.getPort() < 2000).collect(Collectors.toList()));
loadBalancer.setServiceChooser(new RoundRobinServiceChooser());
- loadBalancer.process("no-name", service -> {
+ Exchange exchange = new DefaultExchange(camelContext);
+ loadBalancer.process(exchange, "no-name", service -> {
assertEquals(1001, service.getPort());
return false;
});
- loadBalancer.process("no-name", service -> {
+ loadBalancer.process(exchange, "no-name", service -> {
assertEquals(1002, service.getPort());
return false;
});
}
@Test
- public void testNoActiveServices() throws Exception {
+ public void testLoadBalancerWithContentBasedServiceFilter() throws Exception {
DefaultServiceLoadBalancer loadBalancer = new DefaultServiceLoadBalancer();
loadBalancer.setCamelContext(new DefaultCamelContext());
loadBalancer.setServiceDiscovery(serviceDiscovery);
+ loadBalancer.setServiceFilter(
+ (exchange, services) -> services.stream()
+ .filter(serviceDefinition -> ofNullable(serviceDefinition.getMetadata()
+ .get("supports"))
+ .orElse("")
+ .contains(exchange.getProperty("needs", String.class)))
+ .collect(Collectors.toList()));
+ loadBalancer.setServiceChooser(new RoundRobinServiceChooser());
+ Exchange exchange = new DefaultExchange(new DefaultCamelContext());
+ exchange.setProperty("needs", "foo");
+ loadBalancer.process(exchange, "no-name", service -> {
+ assertEquals(1003, service.getPort());
+ return false;
+ });
+ }
+
+ @Test
+ public void testNoActiveServices() throws Exception {
+ DefaultServiceLoadBalancer loadBalancer = new DefaultServiceLoadBalancer();
+ DefaultCamelContext camelContext = new DefaultCamelContext();
+ loadBalancer.setCamelContext(camelContext);
+ loadBalancer.setServiceDiscovery(serviceDiscovery);
loadBalancer
- .setServiceFilter(services -> services.stream().filter(s -> s.getPort() < 1000).collect(Collectors.toList()));
+ .setServiceFilter(
+ (exchange, services) -> services.stream().filter(s -> s.getPort() < 1000).collect(Collectors.toList()));
loadBalancer.setServiceChooser(new RoundRobinServiceChooser());
assertThrows(RejectedExecutionException.class, () -> {
- loadBalancer.process("no-name", service -> false);
+ loadBalancer.process(new DefaultExchange(camelContext), "no-name", service -> false);
});
}
}
diff --git a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
index d8283d4..d28fba4 100644
--- a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
+++ b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
@@ -20,6 +20,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
@@ -31,6 +32,7 @@ import org.apache.camel.model.cloud.ServiceCallConfigurationDefinition;
import org.apache.camel.model.cloud.ServiceCallDefinitionConstants;
import org.apache.camel.model.cloud.ServiceCallExpressionConfiguration;
import org.apache.camel.model.language.SimpleExpression;
+import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -372,11 +374,12 @@ public class ServiceCallConfigurationTest {
assertTrue(lb.getServiceDiscovery() instanceof StaticServiceDiscovery);
+ Exchange exchange = new DefaultExchange(context);
List<ServiceDefinition> services1 = lb.getServiceDiscovery().getServices("hello-service");
- assertEquals(2, filter.apply(services1).size());
+ assertEquals(2, filter.apply(exchange, services1).size());
List<ServiceDefinition> services2 = lb.getServiceDiscovery().getServices("hello-svc");
- assertEquals(1, filter.apply(services2).size());
+ assertEquals(1, filter.apply(exchange, services2).size());
} finally {
if (context != null) {
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/serviceCall-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/serviceCall-eip.adoc
index 19c1a73..3843794 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/serviceCall-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/serviceCall-eip.adoc
@@ -379,7 +379,9 @@ And in XML
</camelContext>
----
-== Blacklist Service Filter
+== Service Filter
+
+=== Blacklist Service Filter
This service filter implementation removes the listed services from those found by the service discovery.
Each service should be provided in the following form:
@@ -438,14 +440,56 @@ And in XML
</camelContext>
----
+=== Custom Service Filter
+
+Service Filters choose suitable candidates from the service definitions found in the service discovery.
+
+As of Camel 3.10.0 they have access to the current exchange, which allows you to create service filters
+comparing service metadata with message content.
+
+Assuming you have labeled one of the services in your service discovery to support a certain type of request:
+
+[source,java]
+----
+serviceDiscovery.addServer(new DefaultServiceDefinition("service", "127.0.0.1", 1003,
+ Collections.singletonMap("supports", "foo")));
+----
+
+The current exchange has a property which says that it needs a foo service:
+
+[source,java]
+----
+exchange.setProperty("needs", "foo");
+----
+
+You can then use a `ServiceFilter` to select the service instances which match the exchange:
+
+[source,java]
+----
+from("direct:start")
+ .serviceCall()
+ .name("service")
+ .serviceFilter((exchange, services) -> services.stream()
+ .filter(serviceDefinition -> Optional.ofNullable(serviceDefinition.getMetadata()
+ .get("supports"))
+ .orElse("")
+ .equals(exchange.getProperty("needs", String.class)))
+ .collect(Collectors.toList()))
+ .end()
+ .to("mock:result");
+----
+
== Load Balancer
-The Service Call EIP comes with its own Load Balancer which is instantiated by default if a custom is not configured and
-glues Service Discovery, Service Filer, Service Chooser and Service Expression together to load balance requests among the available services.
+The Service Call EIP comes with its own load balancer which is instantiated by default if a custom load balancer is not configured. It glues Service Discovery, Service Filter, Service Chooser and Service Expression together to load balance requests among the available services.
-If you need a more sophisticate load balancer you can use Ribbon by adding camel-ribbon to the mix,
+If you need a more sophisticated load balancer you can use Ribbon by adding camel-ribbon to the mix,
maven users will need to add the following dependency to their pom.xml
+NOTE: The `RibbonServiceLoadBalancer` has no concept of a current `Exchange`.
+Service filters therefore receive a dummy exchange when used with Ribbon.
+
+
[source,xml]
----
<dependency>
@@ -491,7 +535,7 @@ And in XML
</camelContext>
----
-You can configure Ribbon key programmatic using `RibbonConfiguration`:
+You can configure Ribbon key programmatically using `RibbonConfiguration`:
[source,java]
----
@@ -540,7 +584,7 @@ globalConf.setServiceChooser(
ServiceCallConfigurationDefinition httpsConf = new ServiceCallConfigurationDefinition();
httpsConf.setServiceFilter(
- list -> list.stream().filter(s -> s.getPort() == 443).collect(toList())
+ (exchange, list) -> list.stream().filter(s -> s.getPort() == 443).collect(toList())
);
getContext().setServiceCallConfiguration(globalConf);
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/cloud/CombinedServiceCallServiceFilterConfiguration.java b/core/camel-core-model/src/main/java/org/apache/camel/model/cloud/CombinedServiceCallServiceFilterConfiguration.java
index 219d527..b2b9f7b 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/cloud/CombinedServiceCallServiceFilterConfiguration.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/cloud/CombinedServiceCallServiceFilterConfiguration.java
@@ -59,7 +59,7 @@ public class CombinedServiceCallServiceFilterConfiguration extends ServiceCallSe
/**
* List of ServiceFilter configuration to use
- *
+ *
* @param serviceFilterConfigurations
*/
public void setServiceFilterConfigurations(List<ServiceCallServiceFilterConfiguration> serviceFilterConfigurations) {
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_10.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_10.adoc
index cbfe409..12fb0b3 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_10.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_10.adoc
@@ -10,6 +10,11 @@ from both 3.0 to 3.1 and 3.1 to 3.2.
The method `concurrentTasks` on `org.apache.camel.support.DefaultScheduledPollConsumerScheduler ` has been renamed to `concurrentConsumers`.
+The `org.apache.camel.cloud.ServiceFilter` and `org.apache.camel.cloud.ServiceLoadBalancer`
+functional interface methods take the current `Exchange` as an additional parameter
+to allow for content-based filtering of service candidates. `RibbonServiceLoadBalancer`
+has no notion of a current exchange; service filters therefore receive a dummy exchange when used with Ribbon.
+
=== camel-scheduler
The option `concurrentTasks` has been renamed to `poolSize` to better reflect its purpose.
diff --git a/parent/pom.xml b/parent/pom.xml
index e1ba182..870b4bb 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -480,6 +480,7 @@
<rhino-version>1.7.7.1</rhino-version>
<rhino-js-version>1.7R2</rhino-js-version>
<ribbon-version>2.3.0</ribbon-version>
+ <servo-version>0.10.1</servo-version>
<roaster-version>2.22.0.Final</roaster-version>
<robotframework-version>3.2.2</robotframework-version>
<rome-version>1.15.0</rome-version>