You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/19 08:07:02 UTC

[camel] branch master updated: CAMEL-16449: Introduce Exchange in ServiceFilter for filtering on exchange content (#5332)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new d37e3e1  CAMEL-16449: Introduce Exchange in ServiceFilter for filtering on exchange content (#5332)
d37e3e1 is described below

commit d37e3e14fc3a401d5c5263c63479cb5920d361f0
Author: Dietrich Schulten <ds...@escalon.de>
AuthorDate: Mon Apr 19 10:06:27 2021 +0200

    CAMEL-16449: Introduce Exchange in ServiceFilter for filtering on exchange content (#5332)
---
 components/camel-ribbon/pom.xml                    | 10 ++++
 .../ribbon/cloud/RibbonServiceLoadBalancer.java    | 15 ++++--
 .../ribbon/cloud/RibbonServerListTest.java         |  2 +
 .../java/org/apache/camel/cloud/ServiceFilter.java |  8 +++-
 .../apache/camel/cloud/ServiceLoadBalancer.java    |  4 +-
 .../camel/impl/cloud/BlacklistServiceFilter.java   |  3 +-
 .../camel/impl/cloud/CombinedServiceFilter.java    |  5 +-
 .../impl/cloud/DefaultServiceCallProcessor.java    |  2 +-
 .../camel/impl/cloud/DefaultServiceFilter.java     |  3 +-
 .../impl/cloud/DefaultServiceLoadBalancer.java     |  5 +-
 .../camel/impl/cloud/HealthyServiceFilter.java     |  3 +-
 .../camel/impl/cloud/PassThroughServiceFilter.java |  3 +-
 .../impl/cloud/CombinedServiceFilterTest.java      | 36 ++++++++++++--
 .../apache/camel/impl/cloud/LoadBalancerTest.java  | 47 +++++++++++++++---
 .../impl/cloud/ServiceCallConfigurationTest.java   |  7 ++-
 .../docs/modules/eips/pages/serviceCall-eip.adoc   | 56 +++++++++++++++++++---
 ...binedServiceCallServiceFilterConfiguration.java |  2 +-
 .../ROOT/pages/camel-3x-upgrade-guide-3_10.adoc    |  5 ++
 parent/pom.xml                                     |  1 +
 19 files changed, 180 insertions(+), 37 deletions(-)

diff --git a/components/camel-ribbon/pom.xml b/components/camel-ribbon/pom.xml
index 1435391..d5ff555 100644
--- a/components/camel-ribbon/pom.xml
+++ b/components/camel-ribbon/pom.xml
@@ -45,6 +45,10 @@
             <artifactId>camel-cloud</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.netflix.ribbon</groupId>
             <artifactId>ribbon-core</artifactId>
             <version>${ribbon-version}</version>
@@ -54,6 +58,12 @@
             <artifactId>ribbon-loadbalancer</artifactId>
             <version>${ribbon-version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.netflix.servo</groupId>
+            <artifactId>servo-core</artifactId>
+            <scope>provided</scope>
+            <version>${servo-version}</version>
+        </dependency>
 
         <!-- testing -->
         <dependency>
diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/cloud/RibbonServiceLoadBalancer.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/cloud/RibbonServiceLoadBalancer.java
index 3cfd56b..e2e1620 100644
--- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/cloud/RibbonServiceLoadBalancer.java
+++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/cloud/RibbonServiceLoadBalancer.java
@@ -35,6 +35,7 @@ import com.netflix.loadbalancer.ServerList;
 import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
 import org.apache.camel.cloud.ServiceDefinition;
 import org.apache.camel.cloud.ServiceDiscovery;
 import org.apache.camel.cloud.ServiceDiscoveryAware;
@@ -43,6 +44,7 @@ import org.apache.camel.cloud.ServiceFilterAware;
 import org.apache.camel.cloud.ServiceLoadBalancer;
 import org.apache.camel.cloud.ServiceLoadBalancerFunction;
 import org.apache.camel.component.ribbon.RibbonConfiguration;
+import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
@@ -128,7 +130,7 @@ public class RibbonServiceLoadBalancer
     // ************************
 
     @Override
-    public <T> T process(String serviceName, ServiceLoadBalancerFunction<T> request) throws Exception {
+    public <T> T process(Exchange exchange, String serviceName, ServiceLoadBalancerFunction<T> request) throws Exception {
         ILoadBalancer loadBalancer = loadBalancers.computeIfAbsent(serviceName, key -> createLoadBalancer(key));
         Server server = loadBalancer.chooseServer(serviceName);
 
@@ -187,7 +189,7 @@ public class RibbonServiceLoadBalancer
                     config,
                     configuration.getRuleOrDefault(RoundRobinRule::new),
                     configuration.getPingOrDefault(DummyPing::new),
-                    new RibbonServerList(serviceName, serviceDiscovery, serviceFilter),
+                    new RibbonServerList(camelContext, serviceName, serviceDiscovery, serviceFilter),
                     null,
                     new PollingServerListUpdater(config));
         } else {
@@ -201,18 +203,21 @@ public class RibbonServiceLoadBalancer
         private final String serviceName;
         private final ServiceDiscovery serviceDiscovery;
         private final ServiceFilter serviceFilter;
+        private final DefaultExchange dummyExchange;
 
-        RibbonServerList(String serviceName, ServiceDiscovery serviceDiscovery, ServiceFilter serviceFilter) {
+        RibbonServerList(CamelContext camelContext, String serviceName, ServiceDiscovery serviceDiscovery,
+                         ServiceFilter serviceFilter) {
             this.serviceName = serviceName;
             this.serviceDiscovery = serviceDiscovery;
             this.serviceFilter = serviceFilter;
+            this.dummyExchange = new DefaultExchange(camelContext); // ServerList doesn't support current exchange
         }
 
         @Override
         public List<RibbonServiceDefinition> getInitialListOfServers() {
             List<ServiceDefinition> services = serviceDiscovery.getServices(serviceName);
             if (serviceFilter != null) {
-                services = serviceFilter.apply(services);
+                services = serviceFilter.apply(dummyExchange, services);
             }
 
             return asRibbonServerList(services);
@@ -222,7 +227,7 @@ public class RibbonServiceLoadBalancer
         public List<RibbonServiceDefinition> getUpdatedListOfServers() {
             List<ServiceDefinition> services = serviceDiscovery.getServices(serviceName);
             if (serviceFilter != null) {
-                services = serviceFilter.apply(services);
+                services = serviceFilter.apply(dummyExchange, services);
             }
 
             return asRibbonServerList(services);
diff --git a/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/cloud/RibbonServerListTest.java b/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/cloud/RibbonServerListTest.java
index f154584..0b39ad7 100644
--- a/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/cloud/RibbonServerListTest.java
+++ b/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/cloud/RibbonServerListTest.java
@@ -20,6 +20,7 @@ import com.netflix.loadbalancer.LoadBalancerBuilder;
 import com.netflix.loadbalancer.RoundRobinRule;
 import com.netflix.loadbalancer.Server;
 import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.cloud.PassThroughServiceFilter;
 import org.apache.camel.impl.cloud.StaticServiceDiscovery;
 import org.junit.jupiter.api.Test;
@@ -31,6 +32,7 @@ public class RibbonServerListTest {
     public void testFixedServerList() throws Exception {
         ZoneAwareLoadBalancer<RibbonServiceDefinition> lb = LoadBalancerBuilder.<RibbonServiceDefinition> newBuilder()
                 .withDynamicServerList(new RibbonServiceLoadBalancer.RibbonServerList(
+                        new DefaultCamelContext(),
                         "unknown",
                         StaticServiceDiscovery.forServices(
                                 new RibbonServiceDefinition("unknown", "localhost", 9090),
diff --git a/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceFilter.java b/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceFilter.java
index 8c79a4f..dafe09d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceFilter.java
+++ b/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceFilter.java
@@ -18,6 +18,8 @@ package org.apache.camel.cloud;
 
 import java.util.List;
 
+import org.apache.camel.Exchange;
+
 /**
  * Allows SPIs to implement custom Service Filter.
  *
@@ -27,10 +29,12 @@ import java.util.List;
 public interface ServiceFilter {
 
     /**
-     * Chooses one of the service to use
+     * Chooses service candidates to use
      *
+     * @param  exchange for content-based filtering
      * @param  services list of services
      * @return          the chosen service to use.
      */
-    List<ServiceDefinition> apply(List<ServiceDefinition> services);
+    List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services);
+
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceLoadBalancer.java b/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceLoadBalancer.java
index 75f6cd6..28c029b 100644
--- a/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceLoadBalancer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/cloud/ServiceLoadBalancer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.cloud;
 
+import org.apache.camel.Exchange;
+
 /**
  * Represents a Load Balancer.
  *
@@ -25,5 +27,5 @@ package org.apache.camel.cloud;
  */
 @FunctionalInterface
 public interface ServiceLoadBalancer {
-    <T> T process(String serviceName, ServiceLoadBalancerFunction<T> function) throws Exception;
+    <T> T process(Exchange exchange, String serviceName, ServiceLoadBalancerFunction<T> function) throws Exception;
 }
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/BlacklistServiceFilter.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/BlacklistServiceFilter.java
index 91ac209..08f8f63 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/BlacklistServiceFilter.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/BlacklistServiceFilter.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.cloud.ServiceDefinition;
 import org.apache.camel.cloud.ServiceFilter;
 
@@ -83,7 +84,7 @@ public class BlacklistServiceFilter implements ServiceFilter {
     }
 
     @Override
-    public List<ServiceDefinition> apply(List<ServiceDefinition> services) {
+    public List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services) {
         return services.stream().filter(
                 s -> this.services.stream().noneMatch(b -> b.matches(s))).collect(
                         Collectors.toList());
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/CombinedServiceFilter.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/CombinedServiceFilter.java
index 3468bc3..7697d6b 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/CombinedServiceFilter.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/CombinedServiceFilter.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.cloud.ServiceDefinition;
 import org.apache.camel.cloud.ServiceFilter;
 
@@ -38,9 +39,9 @@ public class CombinedServiceFilter implements ServiceFilter {
     }
 
     @Override
-    public List<ServiceDefinition> apply(List<ServiceDefinition> services) {
+    public List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services) {
         for (int i = 0; i < delegatesSize; i++) {
-            services = delegates.get(i).apply(services);
+            services = delegates.get(i).apply(exchange, services);
         }
 
         return services;
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java
index 637d5b3..c909347 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java
@@ -184,7 +184,7 @@ public class DefaultServiceCallProcessor extends AsyncProcessorSupport {
         message.setHeader(ServiceCallConstants.SERVICE_NAME, serviceName);
 
         try {
-            return loadBalancer.process(serviceName, server -> execute(server, exchange, callback));
+            return loadBalancer.process(exchange, serviceName, server -> execute(server, exchange, callback));
         } catch (Exception e) {
             exchange.setException(e);
             callback.done(true);
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceFilter.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceFilter.java
index 90c7ef5..100c626 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceFilter.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceFilter.java
@@ -18,13 +18,14 @@ package org.apache.camel.impl.cloud;
 
 import java.util.List;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.cloud.ServiceDefinition;
 import org.apache.camel.cloud.ServiceFilter;
 
 public class DefaultServiceFilter implements ServiceFilter {
 
     @Override
-    public List<ServiceDefinition> apply(List<ServiceDefinition> services) {
+    public List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services) {
         return services;
     }
 }
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceLoadBalancer.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceLoadBalancer.java
index 5af196d..eee9fac 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceLoadBalancer.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/DefaultServiceLoadBalancer.java
@@ -21,6 +21,7 @@ import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
 import org.apache.camel.cloud.ServiceChooser;
 import org.apache.camel.cloud.ServiceChooserAware;
 import org.apache.camel.cloud.ServiceDefinition;
@@ -125,7 +126,7 @@ public class DefaultServiceLoadBalancer
     // *************************************
 
     @Override
-    public <T> T process(String serviceName, ServiceLoadBalancerFunction<T> function) throws Exception {
+    public <T> T process(Exchange exchange, String serviceName, ServiceLoadBalancerFunction<T> function) throws Exception {
         ServiceDefinition service;
 
         List<ServiceDefinition> services = serviceDiscovery.getServices(serviceName);
@@ -133,7 +134,7 @@ public class DefaultServiceLoadBalancer
             throw new RejectedExecutionException("No active services with name " + serviceName);
         } else {
             // filter services
-            services = serviceFilter.apply(services);
+            services = serviceFilter.apply(exchange, services);
             // let the client service chooser find which server to use
             service = services.isEmpty() ? null : services.size() > 1 ? serviceChooser.choose(services) : services.get(0);
             if (service == null) {
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/HealthyServiceFilter.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/HealthyServiceFilter.java
index a3a1840..19c9c15 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/HealthyServiceFilter.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/HealthyServiceFilter.java
@@ -19,12 +19,13 @@ package org.apache.camel.impl.cloud;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.cloud.ServiceDefinition;
 import org.apache.camel.cloud.ServiceFilter;
 
 public class HealthyServiceFilter implements ServiceFilter {
     @Override
-    public List<ServiceDefinition> apply(List<ServiceDefinition> services) {
+    public List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services) {
         return services.stream().filter(s -> s.getHealth().isHealthy()).collect(Collectors.toList());
     }
 }
diff --git a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/PassThroughServiceFilter.java b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/PassThroughServiceFilter.java
index 75791fa..79d6b64 100644
--- a/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/PassThroughServiceFilter.java
+++ b/core/camel-cloud/src/main/java/org/apache/camel/impl/cloud/PassThroughServiceFilter.java
@@ -18,12 +18,13 @@ package org.apache.camel.impl.cloud;
 
 import java.util.List;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.cloud.ServiceDefinition;
 import org.apache.camel.cloud.ServiceFilter;
 
 public class PassThroughServiceFilter implements ServiceFilter {
     @Override
-    public List<ServiceDefinition> apply(List<ServiceDefinition> services) {
+    public List<ServiceDefinition> apply(Exchange exchange, List<ServiceDefinition> services) {
         return services;
     }
 }
diff --git a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/CombinedServiceFilterTest.java b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/CombinedServiceFilterTest.java
index 56ac312..44678cc 100644
--- a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/CombinedServiceFilterTest.java
+++ b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/CombinedServiceFilterTest.java
@@ -16,15 +16,17 @@
  */
 package org.apache.camel.impl.cloud;
 
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.cloud.ServiceDefinition;
 import org.apache.camel.model.cloud.CombinedServiceCallServiceFilterConfiguration;
+import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Test;
 
+import static java.util.Optional.ofNullable;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -46,9 +48,10 @@ public class CombinedServiceFilterTest extends ContextTestSupport {
     public void testMultiServiceFilter() throws Exception {
         CombinedServiceCallServiceFilterConfiguration conf = new CombinedServiceCallServiceFilterConfiguration()
                 .healthy()
-                .custom(services -> services.stream().filter(s -> s.getPort() < 2000).collect(Collectors.toList()));
+                .custom((exchange, services) -> services.stream().filter(s -> s.getPort() < 2000).collect(Collectors.toList()));
 
-        List<ServiceDefinition> services = conf.newInstance(context).apply(Arrays.asList(
+        Exchange exchange = new DefaultExchange(context);
+        List<ServiceDefinition> services = conf.newInstance(context).apply(exchange, Arrays.asList(
                 new DefaultServiceDefinition("no-name", "127.0.0.1", 1000),
                 new DefaultServiceDefinition("no-name", "127.0.0.1", 1001, new DefaultServiceHealth(false)),
                 new DefaultServiceDefinition("no-name", "127.0.0.1", 1002, new DefaultServiceHealth(true)),
@@ -61,4 +64,29 @@ public class CombinedServiceFilterTest extends ContextTestSupport {
         assertTrue(services.stream().anyMatch(s -> s.getPort() == 1000));
         assertTrue(services.stream().anyMatch(s -> s.getPort() == 1002));
     }
+
+    @Test
+    public void testContentBasedServiceFilterCombinedWithServiceFilter() throws Exception {
+        CombinedServiceCallServiceFilterConfiguration conf = new CombinedServiceCallServiceFilterConfiguration()
+                .healthy()
+                .custom((exchange, services) -> services.stream()
+                        .filter(serviceDefinition -> ofNullable(serviceDefinition.getMetadata()
+                                .get("supports"))
+                                        .orElse("")
+                                        .contains(exchange.getProperty("needs", String.class)))
+                        .collect(Collectors.toList()));
+
+        Map<String, String> metadata = Collections.singletonMap("supports", "foo,bar");
+
+        Exchange exchange = new DefaultExchange(context);
+        exchange.setProperty("needs", "foo");
+
+        List<ServiceDefinition> services = conf.newInstance(context).apply(exchange, Arrays.asList(
+                new DefaultServiceDefinition("no-name", "127.0.0.1", 2001, metadata, new DefaultServiceHealth(true)),
+                new DefaultServiceDefinition("no-name", "127.0.0.1", 2002, metadata, new DefaultServiceHealth(false))));
+
+        assertEquals(1, services.size());
+        assertFalse(services.stream().anyMatch(s -> !s.getHealth().isHealthy()));
+        assertTrue(services.stream().anyMatch(s -> s.getPort() == 2001));
+    }
 }
diff --git a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/LoadBalancerTest.java b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/LoadBalancerTest.java
index 6bde39b..6d4ebad 100644
--- a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/LoadBalancerTest.java
+++ b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/LoadBalancerTest.java
@@ -16,13 +16,18 @@
  */
 package org.apache.camel.impl.cloud;
 
+import java.util.Collections;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.stream.Collectors;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import static java.util.Optional.ofNullable;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -36,36 +41,64 @@ public class LoadBalancerTest {
         serviceDiscovery.addServer("no-name@127.0.0.1:2002");
         serviceDiscovery.addServer("no-name@127.0.0.1:1001");
         serviceDiscovery.addServer("no-name@127.0.0.1:1002");
+        serviceDiscovery.addServer(
+                new DefaultServiceDefinition("no-name", "127.0.0.1", 1003, Collections.singletonMap("supports", "foo,bar")));
     }
 
     @Test
     public void testLoadBalancer() throws Exception {
         DefaultServiceLoadBalancer loadBalancer = new DefaultServiceLoadBalancer();
-        loadBalancer.setCamelContext(new DefaultCamelContext());
+        CamelContext camelContext = new DefaultCamelContext();
+        loadBalancer.setCamelContext(camelContext);
         loadBalancer.setServiceDiscovery(serviceDiscovery);
         loadBalancer
-                .setServiceFilter(services -> services.stream().filter(s -> s.getPort() < 2000).collect(Collectors.toList()));
+                .setServiceFilter(
+                        (exchange, services) -> services.stream().filter(s -> s.getPort() < 2000).collect(Collectors.toList()));
         loadBalancer.setServiceChooser(new RoundRobinServiceChooser());
-        loadBalancer.process("no-name", service -> {
+        Exchange exchange = new DefaultExchange(camelContext);
+        loadBalancer.process(exchange, "no-name", service -> {
             assertEquals(1001, service.getPort());
             return false;
         });
-        loadBalancer.process("no-name", service -> {
+        loadBalancer.process(exchange, "no-name", service -> {
             assertEquals(1002, service.getPort());
             return false;
         });
     }
 
     @Test
-    public void testNoActiveServices() throws Exception {
+    public void testLoadBalancerWithContentBasedServiceFilter() throws Exception {
         DefaultServiceLoadBalancer loadBalancer = new DefaultServiceLoadBalancer();
         loadBalancer.setCamelContext(new DefaultCamelContext());
         loadBalancer.setServiceDiscovery(serviceDiscovery);
+        loadBalancer.setServiceFilter(
+                (exchange, services) -> services.stream()
+                        .filter(serviceDefinition -> ofNullable(serviceDefinition.getMetadata()
+                                .get("supports"))
+                                        .orElse("")
+                                        .contains(exchange.getProperty("needs", String.class)))
+                        .collect(Collectors.toList()));
+        loadBalancer.setServiceChooser(new RoundRobinServiceChooser());
+        Exchange exchange = new DefaultExchange(new DefaultCamelContext());
+        exchange.setProperty("needs", "foo");
+        loadBalancer.process(exchange, "no-name", service -> {
+            assertEquals(1003, service.getPort());
+            return false;
+        });
+    }
+
+    @Test
+    public void testNoActiveServices() throws Exception {
+        DefaultServiceLoadBalancer loadBalancer = new DefaultServiceLoadBalancer();
+        DefaultCamelContext camelContext = new DefaultCamelContext();
+        loadBalancer.setCamelContext(camelContext);
+        loadBalancer.setServiceDiscovery(serviceDiscovery);
         loadBalancer
-                .setServiceFilter(services -> services.stream().filter(s -> s.getPort() < 1000).collect(Collectors.toList()));
+                .setServiceFilter(
+                        (exchange, services) -> services.stream().filter(s -> s.getPort() < 1000).collect(Collectors.toList()));
         loadBalancer.setServiceChooser(new RoundRobinServiceChooser());
         assertThrows(RejectedExecutionException.class, () -> {
-            loadBalancer.process("no-name", service -> false);
+            loadBalancer.process(new DefaultExchange(camelContext), "no-name", service -> false);
         });
     }
 }
diff --git a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
index d8283d4..d28fba4 100644
--- a/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
+++ b/core/camel-cloud/src/test/java/org/apache/camel/impl/cloud/ServiceCallConfigurationTest.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.builder.RouteBuilder;
@@ -31,6 +32,7 @@ import org.apache.camel.model.cloud.ServiceCallConfigurationDefinition;
 import org.apache.camel.model.cloud.ServiceCallDefinitionConstants;
 import org.apache.camel.model.cloud.ServiceCallExpressionConfiguration;
 import org.apache.camel.model.language.SimpleExpression;
+import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -372,11 +374,12 @@ public class ServiceCallConfigurationTest {
 
             assertTrue(lb.getServiceDiscovery() instanceof StaticServiceDiscovery);
 
+            Exchange exchange = new DefaultExchange(context);
             List<ServiceDefinition> services1 = lb.getServiceDiscovery().getServices("hello-service");
-            assertEquals(2, filter.apply(services1).size());
+            assertEquals(2, filter.apply(exchange, services1).size());
 
             List<ServiceDefinition> services2 = lb.getServiceDiscovery().getServices("hello-svc");
-            assertEquals(1, filter.apply(services2).size());
+            assertEquals(1, filter.apply(exchange, services2).size());
 
         } finally {
             if (context != null) {
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/serviceCall-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/serviceCall-eip.adoc
index 19c1a73..3843794 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/serviceCall-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/serviceCall-eip.adoc
@@ -379,7 +379,9 @@ And in XML
 </camelContext>
 ----
 
-== Blacklist Service Filter
+== Service Filter
+
+=== Blacklist Service Filter
 
 This service filter implementation removes the listed services from those found by the service discovery.
 Each service should be provided in the following form:
@@ -438,14 +440,56 @@ And in XML
 </camelContext>
 ----
 
+=== Custom Service Filter
+
+Service Filters choose suitable candidates from the service definitions found in the service discovery. 
+
+As of Camel 3.10.0 they have access to the current exchange, which allows you to create service filters 
+comparing service metadata with message content.
+
+Assuming you have labeled one of the services in your service discovery to support a certain type of requests:
+
+[source,java]
+----
+serviceDiscovery.addServer(new DefaultServiceDefinition("service", "127.0.0.1", 1003, 
+    Collections.singletonMap("supports", "foo")));
+----
+
+The current exchange has a property which says that it needs a foo service:
+
+[source,java]
+----
+exchange.setProperty("needs", "foo")
+----
+
+You can then use a `ServiceFilter` to select the service instances which match the exchange:
+
+[source,java]
+----
+from("direct:start")
+    .serviceCall()
+        .name("service")
+        .serviceFilter((exchange, services) -> services.stream()
+			.filter(serviceDefinition -> Optional.ofNullable(serviceDefinition.getMetadata()
+				.get("supports"))
+				.orElse("")
+				.equals(exchange.getProperty("needs", String.class)))
+			.collect(Collectors.toList()));
+        .end()
+    .to("mock:result");
+----
+
 == Load Balancer
 
-The Service Call EIP comes with its own Load Balancer which is instantiated by default if a custom is not configured and
-glues Service Discovery, Service Filer, Service Chooser and Service Expression together to load balance requests among the available services.
+The Service Call EIP comes with its own loadbalancer which is instantiated by default if a custom loadbalancer is not configured. It glues Service Discovery, Service Filter, Service Chooser and Service Expression together to load balance requests among the available services.
 
-If you need a more sophisticate load balancer you can use Ribbon by adding camel-ribbon to the mix,
+If you need a more sophisticated load balancer you can use Ribbon by adding camel-ribbon to the mix,
 maven users will need to add the following dependency to their pom.xml
 
+NOTE: The `RibbonServiceLoadBalancer` has no concept of a current `Exchange`. 
+Service filters therefore receive a dummy exchange when used with Ribbon.
+
+
 [source,xml]
 ----
 <dependency>
@@ -491,7 +535,7 @@ And in XML
 </camelContext>
 ----
 
-You can configure Ribbon key programmatic using `RibbonConfiguration`:
+You can configure Ribbon key programmatically using `RibbonConfiguration`:
 
 [source,java]
 ----
@@ -540,7 +584,7 @@ globalConf.setServiceChooser(
 
 ServiceCallConfigurationDefinition httpsConf = new ServiceCallConfigurationDefinition();
 httpsConf.setServiceFilter(
-    list -> list.stream().filter(s -> s.getPort() == 443).collect(toList())
+    list -> list.stream().filter((exchange, s) -> s.getPort() == 443).collect(toList())
 );
 
 getContext().setServiceCallConfiguration(globalConf);
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/cloud/CombinedServiceCallServiceFilterConfiguration.java b/core/camel-core-model/src/main/java/org/apache/camel/model/cloud/CombinedServiceCallServiceFilterConfiguration.java
index 219d527..b2b9f7b 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/cloud/CombinedServiceCallServiceFilterConfiguration.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/cloud/CombinedServiceCallServiceFilterConfiguration.java
@@ -59,7 +59,7 @@ public class CombinedServiceCallServiceFilterConfiguration extends ServiceCallSe
 
     /**
      * List of ServiceFilter configuration to use
-     * 
+     *
      * @param serviceFilterConfigurations
      */
     public void setServiceFilterConfigurations(List<ServiceCallServiceFilterConfiguration> serviceFilterConfigurations) {
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_10.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_10.adoc
index cbfe409..12fb0b3 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_10.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_10.adoc
@@ -10,6 +10,11 @@ from both 3.0 to 3.1 and 3.1 to 3.2.
 
 The method `concurrentTasks` on `org.apache.camel.support.DefaultScheduledPollConsumerScheduler ` has been renamed to `concurrentConsumers`.
 
+The `org.apache.camel.cloud.ServiceFilter` and `org.apache.camel.cloud.ServiceLoadBalancer` 
+functional interface methods take the current `Exchange` as additional parameter 
+to allow for content-based filtering of service candidates. `RibbonServiceLoadBalancer` 
+has no notion of a current exchange, service filters therefore receive a dummy exchange when used with Ribbon.
+
 === camel-scheduler
 
 The option `concurrentTasks` has been renamed to `poolSize` to better reflect its purpose.
diff --git a/parent/pom.xml b/parent/pom.xml
index e1ba182..870b4bb 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -480,6 +480,7 @@
         <rhino-version>1.7.7.1</rhino-version>
         <rhino-js-version>1.7R2</rhino-js-version>
         <ribbon-version>2.3.0</ribbon-version>
+        <servo-version>0.10.1</servo-version>
         <roaster-version>2.22.0.Final</roaster-version>
         <robotframework-version>3.2.2</robotframework-version>
         <rome-version>1.15.0</rome-version>