You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2016/06/07 14:32:03 UTC

[3/7] camel git commit: CAMEL-9989 : Create a Consul based ServiceCall EIP

CAMEL-9989 : Create a Consul based ServiceCall EIP


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce007953
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce007953
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce007953

Branch: refs/heads/master
Commit: ce00795362c58cb7aaf549e0502e779f6c56edcb
Parents: f0b9526
Author: lburgazzoli <lb...@gmail.com>
Authored: Thu May 26 12:47:24 2016 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Tue Jun 7 16:30:43 2016 +0200

----------------------------------------------------------------------
 .../remote/DefaultServiceCallExpression.java    |  51 +++++
 .../remote/DefaultServiceCallProcessor.java     | 199 +++++++++++++++++++
 .../DefaultServiceCallProcessorFactory.java     | 192 ++++++++++++++++++
 .../impl/remote/DefaultServiceCallServer.java   |  45 +++++
 .../remote/RandomServiceCallLoadBalancer.java   |  55 +++++
 .../RoundRobinServiceCallLoadBalancer.java      |  51 +++++
 .../remote/ConsulConfigurationDefinition.java   |  40 ++++
 .../model/remote/ServiceCallDefinition.java     |  10 +
 .../org/apache/camel/model/remote/jaxb.index    |   1 +
 components/camel-consul/pom.xml                 |  15 ++
 .../camel-consul/src/main/docs/consul.adoc      |   8 +-
 .../camel/component/consul/ConsulComponent.java |   2 +-
 .../component/consul/ConsulConfiguration.java   |  89 ++++++++-
 .../camel/component/consul/ConsulConstants.java |   4 +
 .../camel/component/consul/ConsulEndpoint.java  |  27 +--
 .../service/ConsulProcessorFactory.java         |  26 +++
 .../ConsulServiceCallServerListStrategies.java  |  70 +++++++
 .../ConsulServiceCallServerListStrategy.java    | 130 ++++++++++++
 .../apache/camel/model/ServiceCallDefinition    |  18 ++
 .../service/ServiceCallClientRouteTest.java     | 124 ++++++++++++
 .../src/test/resources/log4j.properties         |   2 +-
 .../camel/spring/CamelContextFactoryBean.java   |   2 +
 .../spring/handler/CamelNamespaceHandler.java   |   2 +
 23 files changed, 1125 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallExpression.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallExpression.java b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallExpression.java
new file mode 100644
index 0000000..5ce2cca
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallExpression.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.impl.remote;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.support.ServiceCallExpressionSupport;
+import org.apache.camel.util.ExchangeHelper;
+
+public class DefaultServiceCallExpression extends ServiceCallExpressionSupport {
+    public static final String SERVER_IP = "CamelServiceCallServerIp";
+    public static final String SERVER_PORT = "CamelServiceCallServerPort";
+
+    private final String ipHeader;
+    private final String portHeader;
+
+    public DefaultServiceCallExpression(String name, String scheme, String contextPath, String uri) {
+        this(name, scheme, contextPath, uri, SERVER_IP, SERVER_PORT);
+    }
+
+    public DefaultServiceCallExpression(String name, String scheme, String contextPath, String uri, String ipHeader, String portHeader) {
+        super(name, scheme, contextPath, uri);
+
+        this.ipHeader = ipHeader;
+        this.portHeader = portHeader;
+    }
+
+    @Override
+    public String getIp(Exchange exchange) throws Exception {
+        return ExchangeHelper.getMandatoryHeader(exchange, ipHeader, String.class);
+    }
+
+    @Override
+    public int getPort(Exchange exchange) throws Exception {
+        return ExchangeHelper.getMandatoryHeader(exchange, portHeader, int.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessor.java
new file mode 100644
index 0000000..4cd40d2
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessor.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.remote;
+
+import java.util.Collection;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Traceable;
+import org.apache.camel.processor.SendDynamicProcessor;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ServiceCallLoadBalancer;
+import org.apache.camel.spi.ServiceCallServer;
+import org.apache.camel.spi.ServiceCallServerListStrategy;
+import org.apache.camel.support.ServiceCallExpressionSupport;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultServiceCallProcessor extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultServiceCallProcessor.class);
+
+    private final ServiceCallExpressionSupport serviceCallExpression;
+    private final ExchangePattern exchangePattern;
+    private final String uri;
+    private final String name;
+    private final String scheme;
+    private final String contextPath;
+    private CamelContext camelContext;
+    private String id;
+    private ServiceCallServerListStrategy<ServiceCallServer> serverListStrategy;
+    private ServiceCallLoadBalancer<ServiceCallServer> loadBalancer;
+    private SendDynamicProcessor processor;
+
+    public DefaultServiceCallProcessor(String name, String scheme, String uri, ExchangePattern exchangePattern) {
+        this.uri = uri;
+        this.exchangePattern = exchangePattern;
+
+        // setup from the provided name which can contain scheme and context-path information as well
+        String serviceName;
+        if (name.contains("/")) {
+            serviceName = ObjectHelper.before(name, "/");
+            this.contextPath = ObjectHelper.after(name, "/");
+        } else if (name.contains("?")) {
+            serviceName = ObjectHelper.before(name, "?");
+            this.contextPath = ObjectHelper.after(name, "?");
+        } else {
+            serviceName = name;
+            this.contextPath = null;
+        }
+        if (serviceName.contains(":")) {
+            this.scheme = ObjectHelper.before(serviceName, ":");
+            this.name = ObjectHelper.after(serviceName, ":");
+        } else {
+            this.scheme = scheme;
+            this.name = serviceName;
+        }
+
+        this.serviceCallExpression = new DefaultServiceCallExpression(
+            this.name,
+            this.scheme,
+            this.contextPath,
+            this.uri);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getTraceLabel() {
+        return id;
+    }
+
+    public ServiceCallLoadBalancer<ServiceCallServer> getLoadBalancer() {
+        return loadBalancer;
+    }
+
+    public void setLoadBalancer(ServiceCallLoadBalancer<ServiceCallServer> loadBalancer) {
+        this.loadBalancer = loadBalancer;
+    }
+
+    public DefaultServiceCallProcessor loadBalancer(ServiceCallLoadBalancer<ServiceCallServer> loadBalancer) {
+        setLoadBalancer(loadBalancer);
+        return this;
+    }
+
+    public ServiceCallServerListStrategy<ServiceCallServer> getServerListStrategy() {
+        return serverListStrategy;
+    }
+
+    public void setServerListStrategy(ServiceCallServerListStrategy<ServiceCallServer> serverListStrategy) {
+        this.serverListStrategy = serverListStrategy;
+    }
+
+    public DefaultServiceCallProcessor serverListStrategy(ServiceCallServerListStrategy<ServiceCallServer> serverListStrategy) {
+        setServerListStrategy(serverListStrategy);
+        return this;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notEmpty(name, "name", this);
+        ObjectHelper.notNull(camelContext, "camelContext");
+        ObjectHelper.notNull(serverListStrategy, "serverListStrategy");
+        ObjectHelper.notNull(loadBalancer, "loadBalancer");
+
+        // one placeholder per argument, otherwise the values are logged against the wrong labels
+        LOG.info("ServiceCall with service name: {} is using load balancer: {} and service discovery: {}",
+            name, loadBalancer, serverListStrategy);
+
+        processor = new SendDynamicProcessor(uri, serviceCallExpression);
+        processor.setCamelContext(getCamelContext());
+        if (exchangePattern != null) {
+            processor.setPattern(exchangePattern);
+        }
+
+        ServiceHelper.startServices(serverListStrategy, processor);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(processor, serverListStrategy);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // failures during discovery or server selection are reported on the exchange, never thrown to the caller
+        try {
+            Collection<ServiceCallServer> servers = serverListStrategy.getUpdatedListOfServers();
+            if (servers == null || servers.isEmpty()) {
+                exchange.setException(new RejectedExecutionException("No active services with name " + name));
+            } else {
+                // let the client load balancer choose which server to use
+                ServiceCallServer server = loadBalancer.chooseServer(servers);
+                String ip = server.getIp();
+                int port = server.getPort();
+                LOG.debug("Service {} active at server: {}:{}", name, ip, port);
+                // set selected server as header
+                exchange.getIn().setHeader(DefaultServiceCallExpression.SERVER_IP, ip);
+                exchange.getIn().setHeader(DefaultServiceCallExpression.SERVER_PORT, port);
+            }
+        } catch (Throwable e) {
+            // also covers a load balancer that throws or returns null
+            exchange.setException(e);
+        }
+
+        if (exchange.getException() != null) {
+            callback.done(true);
+            return true;
+        }
+
+        // use the dynamic send processor to call the service
+        return processor.process(exchange, callback);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessorFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessorFactory.java b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessorFactory.java
new file mode 100644
index 0000000..6b145c7
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessorFactory.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.remote;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.remote.ServiceCallConfigurationDefinition;
+import org.apache.camel.model.remote.ServiceCallDefinition;
+import org.apache.camel.spi.ProcessorFactory;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.ServiceCallLoadBalancer;
+import org.apache.camel.spi.ServiceCallServerListStrategy;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.IntrospectionSupport;
+
+public class DefaultServiceCallProcessorFactory implements ProcessorFactory {
+
+    @Override
+    public Processor createChildProcessor(RouteContext routeContext, ProcessorDefinition<?> definition, boolean mandatory) throws Exception {
+        // not in use
+        return null;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> definition) throws Exception {
+        if (definition instanceof ServiceCallDefinition) {
+            ServiceCallDefinition sc = (ServiceCallDefinition) definition;
+
+            String name = sc.getName();
+            String uri = sc.getUri();
+            ExchangePattern mep = sc.getPattern();
+
+            ServiceCallConfigurationDefinition config = sc.getServiceCallConfiguration();
+            ServiceCallConfigurationDefinition configRef = null;
+            if (sc.getServiceCallConfigurationRef() != null) {
+                // lookup in registry first
+                configRef = CamelContextHelper.lookup(routeContext.getCamelContext(), sc.getServiceCallConfigurationRef(), ServiceCallConfigurationDefinition.class);
+                if (configRef == null) {
+                    // and fallback as service configuration (the result must be kept, otherwise the reference is silently ignored)
+                    configRef = routeContext.getCamelContext().getServiceCallConfiguration(sc.getServiceCallConfigurationRef(), ServiceCallConfigurationDefinition.class);
+                }
+            }
+
+            // if no configuration explicit configured then use default
+            if (config == null && configRef == null) {
+                config = routeContext.getCamelContext().getServiceCallConfiguration(null, ServiceCallConfigurationDefinition.class);
+            }
+            if (config == null) {
+                // if no default then try to find if there configuration in the registry of the given type
+                Set<ServiceCallConfigurationDefinition> set = routeContext.getCamelContext().getRegistry().findByType(ServiceCallConfigurationDefinition.class);
+                if (set.size() == 1) {
+                    config = set.iterator().next();
+                }
+            }
+
+            if (config == null && configRef == null) {
+                throw new IllegalStateException("The ServiceCall: " + definition + " must be configured before it can be used.");
+            }
+
+            // extract the properties from the configuration from the model
+            Map<String, Object> parameters = new HashMap<>();
+            if (configRef != null) {
+                IntrospectionSupport.getProperties(configRef, parameters, null);
+            }
+            if (config != null) {
+                IntrospectionSupport.getProperties(config, parameters, null);
+            }
+
+            // lookup the load balancer to use (configured on EIP takes precedence vs configured on configuration)
+            ServiceCallLoadBalancer lb = configureLoadBalancer(routeContext, sc);
+            if (lb == null && config != null) {
+                lb = configureLoadBalancer(routeContext, config);
+            }
+            if (lb == null && configRef != null) {
+                lb = configureLoadBalancer(routeContext, configRef);
+            }
+
+            // lookup the server list strategy to use (configured on EIP takes precedence vs configured on configuration)
+            ServiceCallServerListStrategy sl = configureServerListStrategy(routeContext, sc);
+            if (sl == null && config != null) {
+                sl = configureServerListStrategy(routeContext, config);
+            }
+            if (sl == null && configRef != null) {
+                sl = configureServerListStrategy(routeContext, configRef);
+            }
+
+            // the component is used to configure what the default scheme to use (eg camel component name)
+            String component = config != null ? config.getComponent() : null;
+            if (component == null && configRef != null) {
+                component = configRef.getComponent();
+            }
+
+            return createProcessor(name, component, uri, mep, parameters, lb, sl);
+
+        } else {
+            return null;
+        }
+    }
+
+    protected Processor createProcessor(
+            String name,
+            String component,
+            String uri,
+            ExchangePattern mep,
+            Map<String, Object> parameters,
+            ServiceCallLoadBalancer lb, ServiceCallServerListStrategy sl) {
+
+        return new DefaultServiceCallProcessor(name, component, uri, mep).
+            loadBalancer(lb)
+            .serverListStrategy(sl);
+    }
+
+
+    protected ServiceCallLoadBalancer configureLoadBalancer(RouteContext routeContext, ServiceCallDefinition sd) {
+        ServiceCallLoadBalancer lb = null;
+
+        if (sd != null) {
+            lb = sd.getLoadBalancer();
+            if (lb == null && sd.getLoadBalancerRef() != null) {
+                String ref = sd.getLoadBalancerRef();
+                // special for ref is referring to built-in
+                if ("random".equalsIgnoreCase(ref)) {
+                    lb = new RandomServiceCallLoadBalancer();
+                } else if ("roundrobin".equalsIgnoreCase(ref)) {
+                    lb = new RoundRobinServiceCallLoadBalancer();
+                } else {
+                    lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, ServiceCallLoadBalancer.class);
+                }
+            }
+        }
+
+        return lb;
+    }
+
+    protected ServiceCallLoadBalancer configureLoadBalancer(RouteContext routeContext, ServiceCallConfigurationDefinition config) {
+        ServiceCallLoadBalancer lb = config.getLoadBalancer();
+        if (lb == null && config.getLoadBalancerRef() != null) {
+            String ref = config.getLoadBalancerRef();
+            // special for ref is referring to built-in
+            if ("random".equalsIgnoreCase(ref)) {
+                lb = new RandomServiceCallLoadBalancer();
+            } else if ("roundrobin".equalsIgnoreCase(ref)) {
+                lb = new RoundRobinServiceCallLoadBalancer();
+            } else {
+                lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, ServiceCallLoadBalancer.class);
+            }
+        }
+        return lb;
+    }
+
+    protected ServiceCallServerListStrategy configureServerListStrategy(RouteContext routeContext, ServiceCallDefinition sd) {
+        ServiceCallServerListStrategy lb = null;
+
+        if (sd != null) {
+            lb = sd.getServerListStrategy();
+            if (lb == null && sd.getServerListStrategyRef() != null) {
+                lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), sd.getServerListStrategyRef(), ServiceCallServerListStrategy.class);
+            }
+        }
+
+        return lb;
+    }
+
+    protected ServiceCallServerListStrategy configureServerListStrategy(RouteContext routeContext, ServiceCallConfigurationDefinition config) {
+        ServiceCallServerListStrategy lb = config.getServerListStrategy();
+        if (lb == null && config.getServerListStrategyRef() != null) {
+            String ref = config.getServerListStrategyRef();
+            lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, ServiceCallServerListStrategy.class);
+        }
+        return lb;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallServer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallServer.java b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallServer.java
new file mode 100644
index 0000000..e42afc7
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallServer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.remote;
+
+import org.apache.camel.spi.ServiceCallServer;
+
+public class DefaultServiceCallServer implements ServiceCallServer {
+
+    private final String ip;
+    private final int port;
+
+    public DefaultServiceCallServer(String ip, int port) {
+        this.ip = ip;
+        this.port = port;
+    }
+
+    @Override
+    public String getIp() {
+        return ip;
+    }
+
+    @Override
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    public String toString() {
+        return "DefaultServiceCallServer[" + ip + ":" + port + "]";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/camel-core/src/main/java/org/apache/camel/impl/remote/RandomServiceCallLoadBalancer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/remote/RandomServiceCallLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/impl/remote/RandomServiceCallLoadBalancer.java
new file mode 100644
index 0000000..895863e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/remote/RandomServiceCallLoadBalancer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.remote;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.camel.spi.ServiceCallLoadBalancer;
+import org.apache.camel.spi.ServiceCallServer;
+
+public class RandomServiceCallLoadBalancer implements ServiceCallLoadBalancer<ServiceCallServer> {
+    private final Random random;
+
+    public RandomServiceCallLoadBalancer() {
+        this.random = new Random();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ServiceCallServer chooseServer(Collection<ServiceCallServer> servers) {
+        List<ServiceCallServer> list;
+        if (servers instanceof List) {
+            list = (List<ServiceCallServer>)servers;
+        } else {
+            list = new ArrayList<>(servers);
+        }
+
+        // Random.nextInt rejects a bound of 0, so servers must be non-empty
+        int size = list.size();
+        int ran = this.random.nextInt(size);
+
+        return list.get(ran);
+    }
+
+    @Override
+    public String toString() {
+        return "RandomServiceCallLoadBalancer";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/camel-core/src/main/java/org/apache/camel/impl/remote/RoundRobinServiceCallLoadBalancer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/remote/RoundRobinServiceCallLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/impl/remote/RoundRobinServiceCallLoadBalancer.java
new file mode 100644
index 0000000..2b8b488
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/remote/RoundRobinServiceCallLoadBalancer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.impl.remote;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.camel.spi.ServiceCallLoadBalancer;
+import org.apache.camel.spi.ServiceCallServer;
+
+public class RoundRobinServiceCallLoadBalancer implements ServiceCallLoadBalancer<ServiceCallServer> {
+    private int counter = -1;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ServiceCallServer chooseServer(Collection<ServiceCallServer> servers) {
+        List<ServiceCallServer> list;
+        if (servers instanceof List) {
+            list = (List<ServiceCallServer>)servers;
+        } else {
+            list = new ArrayList<>(servers);
+        }
+        // NOTE(review): counter is advanced without synchronization - confirm an instance is never shared across threads
+        int size = list.size();
+        if (++counter >= size) {
+            counter = 0;
+        }
+        return list.get(counter);
+    }
+
+    @Override
+    public String toString() {
+        return "RoundRobinServiceCallLoadBalancer";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/camel-core/src/main/java/org/apache/camel/model/remote/ConsulConfigurationDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/remote/ConsulConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/remote/ConsulConfigurationDefinition.java
new file mode 100644
index 0000000..aba621f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/remote/ConsulConfigurationDefinition.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model.remote;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.camel.spi.Metadata;
+
+/**
+ * Consul remote service call configuration
+ */
+@Metadata(label = "eip,routing,remote")
+@XmlRootElement(name = "consulConfiguration")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ConsulConfigurationDefinition extends ServiceCallConfigurationDefinition {
+
+    public ConsulConfigurationDefinition() {
+    }
+
+    public ConsulConfigurationDefinition(ServiceCallDefinition parent) {
+        super(parent);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/camel-core/src/main/java/org/apache/camel/model/remote/ServiceCallDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/remote/ServiceCallDefinition.java b/camel-core/src/main/java/org/apache/camel/model/remote/ServiceCallDefinition.java
index 25ead8d..ed1c2aa 100644
--- a/camel-core/src/main/java/org/apache/camel/model/remote/ServiceCallDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/remote/ServiceCallDefinition.java
@@ -125,6 +125,16 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit
     }
 
     /**
+     * Configures the Service Call EIP using Consul
+     * <p/>
+     * Use <tt>end</tt> when configuration is complete, to return back to the Service Call EIP.
+     */
+    public ConsulConfigurationDefinition consulConfiguration() {
+        serviceCallConfiguration = new ConsulConfigurationDefinition(this);
+        return (ConsulConfigurationDefinition) serviceCallConfiguration;
+    }
+
+    /**
      * Configures the ServiceCall using the given configuration
      */
     public ServiceCallDefinition serviceCallConfiguration(ServiceCallConfigurationDefinition configuration) {

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/camel-core/src/main/resources/org/apache/camel/model/remote/jaxb.index
----------------------------------------------------------------------
diff --git a/camel-core/src/main/resources/org/apache/camel/model/remote/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/remote/jaxb.index
index 29147c2..6d7d250 100644
--- a/camel-core/src/main/resources/org/apache/camel/model/remote/jaxb.index
+++ b/camel-core/src/main/resources/org/apache/camel/model/remote/jaxb.index
@@ -14,6 +14,7 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ------------------------------------------------------------------------
+ConsulConfigurationDefinition
 KubernetesConfigurationDefinition
 RibbonConfigurationDefinition
 ServiceCallDefinition

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml
index 3f1d707..ef630cd 100644
--- a/components/camel-consul/pom.xml
+++ b/components/camel-consul/pom.xml
@@ -58,6 +58,21 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-spring</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-http</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-jetty</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/main/docs/consul.adoc
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/docs/consul.adoc b/components/camel-consul/src/main/docs/consul.adoc
index 011ffe1..c8ae235 100644
--- a/components/camel-consul/src/main/docs/consul.adoc
+++ b/components/camel-consul/src/main/docs/consul.adoc
@@ -47,8 +47,9 @@ The Consul component has no options.
 
 
 
+
 // endpoint options: START
-The Consul component supports 20 endpoint options which are listed below:
+The Consul component supports 22 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -56,20 +57,22 @@ The Consul component supports 20 endpoint options which are listed below:
 | Name | Group | Default | Java Type | Description
 | apiEndpoint | common |  | String | *Required* The API endpoint
 | connectTimeoutMillis | common |  | Long | Connect timeout for OkHttpClient
+| dc | common |  | String | The data center
 | key | common |  | String | The default key. Can be overridden by CamelConsulKey
 | pingInstance | common | true | boolean | Configure if the AgentClient should attempt a ping before returning the Consul instance
 | readTimeoutMillis | common |  | Long | Read timeout for OkHttpClient
+| tags | common |  | String | Set tags
 | url | common |  | String | The Consul agent URL
 | writeTimeoutMillis | common |  | Long | Write timeout for OkHttpClient
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
 | action | producer |  | String | The default action. Can be overridden by CamelConsulAction
+| valueAsString | producer | false | boolean | Default to transform values retrieved from Consul i.e. on KV endpoint to string.
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 | blockSeconds | watch | 10 | Integer | The second to wait for a watch event default 10 seconds
 | firstIndex | watch | 0 | long | The first index for watch for default 0
 | recursive | watch | false | boolean | Recursively watch default false
-| valueAsString | kv | false | boolean | Default to transform values retrieved from Consul i.e. on KV endpoint to string.
 | aclToken | security |  | String | Sets the ACL token to be used with Consul
 | password | security |  | String | Sets the password to be used for basic authentication
 | sslContextParameters | security |  | SSLContextParameters | SSL configuration using an org.apache.camel.util.jsse.SSLContextParameters instance.
@@ -79,6 +82,7 @@ The Consul component supports 20 endpoint options which are listed below:
 // endpoint options: END
 
 
+
 [[Consul-Headers]]
 Headers
 ^^^^^^^

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
index 3bf94b9..83887e3 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
@@ -51,7 +51,7 @@ public class ConsulComponent extends UriEndpointComponent {
     }
 
     private ConsulConfiguration createConfiguration(Map<String, Object> parameters) throws Exception {
-        ConsulConfiguration configuration = new ConsulConfiguration();
+        ConsulConfiguration configuration = new ConsulConfiguration(getCamelContext());
         setProperties(configuration, parameters);
 
         return configuration;

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
index f2ef537..b9df9b1 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
@@ -16,6 +16,12 @@
  */
 package org.apache.camel.component.consul;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.CamelContext;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.util.ObjectHelper;
@@ -25,6 +31,10 @@ import org.apache.camel.util.jsse.SSLContextParameters;
 public class ConsulConfiguration {
     @UriParam
     private String url;
+    @UriParam
+    private String dc;
+    @UriParam(javaType = "java.lang.String")
+    private Set<String> tags;
 
     @UriParam(label = "security")
     private SSLContextParameters sslContextParameters;
@@ -44,25 +54,29 @@ public class ConsulConfiguration {
     @UriParam(defaultValue = "true")
     private boolean pingInstance = true;
 
-
+    @UriParam
+    private String key;
     @UriParam(label = "producer")
     private String action;
-
-    @UriParam(label = "producer,kv", defaultValue = "false")
+    @UriParam(label = "producer", defaultValue = "false")
     private boolean valueAsString;
 
-    @UriParam
-    private String key;
-
     @UriParam(label = "consumer,watch", defaultValue = "10")
     private Integer blockSeconds = 10;
-
     @UriParam(label = "consumer,watch", defaultValue = "0")
     private long firstIndex;
-
     @UriParam(label = "consumer,watch", defaultValue = "false")
     private boolean recursive;
 
+    private final CamelContext context;
+
+    /**
+     * Creates a configuration bound to the given Camel context.
+     *
+     * @param context the Camel context; {@link #createConsulClient()} only applies the
+     *                SSL context parameters when this is set
+     */
+    public ConsulConfiguration(CamelContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Returns the Camel context given at construction time.
+     */
+    public CamelContext getContext() {
+        return this.context;
+    }
 
     public String getUrl() {
         return url;
@@ -75,6 +89,36 @@ public class ConsulConfiguration {
         this.url = url;
     }
 
+    /**
+     * Returns the configured data center, or <tt>null</tt> if none has been set.
+     */
+    public String getDc() {
+        return dc;
+    }
+
+    /**
+     * The data center
+     */
+    public void setDc(String dc) {
+        this.dc = dc;
+    }
+
+    /**
+     * Returns the configured tags, or <tt>null</tt> if none have been set.
+     * <p/>
+     * The set is returned as is, without a defensive copy.
+     */
+    public Set<String> getTags() {
+        return tags;
+    }
+
+    /**
+     * Set tags
+     * <p/>
+     * The given set is stored as is, without a defensive copy.
+     */
+    public void setTags(Set<String> tags) {
+        this.tags = tags;
+    }
+
+    /**
+     * Set tags. You can separate multiple tags by comma.
+     * <p/>
+     * Whitespace around each tag is ignored and blank entries are dropped, so
+     * <tt>"a, b,,c"</tt> results in the tags <tt>a</tt>, <tt>b</tt> and <tt>c</tt>.
+     * A <tt>null</tt> or blank value results in an empty set of tags.
+     *
+     * @param tagsAsString the comma separated tags, may be <tt>null</tt>
+     */
+    public void setTags(String tagsAsString) {
+        Set<String> parsed = new HashSet<>();
+        if (tagsAsString != null) {
+            Collections.addAll(parsed, tagsAsString.trim().split("\\s*,\\s*"));
+            // blank input, a leading separator or consecutive separators produce empty tokens
+            parsed.remove("");
+        }
+
+        this.tags = parsed;
+    }
+
     public SSLContextParameters getSslContextParameters() {
         return sslContextParameters;
     }
@@ -234,4 +278,33 @@ public class ConsulConfiguration {
     public void setRecursive(boolean recursive) {
         this.recursive = recursive;
     }
+
+    /**
+     * Builds a Consul client from this configuration.
+     * <p/>
+     * The ping option is always applied; every other option is only applied to the
+     * builder when it has been set. A client is built on each invocation, nothing is
+     * cached here.
+     *
+     * @return the client built from the current option values
+     * @throws Exception propagated from creating the SSL context or from building the client
+     */
+    public Consul createConsulClient() throws Exception {
+        Consul.Builder builder = Consul.builder();
+        builder.withPing(pingInstance);
+
+        if (ObjectHelper.isNotEmpty(url)) {
+            builder.withUrl(url);
+        }
+        // the SSL context is created against the Camel context, so both must be available
+        if (ObjectHelper.isNotEmpty(context) && ObjectHelper.isNotEmpty(sslContextParameters)) {
+            builder.withSslContext(sslContextParameters.createSSLContext(context));
+        }
+        if (ObjectHelper.isNotEmpty(aclToken)) {
+            builder.withAclToken(aclToken);
+        }
+        if (requiresBasicAuthentication()) {
+            builder.withBasicAuth(userName, password);
+        }
+        if (ObjectHelper.isNotEmpty(connectTimeoutMillis)) {
+            builder.withConnectTimeoutMillis(connectTimeoutMillis);
+        }
+        if (ObjectHelper.isNotEmpty(readTimeoutMillis)) {
+            builder.withReadTimeoutMillis(readTimeoutMillis);
+        }
+        if (ObjectHelper.isNotEmpty(writeTimeoutMillis)) {
+            builder.withWriteTimeoutMillis(writeTimeoutMillis);
+        }
+
+        return builder.build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
index bb1936e..1265fa5 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
@@ -17,6 +17,10 @@
 package org.apache.camel.component.consul;
 
 public interface ConsulConstants {
+    // Service Call EIP
+    String CONSUL_SERVER_IP = "CamelConsulServerIp";
+    String CONSUL_SERVER_PORT = "CamelConsulServerPort";
+
     String CONSUL_ACTION = "CamelConsulAction";
     String CONSUL_KEY = "CamelConsulKey";
     String CONSUL_EVENT_ID = "CamelConsulEventId";

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpoint.java
index 2b4d4e0..2c8a1ec 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpoint.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpoint.java
@@ -96,32 +96,7 @@ public class ConsulEndpoint extends DefaultEndpoint {
 
     public synchronized Consul getConsul() throws Exception {
         if (consul == null) {
-            Consul.Builder builder = Consul.builder();
-            builder.withPing(configuration.isPingInstance());
-
-            if (ObjectHelper.isNotEmpty(configuration.getUrl())) {
-                builder.withUrl(configuration.getUrl());
-            }
-            if (ObjectHelper.isNotEmpty(configuration.getSslContextParameters())) {
-                builder.withSslContext(configuration.getSslContextParameters().createSSLContext(getCamelContext()));
-            }
-            if (ObjectHelper.isNotEmpty(configuration.getAclToken())) {
-                builder.withAclToken(configuration.getAclToken());
-            }
-            if (configuration.requiresBasicAuthentication()) {
-                builder.withBasicAuth(configuration.getUserName(), configuration.getPassword());
-            }
-            if (ObjectHelper.isNotEmpty(configuration.getConnectTimeoutMillis())) {
-                builder.withConnectTimeoutMillis(configuration.getConnectTimeoutMillis());
-            }
-            if (ObjectHelper.isNotEmpty(configuration.getReadTimeoutMillis())) {
-                builder.withReadTimeoutMillis(configuration.getReadTimeoutMillis());
-            }
-            if (ObjectHelper.isNotEmpty(configuration.getWriteTimeoutMillis())) {
-                builder.withWriteTimeoutMillis(configuration.getWriteTimeoutMillis());
-            }
-
-            consul = builder.build();
+            consul = configuration.createConsulClient();
         }
 
         return consul;

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulProcessorFactory.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulProcessorFactory.java
new file mode 100644
index 0000000..03940db
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulProcessorFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.processor.service;
+
+import org.apache.camel.impl.remote.DefaultServiceCallProcessorFactory;
+import org.apache.camel.spi.ProcessorFactory;
+
+/**
+ * {@link ProcessorFactory} that creates the Consul implementation of the ServiceCall EIP.
+ * <p/>
+ * This class declares no members of its own; everything is inherited from
+ * {@link DefaultServiceCallProcessorFactory}.
+ */
+public class ConsulProcessorFactory extends DefaultServiceCallProcessorFactory {
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategies.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategies.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategies.java
new file mode 100644
index 0000000..c66dcdb
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategies.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.processor.service;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.orbitz.consul.model.catalog.CatalogService;
+import com.orbitz.consul.model.health.ServiceHealth;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.spi.ServiceCallServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory holder for the Consul based server list strategies.
+ */
+public final class ConsulServiceCallServerListStrategies {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConsulServiceCallServerListStrategies.class);
+
+    private ConsulServiceCallServerListStrategies() {
+        // only static members, not meant to be instantiated
+    }
+
+    // *************************************************************************
+    // Helpers
+    // *************************************************************************
+
+    /**
+     * Creates a strategy that queries Consul each time the list of servers is requested.
+     *
+     * @param configuration the Consul configuration used to create the client and the catalog options
+     * @param name the name of the service to look up
+     */
+    public static ConsulServiceCallServerListStrategy onDemand(ConsulConfiguration configuration, String name) throws Exception {
+        return new OnDemand(configuration, name);
+    }
+
+    // *************************************************************************
+    // Strategies
+    // *************************************************************************
+
+    /**
+     * Strategy that asks the Consul catalog and health endpoints on every call and
+     * keeps the catalog entries for which no failing check is reported.
+     */
+    public static final class OnDemand extends ConsulServiceCallServerListStrategy {
+        public OnDemand(ConsulConfiguration configuration, String name) throws Exception {
+            super(configuration, name);
+        }
+
+        @Override
+        public String toString() {
+            return "OnDemand";
+        }
+
+        @Override
+        public Collection<ServiceCallServer> getUpdatedListOfServers() {
+            final String serviceName = getName();
+
+            // the catalog is queried first, then the health information of the same service
+            final List<CatalogService> instances = getCatalogClient().getService(serviceName, getCatalogOptions()).getResponse();
+            final List<ServiceHealth> healthReports = getHealthClient().getAllServiceInstances(serviceName, getCatalogOptions()).getResponse();
+
+            return instances.stream()
+                .filter(instance -> !hasFailingChecks(instance, healthReports))
+                .map(instance -> newServer(instance))
+                .collect(Collectors.toList());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategy.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategy.java
new file mode 100644
index 0000000..ebe5ddf
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategy.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.processor.service;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.orbitz.consul.CatalogClient;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.HealthClient;
+import com.orbitz.consul.model.catalog.CatalogService;
+import com.orbitz.consul.model.health.HealthCheck;
+import com.orbitz.consul.model.health.ServiceHealth;
+import com.orbitz.consul.option.CatalogOptions;
+import com.orbitz.consul.option.ImmutableCatalogOptions;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.impl.remote.DefaultServiceCallServer;
+import org.apache.camel.spi.ServiceCallServer;
+import org.apache.camel.spi.ServiceCallServerListStrategy;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+
+
+/**
+ * Base class for {@link ServiceCallServerListStrategy} implementations that look up servers in Consul.
+ * <p/>
+ * It holds the Consul client and the catalog options (data center and tags) derived from the
+ * {@link ConsulConfiguration}, and offers helpers to match health information against catalog entries.
+ */
+abstract class ConsulServiceCallServerListStrategy extends ServiceSupport implements ServiceCallServerListStrategy<ServiceCallServer> {
+    private final Consul client;
+    private final String name;
+    private final CatalogOptions catalogOptions;
+
+    /**
+     * Creates the Consul client and the catalog options from the given configuration.
+     *
+     * @param configuration the Consul configuration
+     * @param name the name of the service to look up
+     * @throws Exception if the Consul client cannot be created
+     */
+    ConsulServiceCallServerListStrategy(ConsulConfiguration configuration, String name) throws Exception {
+        this.client = configuration.createConsulClient();
+        this.name = name;
+
+        ImmutableCatalogOptions.Builder builder = ImmutableCatalogOptions.builder();
+        if (ObjectHelper.isNotEmpty(configuration.getDc())) {
+            builder.datacenter(configuration.getDc());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getTags())) {
+            configuration.getTags().forEach(builder::tag);
+        }
+
+        catalogOptions = builder.build();
+    }
+
+    /**
+     * Returns an empty list, this base class does not provide any initial server.
+     */
+    @Override
+    public Collection<ServiceCallServer> getInitialListOfServers() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
+    @Override
+    public String toString() {
+        return "ConsulServiceCallServerListStrategy";
+    }
+
+    // *************************
+    // Getter
+    // *************************
+
+    protected Consul getClient() {
+        return client;
+    }
+
+    protected CatalogClient getCatalogClient() {
+        return client.catalogClient();
+    }
+
+    protected HealthClient getHealthClient() {
+        return client.healthClient();
+    }
+
+    protected String getName() {
+        return name;
+    }
+
+    protected CatalogOptions getCatalogOptions() {
+        return catalogOptions;
+    }
+
+    // *************************
+    // Helpers
+    // *************************
+
+    /**
+     * Tells if the given check reports a status other than <tt>passing</tt>.
+     * A check without status is not considered failing.
+     */
+    protected boolean isNotHealthy(HealthCheck check) {
+        final String status = check.getStatus();
+        return status != null && !status.equalsIgnoreCase("passing");
+    }
+
+    /**
+     * Tells if at least one of the checks of the given health entry is failing.
+     */
+    protected boolean isNotHealthy(ServiceHealth health) {
+        return health.getChecks().stream().anyMatch(this::isNotHealthy);
+    }
+
+    /**
+     * Tells if the given health entry belongs to the given catalog entry.
+     * <p/>
+     * The match is done on the service id and not on the service name: every instance of a
+     * service shares the same name, so matching on the name would let a single failing
+     * instance mark all the instances of the service as failing.
+     */
+    protected boolean isCheckOnService(ServiceHealth check, CatalogService service) {
+        return service.getServiceId().equalsIgnoreCase(check.getService().getId());
+    }
+
+    /**
+     * Tells if any of the given health entries belongs to the given catalog entry and has a failing check.
+     */
+    protected boolean hasFailingChecks(CatalogService service, List<ServiceHealth> healths) {
+        return healths.stream().anyMatch(health -> isCheckOnService(health, service) && isNotHealthy(health));
+    }
+
+    /**
+     * Creates the server for the given catalog entry.
+     */
+    protected ServiceCallServer newServer(CatalogService service) {
+        // Consul leaves the service address empty when the service did not register one,
+        // in which case the address of the node hosting the service has to be used
+        final String serviceAddress = service.getServiceAddress();
+        final String address = ObjectHelper.isNotEmpty(serviceAddress) ? serviceAddress : service.getAddress();
+
+        return new DefaultServiceCallServer(address, service.getServicePort());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition b/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition
new file mode 100644
index 0000000..0172512
--- /dev/null
+++ b/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.consul.processor.service.ConsulProcessorFactory

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/test/java/org/apache/camel/component/consul/processor/service/ServiceCallClientRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/processor/service/ServiceCallClientRouteTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/processor/service/ServiceCallClientRouteTest.java
new file mode 100644
index 0000000..e3f753d
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/processor/service/ServiceCallClientRouteTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.processor.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.orbitz.consul.AgentClient;
+import com.orbitz.consul.model.agent.ImmutableRegistration;
+import com.orbitz.consul.model.agent.Registration;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulTestSupport;
+import org.apache.camel.impl.remote.RoundRobinServiceCallLoadBalancer;
+import org.apache.camel.model.remote.ConsulConfigurationDefinition;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Registers a set of HTTP services in Consul and verifies that the Service Call EIP
+ * distributes the calls among them using the round robin load balancer.
+ */
+@Ignore
+public class ServiceCallClientRouteTest extends ConsulTestSupport {
+    private static final String SERVICE_NAME = "http-service";
+    private static final int SERVICE_COUNT = 5;
+    private static final int SERVICE_PORT_BASE = 8080;
+
+    private AgentClient client;
+    private List<Registration> registrations;
+    private List<String> expectedBodies;
+
+    // *************************************************************************
+    // Setup / tear down
+    // *************************************************************************
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        client = getConsul().agentClient();
+
+        registrations = new ArrayList<>(SERVICE_COUNT);
+        expectedBodies = new ArrayList<>(SERVICE_COUNT);
+
+        for (int i = 0; i < SERVICE_COUNT; i++) {
+            Registration r = ImmutableRegistration.builder()
+                .id("service-" + i)
+                .name(SERVICE_NAME)
+                .address("127.0.0.1")
+                .port(SERVICE_PORT_BASE + i)
+                .build();
+
+            client.register(r);
+
+            registrations.add(r);
+            expectedBodies.add("ping on " + r.getPort().get());
+        }
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // always clean up Consul, even if stopping the test context fails,
+            // so that the registrations do not leak into later runs
+            registrations.forEach(r -> client.deregister(r.getId()));
+        }
+    }
+
+    // *************************************************************************
+    // Test
+    // *************************************************************************
+
+    @Test
+    public void testServiceCall() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(SERVICE_COUNT);
+        getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder(expectedBodies);
+
+        for (int i = 0; i < SERVICE_COUNT; i++) {
+            template.sendBody("direct:start", "ping");
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    // *************************************************************************
+    // Route
+    // *************************************************************************
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                ConsulConfigurationDefinition config = new ConsulConfigurationDefinition();
+                config.setComponent("http");
+                config.setLoadBalancer(new RoundRobinServiceCallLoadBalancer());
+                config.setServerListStrategy(ConsulServiceCallServerListStrategies.onDemand(
+                    new ConsulConfiguration(context()),
+                    SERVICE_NAME
+                ));
+
+                // register configuration
+                context.setServiceCallConfiguration(config);
+
+                from("direct:start")
+                    .serviceCall(SERVICE_NAME)
+                    .to("log:org.apache.camel.component.consul.processor.service?level=INFO&showAll=true&multiline=true")
+                    .to("mock:result");
+
+                // one jetty endpoint per registered service, each answering with its own port
+                for (int i = SERVICE_PORT_BASE; i < SERVICE_PORT_BASE + SERVICE_COUNT; i++) {
+                    fromF("jetty:http://127.0.0.1:%d", i)
+                        .transform().simple("${in.body} on " + i);
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-consul/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/resources/log4j.properties b/components/camel-consul/src/test/resources/log4j.properties
index 4103089..f7cacfe 100644
--- a/components/camel-consul/src/test/resources/log4j.properties
+++ b/components/camel-consul/src/test/resources/log4j.properties
@@ -1,7 +1,7 @@
 #
 # The logging properties used
 #
-log4j.rootLogger=INFO, file
+log4j.rootLogger=INFO, out
 
 # uncomment the following line to turn on Camel debugging
 log4j.logger.org.apache.camel.component.consul=DEBUG

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java b/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
index 89d2502..9f48921 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
@@ -57,6 +57,7 @@ import org.apache.camel.model.RouteContextRefDefinition;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.ThreadPoolProfileDefinition;
 import org.apache.camel.model.dataformat.DataFormatsDefinition;
+import org.apache.camel.model.remote.ConsulConfigurationDefinition;
 import org.apache.camel.model.remote.KubernetesConfigurationDefinition;
 import org.apache.camel.model.remote.RibbonConfigurationDefinition;
 import org.apache.camel.model.rest.RestConfigurationDefinition;
@@ -154,6 +155,7 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Spr
     private CamelJMXAgentDefinition camelJMXAgent;
     @XmlElements({
             @XmlElement(name = "hystrixConfiguration", type = HystrixConfigurationDefinition.class, required = false),
+            @XmlElement(name = "consulConfiguration", type = ConsulConfigurationDefinition.class, required = false),
             @XmlElement(name = "kubernetesConfiguration", type = KubernetesConfigurationDefinition.class, required = false),
             @XmlElement(name = "ribbonConfiguration", type = RibbonConfigurationDefinition.class, required = false),
             @XmlElement(name = "template", type = CamelProducerTemplateFactoryBean.class, required = false),

http://git-wip-us.apache.org/repos/asf/camel/blob/ce007953/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
index 855d6d3..7470126 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
@@ -40,6 +40,7 @@ import org.apache.camel.impl.DefaultCamelContextNameStrategy;
 import org.apache.camel.model.FromDefinition;
 import org.apache.camel.model.HystrixConfigurationDefinition;
 import org.apache.camel.model.SendDefinition;
+import org.apache.camel.model.remote.ConsulConfigurationDefinition;
 import org.apache.camel.model.remote.KubernetesConfigurationDefinition;
 import org.apache.camel.model.remote.RibbonConfigurationDefinition;
 import org.apache.camel.spi.CamelContextNameStrategy;
@@ -146,6 +147,7 @@ public class CamelNamespaceHandler extends NamespaceHandlerSupport {
         addBeanDefinitionParser("streamCaching", CamelStreamCachingStrategyDefinition.class, false, false);
         addBeanDefinitionParser("propertyPlaceholder", CamelPropertyPlaceholderDefinition.class, false, false);
         addBeanDefinitionParser("hystrixConfiguration", HystrixConfigurationDefinition.class, false, false);
+        addBeanDefinitionParser("consulConfiguration", ConsulConfigurationDefinition.class, false, false);
         addBeanDefinitionParser("kubernetesConfiguration", KubernetesConfigurationDefinition.class, false, false);
         addBeanDefinitionParser("ribbonConfiguration", RibbonConfigurationDefinition.class, false, false);