You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/04/05 05:06:08 UTC

[1/3] camel git commit: CAMEL-11081: camel-consul: add support for additional http api

Repository: camel
Updated Branches:
  refs/heads/master a8a6b74c8 -> 339526303


http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulSessionProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulSessionProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulSessionProducer.java
new file mode 100644
index 0000000..baff53e
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulSessionProducer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.SessionClient;
+import com.orbitz.consul.model.session.Session;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+import org.apache.camel.component.consul.ConsulEndpoint;
+import org.apache.camel.util.ObjectHelper;
+
+public final class ConsulSessionProducer extends AbstractConsulProducer<SessionClient> {
+
+    public ConsulSessionProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, Consul::sessionClient);
+    }
+
+    @InvokeOnHeader(ConsulSessionActions.CREATE)
+    protected void create(Message message) throws Exception {
+        setBodyAndResult(
+            message,
+            getClient().createSession(
+                message.getMandatoryBody(Session.class),
+                message.getHeader(ConsulConstants.CONSUL_DATACENTER, String.class)
+            )
+        );
+    }
+
+    @InvokeOnHeader(ConsulSessionActions.DESTROY)
+    protected void destroy(Message message) throws Exception {
+        String sessionId = message.getHeader(ConsulConstants.CONSUL_SESSION, String.class);
+
+        if (ObjectHelper.isEmpty(sessionId)) {
+            getClient().destroySession(
+                message.getMandatoryBody(String.class),
+                message.getHeader(ConsulConstants.CONSUL_DATACENTER, String.class)
+            );
+        } else {
+            getClient().destroySession(
+                sessionId,
+                message.getHeader(ConsulConstants.CONSUL_DATACENTER, String.class)
+            );
+        }
+
+        setBodyAndResult(message, null, true);
+    }
+
+    @InvokeOnHeader(ConsulSessionActions.INFO)
+    protected void info(Message message) throws Exception {
+        String sessionId = message.getHeader(ConsulConstants.CONSUL_SESSION, String.class);
+
+        if (ObjectHelper.isEmpty(sessionId)) {
+            setBodyAndResult(
+                message,
+                getClient().getSessionInfo(
+                    message.getMandatoryBody(String.class),
+                    message.getHeader(ConsulConstants.CONSUL_DATACENTER, String.class)
+                ).orNull()
+            );
+        } else {
+            setBodyAndResult(
+                message,
+                getClient().getSessionInfo(
+                    sessionId,
+                    message.getHeader(ConsulConstants.CONSUL_DATACENTER, String.class)
+                ).orNull()
+            );
+        }
+    }
+
+    @InvokeOnHeader(ConsulSessionActions.LIST)
+    protected void list(Message message) throws Exception {
+        setBodyAndResult(
+            message,
+            getClient().listSessions(
+                message.getHeader(ConsulConstants.CONSUL_DATACENTER, String.class)
+            )
+        );
+    }
+
+    @InvokeOnHeader(ConsulSessionActions.RENEW)
+    protected void renew(Message message) throws Exception {
+        String sessionId = message.getHeader(ConsulConstants.CONSUL_SESSION, String.class);
+
+        if (ObjectHelper.isEmpty(sessionId)) {
+            setBodyAndResult(
+                message,
+                getClient().renewSession(
+                    message.getHeader(ConsulConstants.CONSUL_DATACENTER, String.class),
+                    message.getMandatoryBody(String.class)
+                )
+            );
+        } else {
+            setBodyAndResult(
+                message,
+                getClient().renewSession(
+                    message.getHeader(ConsulConstants.CONSUL_DATACENTER, String.class),
+                    sessionId
+                )
+            );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulStatusActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulStatusActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulStatusActions.java
new file mode 100644
index 0000000..7ba0ac5
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulStatusActions.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulStatusActions {
+    String LEADER = "LEADER";
+    String PEERS = "PEERS";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulStatusProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulStatusProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulStatusProducer.java
new file mode 100644
index 0000000..490ed4e
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulStatusProducer.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.StatusClient;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulEndpoint;
+
+public final class ConsulStatusProducer extends AbstractConsulProducer<StatusClient> {
+
+    public ConsulStatusProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, Consul::statusClient);
+
+        bind(ConsulStatusActions.LEADER, wrap(c -> c.getLeader()));
+        bind(ConsulStatusActions.PEERS, wrap(c -> c.getPeers()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulCatalogTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulCatalogTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulCatalogTest.java
new file mode 100644
index 0000000..e539c25
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulCatalogTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.List;
+
+import com.orbitz.consul.model.health.Node;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.enpoint.ConsulCatalogActions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConsulCatalogTest extends ConsulTestSupport {
+
+    @Test
+    public void testListDatacenters() throws Exception {
+        List<String> ref = getConsul().catalogClient().getDatacenters();
+        List<String> res = fluentTemplate()
+            .withHeader(ConsulConstants.CONSUL_ACTION, ConsulCatalogActions.LIST_DATACENTERS)
+            .to("direct:consul")
+            .request(List.class);
+
+        Assert.assertFalse(ref.isEmpty());
+        Assert.assertFalse(res.isEmpty());
+        Assert.assertEquals(ref, res);
+    }
+
+    @Test
+    public void testListNodes() throws Exception {
+        List<Node> ref = getConsul().catalogClient().getNodes().getResponse();
+        List<Node> res = fluentTemplate()
+            .withHeader(ConsulConstants.CONSUL_ACTION, ConsulCatalogActions.LIST_NODES)
+            .to("direct:consul")
+            .request(List.class);
+
+        Assert.assertFalse(ref.isEmpty());
+        Assert.assertFalse(res.isEmpty());
+        Assert.assertEquals(ref, res);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:consul")
+                    .to("consul:catalog");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulCoordinatesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulCoordinatesTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulCoordinatesTest.java
new file mode 100644
index 0000000..0cc7808
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulCoordinatesTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.List;
+
+import com.orbitz.consul.model.coordinate.Coordinate;
+import com.orbitz.consul.model.coordinate.Datacenter;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.enpoint.ConsulCoordinatesActions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConsulCoordinatesTest extends ConsulTestSupport {
+
+    @Test
+    public void testDatacenters() throws Exception {
+        List<Datacenter> ref = getConsul().coordinateClient().getDatacenters();
+        List<Datacenter> res = fluentTemplate()
+            .withHeader(ConsulConstants.CONSUL_ACTION, ConsulCoordinatesActions.DATACENTERS)
+            .to("direct:consul")
+            .request(List.class);
+
+        Assert.assertFalse(ref.isEmpty());
+        Assert.assertFalse(res.isEmpty());
+        Assert.assertEquals(ref, res);
+    }
+    
+    @Test
+    public void testNodes() throws Exception {
+        List<Coordinate> ref = getConsul().coordinateClient().getNodes();
+        List<Coordinate> res = fluentTemplate()
+            .withHeader(ConsulConstants.CONSUL_ACTION, ConsulCoordinatesActions.NODES)
+            .to("direct:consul")
+            .request(List.class);
+
+        Assert.assertFalse(ref.isEmpty());
+        Assert.assertFalse(res.isEmpty());
+        Assert.assertEquals(ref, res);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:consul")
+                    .to("consul:coordinates");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulHealthTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulHealthTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulHealthTest.java
new file mode 100644
index 0000000..6d28913
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulHealthTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import com.orbitz.consul.AgentClient;
+import com.orbitz.consul.model.agent.ImmutableRegistration;
+import com.orbitz.consul.model.agent.Registration;
+import com.orbitz.consul.model.health.ServiceHealth;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.enpoint.ConsulHealthActions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConsulHealthTest extends ConsulTestSupport {
+    private AgentClient client;
+    private List<Registration> registrations;
+    private String service;
+
+    // *************************************************************************
+    // Setup / tear down
+    // *************************************************************************
+
+    @Override
+    public void doPreSetup() throws Exception {
+        Random random = new Random();
+
+        this.service = UUID.randomUUID().toString();
+        this.client = getConsul().agentClient();
+        this.registrations = Arrays.asList(
+            ImmutableRegistration.builder()
+                .id(UUID.randomUUID().toString())
+                .name(this.service)
+                .address("127.0.0.1")
+                .port(random.nextInt(10000))
+                .build(),
+            ImmutableRegistration.builder()
+                .id(UUID.randomUUID().toString())
+                .name(this.service)
+                .address("127.0.0.1")
+                .port(random.nextInt(10000))
+                .build()
+        );
+
+        this.registrations.forEach(client::register);
+        super.doPreSetup();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        registrations.forEach(r -> client.deregister(r.getId()));
+    }
+
+    // *************************************************************************
+    // Test
+    // *************************************************************************
+
+    @Test
+    public void testServiceInstance() throws Exception {
+        List<ServiceHealth> ref = getConsul().healthClient().getAllServiceInstances(this.service).getResponse();
+        List<ServiceHealth> res = fluentTemplate()
+            .withHeader(ConsulConstants.CONSUL_ACTION, ConsulHealthActions.SERVICE_INSTANCES)
+            .withHeader(ConsulConstants.CONSUL_SERVICE, this.service)
+            .to("direct:consul")
+            .request(List.class);
+
+        Assert.assertEquals(2, ref.size());
+        Assert.assertEquals(2, res.size());
+        Assert.assertEquals(ref, res);
+
+        assertTrue(registrations.stream().anyMatch(
+            r -> r.getPort().isPresent() && r.getPort().get() == res.get(0).getService().getPort() && r.getId().equalsIgnoreCase(res.get(0).getService().getId())
+        ));
+        assertTrue(registrations.stream().anyMatch(
+            r -> r.getPort().isPresent() && r.getPort().get() == res.get(1).getService().getPort() && r.getId().equalsIgnoreCase(res.get(1).getService().getId())
+        ));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:consul")
+                    .to("consul:health");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulSessionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulSessionTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulSessionTest.java
new file mode 100644
index 0000000..51631ad
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulSessionTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.List;
+import java.util.UUID;
+
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.model.session.SessionCreatedResponse;
+import com.orbitz.consul.model.session.SessionInfo;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.enpoint.ConsulSessionActions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConsulSessionTest extends ConsulTestSupport {
+
+    @Test
+    public void testServiceInstance() throws Exception {
+        final String name = UUID.randomUUID().toString();
+        final int sessions = getConsul().sessionClient().listSessions().size();
+
+        {
+            List<SessionInfo> list = fluentTemplate()
+                .withHeader(ConsulConstants.CONSUL_ACTION, ConsulSessionActions.LIST)
+                .to("direct:consul")
+                .request(List.class);
+
+            Assert.assertEquals(sessions, list.size());
+            Assert.assertFalse(list.stream().anyMatch(s -> s.getName().isPresent() && s.getName().get().equals(name)));
+        }
+
+        SessionCreatedResponse res = fluentTemplate()
+            .withHeader(ConsulConstants.CONSUL_ACTION, ConsulSessionActions.CREATE)
+            .withBody(ImmutableSession.builder().name(name).build())
+            .to("direct:consul")
+            .request(SessionCreatedResponse.class);
+
+        Assert.assertNotNull(res.getId());
+
+        {
+            List<SessionInfo> list = fluentTemplate()
+                .withHeader(ConsulConstants.CONSUL_ACTION, ConsulSessionActions.LIST)
+                .to("direct:consul")
+                .request(List.class);
+
+            Assert.assertEquals(sessions + 1, list.size());
+            Assert.assertTrue(list.stream().anyMatch(s -> s.getName().isPresent() && s.getName().get().equals(name)));
+        }
+
+        {
+            fluentTemplate()
+                .withHeader(ConsulConstants.CONSUL_ACTION, ConsulSessionActions.DESTROY)
+                .withHeader(ConsulConstants.CONSUL_SESSION, res.getId())
+                .to("direct:consul")
+                .send();
+
+            List<SessionInfo> list = fluentTemplate()
+                .withHeader(ConsulConstants.CONSUL_ACTION, ConsulSessionActions.LIST)
+                .to("direct:consul")
+                .request(List.class);
+
+            Assert.assertEquals(sessions, list.size());
+            Assert.assertFalse(list.stream().anyMatch(s -> s.getName().isPresent() && s.getName().get().equals(name)));
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:consul")
+                    .to("consul:session");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index f855024..602714c 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -150,7 +150,7 @@
     <commons-pool2-version>2.4.2</commons-pool2-version>
     <commons-vfs2-version>2.0</commons-vfs2-version>
     <compress-lzf-version>1.0.3</compress-lzf-version>
-    <consul-client-version>0.13.11</consul-client-version>
+    <consul-client-version>0.14.0</consul-client-version>
     <consul-client-bundle-version>0.13.11_1</consul-client-bundle-version>
     <cobertura-maven-plugin-version>2.7</cobertura-maven-plugin-version>
     <couchbase-client-version>1.4.12</couchbase-client-version>

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ConsulComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ConsulComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ConsulComponentConfiguration.java
index 519a1b6..83fdb87 100644
--- a/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ConsulComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ConsulComponentConfiguration.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.component.consul.springboot;
 
+import java.util.List;
 import java.util.Set;
+import com.orbitz.consul.option.ConsistencyMode;
 import org.apache.camel.CamelContext;
 import org.apache.camel.util.jsse.SSLContextParameters;
 import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -155,6 +157,18 @@ public class ConsulComponentConfiguration {
          */
         private String datacenter;
         /**
+         * The near node to use for queries.
+         */
+        private String nearNode;
+        /**
+         * The node meta-data to use for queries.
+         */
+        private List nodeMeta;
+        /**
+         * The consistencyMode used for queries, default ConsistencyMode.DEFAULT
+         */
+        private ConsistencyMode consistencyMode;
+        /**
          * Set tags. You can separate multiple tags by comma.
          */
         private Set tags;
@@ -196,12 +210,12 @@ public class ConsulComponentConfiguration {
          * Configure if the AgentClient should attempt a ping before returning
          * the Consul instance
          */
-        private Boolean pingInstance = true;
+        private Boolean pingInstance;
         /**
          * Default to transform values retrieved from Consul i.e. on KV endpoint
          * to string.
          */
-        private Boolean valueAsString = false;
+        private Boolean valueAsString;
         /**
          * The default key. Can be overridden by CamelConsulKey
          */
@@ -209,15 +223,15 @@ public class ConsulComponentConfiguration {
         /**
          * The second to wait for a watch event, default 10 seconds
          */
-        private Integer blockSeconds = 10;
+        private Integer blockSeconds;
         /**
          * The first index for watch for, default 0
          */
-        private Long firstIndex = 0L;
+        private Long firstIndex;
         /**
          * Recursively watch, default false
          */
-        private Boolean recursive = false;
+        private Boolean recursive;
 
         public CamelContext getCamelContext() {
             return camelContext;
@@ -254,6 +268,30 @@ public class ConsulComponentConfiguration {
             this.datacenter = datacenter;
         }
 
+        public String getNearNode() {
+            return nearNode;
+        }
+
+        public void setNearNode(String nearNode) {
+            this.nearNode = nearNode;
+        }
+
+        public List getNodeMeta() {
+            return nodeMeta;
+        }
+
+        public void setNodeMeta(List nodeMeta) {
+            this.nodeMeta = nodeMeta;
+        }
+
+        public ConsistencyMode getConsistencyMode() {
+            return consistencyMode;
+        }
+
+        public void setConsistencyMode(ConsistencyMode consistencyMode) {
+            this.consistencyMode = consistencyMode;
+        }
+
         public Set getTags() {
             return tags;
         }


[2/3] camel git commit: CAMEL-11081: camel-consul: add support for additional http api

Posted by lb...@apache.org.
CAMEL-11081: camel-consul: add support for additional http api


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e7063798
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e7063798
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e7063798

Branch: refs/heads/master
Commit: e706379876127c23f22e56d2ea0740a4b13bb5e1
Parents: a8a6b74
Author: lburgazzoli <lb...@gmail.com>
Authored: Tue Apr 4 18:21:29 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Tue Apr 4 18:21:53 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/consul-component.adoc         |  19 +--
 .../consul/AbstractConsulConsumer.java          | 107 ---------------
 .../consul/AbstractConsulProducer.java          | 128 ------------------
 .../camel/component/consul/ConsulComponent.java |  68 ++++++----
 .../component/consul/ConsulConfiguration.java   |  41 ++++++
 .../camel/component/consul/ConsulConstants.java |  13 ++
 .../camel/component/consul/ConsulEndpoint.java  |  31 ++---
 .../camel/component/consul/ConsulFactories.java |  33 +++++
 .../consul/cloud/ConsulServiceDiscovery.java    |  16 +--
 .../consul/enpoint/AbstractConsulConsumer.java  | 110 ++++++++++++++++
 .../consul/enpoint/AbstractConsulProducer.java  | 131 +++++++++++++++++++
 .../consul/enpoint/ConsulAgentProducer.java     |   6 +-
 .../consul/enpoint/ConsulCatalogActions.java    |  27 ++++
 .../consul/enpoint/ConsulCatalogProducer.java   |  94 +++++++++++++
 .../enpoint/ConsulCoordinatesActions.java       |  22 ++++
 .../enpoint/ConsulCoordinatesProducer.java      |  50 +++++++
 .../consul/enpoint/ConsulEventConsumer.java     |   6 +-
 .../consul/enpoint/ConsulEventProducer.java     |  15 ++-
 .../consul/enpoint/ConsulHealthActions.java     |  24 ++++
 .../consul/enpoint/ConsulHealthProducer.java    |  89 +++++++++++++
 .../consul/enpoint/ConsulKeyValueConsumer.java  |   6 +-
 .../consul/enpoint/ConsulKeyValueProducer.java  |  58 +++++---
 .../enpoint/ConsulPreparedQueryActions.java     |  23 ++++
 .../enpoint/ConsulPreparedQueryProducer.java    |  82 ++++++++++++
 .../consul/enpoint/ConsulSessionActions.java    |  25 ++++
 .../consul/enpoint/ConsulSessionProducer.java   | 120 +++++++++++++++++
 .../consul/enpoint/ConsulStatusActions.java     |  22 ++++
 .../consul/enpoint/ConsulStatusProducer.java    |  32 +++++
 .../component/consul/ConsulCatalogTest.java     |  64 +++++++++
 .../component/consul/ConsulCoordinatesTest.java |  65 +++++++++
 .../component/consul/ConsulHealthTest.java      | 107 +++++++++++++++
 .../component/consul/ConsulSessionTest.java     |  91 +++++++++++++
 parent/pom.xml                                  |   2 +-
 .../ConsulComponentConfiguration.java           |  48 ++++++-
 34 files changed, 1425 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/docs/consul-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/docs/consul-component.adoc b/components/camel-consul/src/main/docs/consul-component.adoc
index 9d7080f..10c66d4 100644
--- a/components/camel-consul/src/main/docs/consul-component.adoc
+++ b/components/camel-consul/src/main/docs/consul-component.adoc
@@ -73,32 +73,15 @@ with the following path and query parameters:
 | **apiEndpoint** | *Required* The API endpoint |  | String
 |=======================================================================
 
-#### Query Parameters (21 parameters):
+#### Query Parameters (4 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
 | Name | Description | Default | Type
-| **connectTimeoutMillis** (common) | Connect timeout for OkHttpClient |  | Long
-| **key** (common) | The default key. Can be overridden by CamelConsulKey |  | String
-| **pingInstance** (common) | Configure if the AgentClient should attempt a ping before returning the Consul instance | true | boolean
-| **readTimeoutMillis** (common) | Read timeout for OkHttpClient |  | Long
-| **tags** (common) | Set tags. You can separate multiple tags by comma. |  | String
-| **url** (common) | The Consul agent URL |  | String
-| **writeTimeoutMillis** (common) | Write timeout for OkHttpClient |  | Long
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
 | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
-| **action** (producer) | The default action. Can be overridden by CamelConsulAction |  | String
-| **valueAsString** (producer) | Default to transform values retrieved from Consul i.e. on KV endpoint to string. | false | boolean
-| **datacenter** (advanced) | The data center |  | String
 | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean
-| **blockSeconds** (watch) | The second to wait for a watch event default 10 seconds | 10 | Integer
-| **firstIndex** (watch) | The first index for watch for default 0 | 0 | long
-| **recursive** (watch) | Recursively watch default false | false | boolean
-| **aclToken** (security) | Sets the ACL token to be used with Consul |  | String
-| **password** (security) | Sets the password to be used for basic authentication |  | String
-| **sslContextParameters** (security) | SSL configuration using an org.apache.camel.util.jsse.SSLContextParameters instance. |  | SSLContextParameters
-| **userName** (security) | Sets the username to be used for basic authentication |  | String
 |=======================================================================
 // endpoint options: END
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
deleted file mode 100644
index 685b7d1..0000000
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.consul;
-
-import java.math.BigInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-
-import com.orbitz.consul.Consul;
-import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.util.ObjectHelper;
-
-public abstract class AbstractConsulConsumer<C> extends DefaultConsumer {
-    protected final ConsulEndpoint endpoint;
-    protected final ConsulConfiguration configuration;
-    protected final String key;
-    protected final AtomicReference<BigInteger> index;
-
-    private final Function<Consul, C> clientSupplier;
-    private Runnable watcher;
-
-    protected AbstractConsulConsumer(ConsulEndpoint endpoint, ConsulConfiguration configuration, Processor processor, Function<Consul, C> clientSupplier) {
-        super(endpoint, processor);
-
-        this.endpoint = endpoint;
-        this.configuration = configuration;
-        this.key = ObjectHelper.notNull(configuration.getKey(), ConsulConstants.CONSUL_KEY);
-        this.index = new AtomicReference<>(BigInteger.valueOf(configuration.getFirstIndex()));
-        this.clientSupplier = clientSupplier;
-        this.watcher = null;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-
-        watcher = createWatcher(clientSupplier.apply(endpoint.getConsul()));
-        watcher.run();
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        watcher = null;
-
-        super.doStop();
-    }
-
-    // *************************************************************************
-    //
-    // *************************************************************************
-
-    protected abstract Runnable createWatcher(C client) throws Exception;
-
-    // *************************************************************************
-    // Handlers
-    // *************************************************************************
-
-    protected abstract class AbstractWatcher implements Runnable {
-        private final C client;
-
-        public AbstractWatcher(C client) {
-            this.client = client;
-        }
-
-        protected void onError(Throwable throwable) {
-            if (isRunAllowed()) {
-                getExceptionHandler().handleException("Error watching for event " + key, throwable);
-            }
-        }
-
-        protected final void setIndex(BigInteger responseIndex) {
-            index.set(responseIndex);
-        }
-
-        @Override
-        public final void run() {
-            if (isRunAllowed()) {
-                watch(client);
-            }
-        }
-
-        protected final C client() {
-            return client;
-        }
-
-        protected final void watch() {
-            watch(client);
-        }
-
-        protected abstract void watch(C client);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
deleted file mode 100644
index 1df4f29..0000000
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.consul;
-
-import java.util.function.Function;
-
-import com.orbitz.consul.Consul;
-import org.apache.camel.Message;
-import org.apache.camel.NoSuchHeaderException;
-import org.apache.camel.Processor;
-import org.apache.camel.impl.HeaderSelectorProducer;
-
-public abstract class AbstractConsulProducer<C> extends HeaderSelectorProducer {
-    private final ConsulEndpoint endpoint;
-    private final ConsulConfiguration configuration;
-    private final Function<Consul, C> clientSupplier;
-    private C client;
-
-    protected AbstractConsulProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration, Function<Consul, C> clientSupplier) {
-        super(endpoint, ConsulConstants.CONSUL_ACTION, configuration.getAction());
-
-        this.endpoint = endpoint;
-        this.configuration = configuration;
-        this.clientSupplier = clientSupplier;
-        this.client = null;
-    }
-
-    // *************************************************************************
-    //
-    // *************************************************************************
-
-    protected Consul getConsul() throws Exception {
-        return endpoint.getConsul();
-    }
-
-    protected C getClient() throws Exception {
-        if (client == null) {
-            client = clientSupplier.apply(getConsul());
-        }
-
-        return client;
-    }
-
-    protected ConsulConfiguration getConfiguration() {
-        return configuration;
-    }
-
-    protected <D> D getHeader(Message message, String header, D defaultValue, Class<D> type) {
-        return message.getHeader(header, defaultValue, type);
-    }
-
-    protected <D> D getMandatoryHeader(Message message, String header, Class<D> type) throws Exception {
-        return getMandatoryHeader(message, header, null, type);
-    }
-
-    protected <D> D getMandatoryHeader(Message message, String header, D defaultValue, Class<D> type) throws Exception {
-        D value = getHeader(message, header, defaultValue, type);
-        if (value == null) {
-            throw new NoSuchHeaderException(message.getExchange(), header, type);
-        }
-
-        return value;
-    }
-
-    protected String getKey(Message message) {
-        return message.getHeader(
-            ConsulConstants.CONSUL_KEY,
-            configuration.getKey(),
-            String.class);
-    }
-
-    protected String getMandatoryKey(Message message) throws Exception {
-        return getMandatoryHeader(
-            message,
-            ConsulConstants.CONSUL_KEY,
-            configuration.getKey(),
-            String.class);
-    }
-
-    protected <T> T getOption(Message message, T defaultValue, Class<T> type) {
-        return message.getHeader(ConsulConstants.CONSUL_OPTIONS, defaultValue, type);
-    }
-
-    protected boolean isValueAsString(Message message) throws Exception {
-        return message.getHeader(
-            ConsulConstants.CONSUL_VALUE_AS_STRING,
-            configuration.isValueAsString(),
-            Boolean.class);
-    }
-
-    protected <T> T getBody(Message message, T defaultValue, Class<T> type) throws Exception {
-        T body = message.getBody(type);
-        if (body == null) {
-            body = defaultValue;
-        }
-
-        return  body;
-    }
-
-    protected void setBodyAndResult(Message message, Object body) throws Exception {
-        setBodyAndResult(message, body, body != null);
-    }
-
-    protected void setBodyAndResult(Message message, Object body, boolean result) throws Exception {
-        message.setHeader(ConsulConstants.CONSUL_RESULT, result);
-        if (body != null) {
-            message.setBody(body);
-        }
-    }
-
-    protected Processor wrap(Function<C, Object> supplier) {
-        return exchange -> setBodyAndResult(exchange.getIn(), supplier.apply(getClient()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
index ccacf8a..4597cf7 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
@@ -22,10 +22,16 @@ import java.util.Optional;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.component.consul.enpoint.ConsulAgentProducer;
+import org.apache.camel.component.consul.enpoint.ConsulCatalogProducer;
+import org.apache.camel.component.consul.enpoint.ConsulCoordinatesProducer;
 import org.apache.camel.component.consul.enpoint.ConsulEventConsumer;
 import org.apache.camel.component.consul.enpoint.ConsulEventProducer;
+import org.apache.camel.component.consul.enpoint.ConsulHealthProducer;
 import org.apache.camel.component.consul.enpoint.ConsulKeyValueConsumer;
 import org.apache.camel.component.consul.enpoint.ConsulKeyValueProducer;
+import org.apache.camel.component.consul.enpoint.ConsulPreparedQueryProducer;
+import org.apache.camel.component.consul.enpoint.ConsulSessionProducer;
+import org.apache.camel.component.consul.enpoint.ConsulStatusProducer;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.util.jsse.SSLContextParameters;
@@ -142,31 +148,45 @@ public class ConsulComponent extends DefaultComponent {
 
         setProperties(configuration, parameters);
 
-        return ConsulApiEndpoint.valueOf(remaining).create(remaining, uri, this, configuration);
-    }
-
-    // Consul Api Enpoints (see https://www.consul.io/docs/agent/http.html)
-    private enum ConsulApiEndpoint {
-        kv(ConsulKeyValueProducer::new, ConsulKeyValueConsumer::new),
-        event(ConsulEventProducer::new, ConsulEventConsumer::new),
-        agent(ConsulAgentProducer::new, null);
-
-        private final Optional<ConsulEndpoint.ProducerFactory> producerFactory;
-        private final Optional<ConsulEndpoint.ConsumerFactory> consumerFactory;
-
-        ConsulApiEndpoint(ConsulEndpoint.ProducerFactory producerFactory, ConsulEndpoint.ConsumerFactory consumerFactory) {
-            this.producerFactory = Optional.ofNullable(producerFactory);
-            this.consumerFactory = Optional.ofNullable(consumerFactory);
-        }
-
-        public Endpoint create(String apiEndpoint, String uri, ConsulComponent component, ConsulConfiguration configuration) throws Exception {
+        switch (remaining) {
+        case "kv":
+            return new ConsulEndpoint(
+                remaining, uri, this, configuration, Optional.of(ConsulKeyValueProducer::new), Optional.of(ConsulKeyValueConsumer::new)
+            );
+        case "event":
+            return new ConsulEndpoint(
+                remaining, uri, this, configuration, Optional.of(ConsulEventProducer::new), Optional.of(ConsulEventConsumer::new)
+            );
+        case "agent":
+            return new ConsulEndpoint(
+                remaining, uri, this, configuration, Optional.of(ConsulAgentProducer::new), Optional.empty()
+            );
+        case "coordinates":
+            return new ConsulEndpoint(
+                remaining, uri, this, configuration, Optional.of(ConsulCoordinatesProducer::new), Optional.empty()
+            );
+        case "health":
+            return new ConsulEndpoint(
+                remaining, uri, this, configuration, Optional.of(ConsulHealthProducer::new), Optional.empty()
+            );
+        case "status":
+            return new ConsulEndpoint(
+                remaining, uri, this, configuration, Optional.of(ConsulStatusProducer::new), Optional.empty()
+            );
+        case "preparedQuery":
+            return new ConsulEndpoint(
+                remaining, uri, this, configuration, Optional.of(ConsulPreparedQueryProducer::new), Optional.empty()
+            );
+        case "catalog":
+            return new ConsulEndpoint(
+                remaining, uri, this, configuration, Optional.of(ConsulCatalogProducer::new), Optional.empty()
+            );
+        case "session":
             return new ConsulEndpoint(
-                apiEndpoint,
-                uri,
-                component,
-                configuration,
-                producerFactory,
-                consumerFactory);
+                remaining, uri, this, configuration, Optional.of(ConsulSessionProducer::new), Optional.empty()
+            );
+        default:
+            throw new IllegalArgumentException("Unknown apiEndpoint: " + remaining);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
index 34fa757..a385ffa 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
@@ -18,9 +18,11 @@ package org.apache.camel.component.consul;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import com.orbitz.consul.Consul;
+import com.orbitz.consul.option.ConsistencyMode;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.RuntimeCamelException;
@@ -35,6 +37,12 @@ public class ConsulConfiguration implements CamelContextAware, Cloneable {
     private String url;
     @UriParam(label = "advanced")
     private String datacenter;
+    @UriParam(label = "advanced")
+    private String nearNode;
+    @UriParam(label = "advanced")
+    private List<String> nodeMeta;
+    @UriParam(label = "advanced", defaultValue = "DEFAULT", enums = "DEFAULT,STALE,CONSISTENT")
+    private ConsistencyMode consistencyMode = ConsistencyMode.DEFAULT;
     @UriParam(javaType = "java.lang.String")
     private Set<String> tags;
 
@@ -130,6 +138,39 @@ public class ConsulConfiguration implements CamelContextAware, Cloneable {
         this.datacenter = datacenter;
     }
 
+    public String getNearNode() {
+        return nearNode;
+    }
+
+    /**
+     * The near node to use for queries.
+     */
+    public void setNearNode(String nearNode) {
+        this.nearNode = nearNode;
+    }
+
+    public List<String> getNodeMeta() {
+        return nodeMeta;
+    }
+
+    /**
+     * The node meta-data to use for queries.
+     */
+    public void setNodeMeta(List<String> nodeMeta) {
+        this.nodeMeta = nodeMeta;
+    }
+
+    public ConsistencyMode getConsistencyMode() {
+        return consistencyMode;
+    }
+
+    /**
+     * The consistencyMode used for queries, default ConsistencyMode.DEFAULT
+     */
+    public void setConsistencyMode(ConsistencyMode consistencyMode) {
+        this.consistencyMode = consistencyMode;
+    }
+
     public Set<String> getTags() {
         return tags;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
index fd8ea39..858bb45 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
@@ -35,6 +35,8 @@ public interface ConsulConstants {
     String CONSUL_SERVICE_FILTER = "CamelConsulSessionFilter";
     String CONSUL_VERSION = "CamelConsulVersion";
     String CONSUL_FLAGS = "CamelConsulFlags";
+    String CONSUL_INDEX = "CamelConsulIndex";
+    String CONSUL_WAIT = "CamelConsulWait";
     String CONSUL_CREATE_INDEX = "CamelConsulCreateIndex";
     String CONSUL_LOCK_INDEX = "CamelConsulLockIndex";
     String CONSUL_MODIFY_INDEX = "CamelConsulModifyIndex";
@@ -42,4 +44,15 @@ public interface ConsulConstants {
     String CONSUL_RESULT = "CamelConsulResult";
     String CONSUL_SESSION = "CamelConsulSession";
     String CONSUL_VALUE_AS_STRING = "CamelConsulValueAsString";
+    String CONSUL_NODE = "CamelConsulNode";
+    String CONSUL_SERVICE = "CamelConsulService";
+    String CONSUL_DATACENTER = "CamelConsulDatacenter";
+    String CONSUL_NEAR_NODE = "CamelConsulNearNode";
+    String CONSUL_NODE_META = "CamelConsulNodeMeta";
+    String CONSUL_LAST_CONTACT = "CamelConsulLastContact";
+    String CONSUL_KNOWN_LEADER = "CamelConsulKnownLeader";
+    String CONSUL_CONSISTENCY_MODE = "CamelConsulConsistencyMode";
+    String CONSUL_HEALTHY_ONLY = "CamelConsulHealthyOnly";
+    String CONSUL_HEALTHY_STATE = "CamelConsulHealthyState";
+    String CONSUL_PREPARED_QUERY_ID = "CamelConsulPreparedQueryID";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpoint.java
index 0399830..c9eaccf 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpoint.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpoint.java
@@ -25,7 +25,6 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.ObjectHelper;
 
@@ -35,16 +34,16 @@ import org.apache.camel.util.ObjectHelper;
 @UriEndpoint(firstVersion = "2.18.0", scheme = "consul", title = "Consul", syntax = "consul:apiEndpoint", label = "api,cloud")
 public class ConsulEndpoint extends DefaultEndpoint {
 
-    @UriParam(description = "The consul configuration")
-    @Metadata
+    //@UriParam(description = "The consul configuration")
+    //@Metadata
     private final ConsulConfiguration configuration;
 
     @UriPath(description = "The API endpoint")
     @Metadata(required = "true")
     private final String apiEndpoint;
 
-    private final Optional<ProducerFactory> producerFactory;
-    private final Optional<ConsumerFactory> consumerFactory;
+    private final Optional<ConsulFactories.ProducerFactory> producerFactory;
+    private final Optional<ConsulFactories.ConsumerFactory> consumerFactory;
 
     private Consul consul;
 
@@ -53,8 +52,8 @@ public class ConsulEndpoint extends DefaultEndpoint {
             String uri,
             ConsulComponent component,
             ConsulConfiguration configuration,
-            Optional<ProducerFactory> producerFactory,
-            Optional<ConsumerFactory> consumerFactory) {
+            Optional<ConsulFactories.ProducerFactory> producerFactory,
+            Optional<ConsulFactories.ConsumerFactory> consumerFactory) {
 
         super(uri, component);
 
@@ -71,7 +70,7 @@ public class ConsulEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        ProducerFactory factory = producerFactory.orElseThrow(
+        ConsulFactories.ProducerFactory factory = producerFactory.orElseThrow(
             () -> new IllegalArgumentException("No producer for " + apiEndpoint)
         );
 
@@ -80,7 +79,7 @@ public class ConsulEndpoint extends DefaultEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        ConsumerFactory factory = consumerFactory.orElseThrow(
+        ConsulFactories.ConsumerFactory factory = consumerFactory.orElseThrow(
             () -> new IllegalArgumentException("No consumer for " + apiEndpoint)
         );
 
@@ -106,18 +105,4 @@ public class ConsulEndpoint extends DefaultEndpoint {
 
         return consul;
     }
-
-    // *************************************************************************
-    //
-    // *************************************************************************
-
-    @FunctionalInterface
-    public interface ProducerFactory {
-        Producer create(ConsulEndpoint endpoint, ConsulConfiguration configuration) throws Exception;
-    }
-
-    @FunctionalInterface
-    public interface ConsumerFactory {
-        Consumer create(ConsulEndpoint endpoint, ConsulConfiguration configuration, Processor processor) throws Exception;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulFactories.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulFactories.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulFactories.java
new file mode 100644
index 0000000..01cf918
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulFactories.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+
+public interface ConsulFactories {
+    @FunctionalInterface
+    interface ProducerFactory {
+        Producer create(ConsulEndpoint endpoint, ConsulConfiguration configuration) throws Exception;
+    }
+
+    @FunctionalInterface
+    interface ConsumerFactory {
+        Consumer create(ConsulEndpoint endpoint, ConsulConfiguration configuration, Processor processor) throws Exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/cloud/ConsulServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/cloud/ConsulServiceDiscovery.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/cloud/ConsulServiceDiscovery.java
index fd3fe60..8cfa854 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/cloud/ConsulServiceDiscovery.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/cloud/ConsulServiceDiscovery.java
@@ -25,8 +25,8 @@ import java.util.stream.Collectors;
 import com.orbitz.consul.Consul;
 import com.orbitz.consul.model.catalog.CatalogService;
 import com.orbitz.consul.model.health.ServiceHealth;
-import com.orbitz.consul.option.CatalogOptions;
-import com.orbitz.consul.option.ImmutableCatalogOptions;
+import com.orbitz.consul.option.ImmutableQueryOptions;
+import com.orbitz.consul.option.QueryOptions;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.cloud.ServiceDefinition;
 import org.apache.camel.component.consul.ConsulConfiguration;
@@ -38,25 +38,25 @@ import org.apache.camel.util.function.Suppliers;
 
 public final class ConsulServiceDiscovery extends DefaultServiceDiscovery {
     private final Supplier<Consul> client;
-    private final CatalogOptions catalogOptions;
+    private final QueryOptions queryOptions;
 
     public ConsulServiceDiscovery(ConsulConfiguration configuration) throws Exception {
         this.client = Suppliers.memorize(configuration::createConsulClient, this::rethrowAsRuntimeCamelException);
 
-        ImmutableCatalogOptions.Builder builder = ImmutableCatalogOptions.builder();
+        ImmutableQueryOptions.Builder builder = ImmutableQueryOptions.builder();
         ObjectHelper.ifNotEmpty(configuration.getDatacenter(), builder::datacenter);
-        ObjectHelper.ifNotEmpty(configuration.getTags(), tags -> tags.forEach(builder::tag));
+        ObjectHelper.ifNotEmpty(configuration.getTags(), builder::tag);
 
-        catalogOptions = builder.build();
+        queryOptions = builder.build();
     }
 
     @Override
     public List<ServiceDefinition> getServices(String name) {
         List<CatalogService> services = client.get().catalogClient()
-            .getService(name, catalogOptions)
+            .getService(name, queryOptions)
             .getResponse();
         List<ServiceHealth> healths = client.get().healthClient()
-            .getAllServiceInstances(name, catalogOptions)
+            .getAllServiceInstances(name, queryOptions)
             .getResponse();
 
         return services.stream()

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/AbstractConsulConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/AbstractConsulConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/AbstractConsulConsumer.java
new file mode 100644
index 0000000..12f4ec6
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/AbstractConsulConsumer.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.Processor;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+import org.apache.camel.component.consul.ConsulEndpoint;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+abstract class AbstractConsulConsumer<C> extends DefaultConsumer {
+    protected final ConsulEndpoint endpoint;
+    protected final ConsulConfiguration configuration;
+    protected final String key;
+    protected final AtomicReference<BigInteger> index;
+
+    private final Function<Consul, C> clientSupplier;
+    private Runnable watcher; // assigned by doStart(), cleared by doStop()
+
+    protected AbstractConsulConsumer(ConsulEndpoint endpoint, ConsulConfiguration configuration, Processor processor, Function<Consul, C> clientSupplier) {
+        super(endpoint, processor);
+
+        this.clientSupplier = clientSupplier;
+        this.configuration = configuration;
+        this.endpoint = endpoint;
+        // the key goes through ObjectHelper.notNull before the start index is read
+        this.key = ObjectHelper.notNull(configuration.getKey(), ConsulConstants.CONSUL_KEY);
+        this.index = new AtomicReference<>(BigInteger.valueOf(configuration.getFirstIndex()));
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        final C consulClient = clientSupplier.apply(endpoint.getConsul());
+        watcher = createWatcher(consulClient);
+        watcher.run(); // direct call on the calling thread
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        watcher = null;
+        super.doStop();
+    }
+
+    // *************************************************************************
+    // Watcher factory
+    // *************************************************************************
+
+    protected abstract Runnable createWatcher(C client) throws Exception;
+
+    // *************************************************************************
+    // Handlers
+    // *************************************************************************
+
+    protected abstract class AbstractWatcher implements Runnable {
+        private final C client;
+
+        public AbstractWatcher(C client) {
+            this.client = client;
+        }
+
+        protected void onError(Throwable throwable) {
+            if (!isRunAllowed()) {
+                return;
+            }
+            getExceptionHandler().handleException("Error watching for event " + key, throwable);
+        }
+
+        protected final void setIndex(BigInteger responseIndex) {
+            index.set(responseIndex);
+        }
+
+        @Override
+        public final void run() {
+            if (isRunAllowed()) {
+                watch();
+            }
+        }
+
+        protected final C client() {
+            return client;
+        }
+
+        protected final void watch() {
+            watch(client);
+        }
+
+        protected abstract void watch(C client);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/AbstractConsulProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/AbstractConsulProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/AbstractConsulProducer.java
new file mode 100644
index 0000000..8d2319d
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/AbstractConsulProducer.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.function.Function;
+
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.option.ConsistencyMode;
+import com.orbitz.consul.option.ImmutableQueryOptions;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.Message;
+import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.Processor;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+import org.apache.camel.component.consul.ConsulEndpoint;
+import org.apache.camel.impl.HeaderSelectorProducer;
+import org.apache.camel.util.ObjectHelper;
+
+abstract class AbstractConsulProducer<C> extends HeaderSelectorProducer {
+    private final ConsulEndpoint endpoint;
+    private final ConsulConfiguration configuration;
+    private final Function<Consul, C> clientSupplier;
+    private C client; // stays null until the first getClient() call
+
+    protected AbstractConsulProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration, Function<Consul, C> clientSupplier) {
+        super(endpoint, ConsulConstants.CONSUL_ACTION, configuration.getAction());
+
+        this.clientSupplier = clientSupplier;
+        this.configuration = configuration;
+        this.endpoint = endpoint;
+    }
+
+    // *************************************************************************
+    // Helpers for sub-classes
+    // *************************************************************************
+
+    /** Returns the typed client, obtaining it from the endpoint's Consul instance on first use. */
+    protected C getClient() throws Exception {
+        if (client != null) {
+            return client;
+        }
+        client = clientSupplier.apply(endpoint.getConsul());
+        return client;
+    }
+
+    protected ConsulConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    protected <D> D getMandatoryHeader(Message message, String header, Class<D> type) throws Exception {
+        return getMandatoryHeader(message, header, (D)null, type);
+    }
+
+    /**
+     * Looks up {@code header} on the message with {@code defaultValue} as fallback.
+     * @throws NoSuchHeaderException when the lookup yields {@code null}
+     */
+    protected <D> D getMandatoryHeader(Message message, String header, D defaultValue, Class<D> type) throws Exception {
+        D value = message.getHeader(header, defaultValue, type);
+        if (value != null) {
+            return value;
+        }
+        throw new NoSuchHeaderException(message.getExchange(), header, type);
+    }
+
+    /**
+     * Builds the query options, passing each candidate value through ObjectHelper.ifNotEmpty.
+     * Index and wait are read from headers only and the ACL token from the configuration only;
+     * the other options come from a header with the configured value as default.
+     */
+    protected QueryOptions buildQueryOptions(Message message, ConsulConfiguration conf) {
+        ImmutableQueryOptions.Builder builder = ImmutableQueryOptions.builder();
+
+        ObjectHelper.ifNotEmpty(message.getHeader(ConsulConstants.CONSUL_INDEX, BigInteger.class), builder::index);
+        ObjectHelper.ifNotEmpty(message.getHeader(ConsulConstants.CONSUL_WAIT, String.class), builder::wait);
+        ObjectHelper.ifNotEmpty(message.getHeader(ConsulConstants.CONSUL_DATACENTER, conf.getDatacenter(), String.class), builder::datacenter);
+        ObjectHelper.ifNotEmpty(message.getHeader(ConsulConstants.CONSUL_NEAR_NODE, conf.getNearNode(), String.class), builder::near);
+        ObjectHelper.ifNotEmpty(conf.getAclToken(), builder::token);
+        ObjectHelper.ifNotEmpty(message.getHeader(ConsulConstants.CONSUL_CONSISTENCY_MODE, conf.getConsistencyMode(), ConsistencyMode.class), builder::consistencyMode);
+        ObjectHelper.ifNotEmpty(message.getHeader(ConsulConstants.CONSUL_NODE_META, conf.getNodeMeta(), List.class), builder::nodeMeta);
+
+        return builder.build();
+    }
+
+    /** Copies index, last contact and known leader into headers, then hands the payload to setBodyAndResult. */
+    protected <T> void processConsulResponse(Message message, ConsulResponse<T> response) throws Exception {
+        message.setHeader(ConsulConstants.CONSUL_INDEX, response.getIndex());
+        message.setHeader(ConsulConstants.CONSUL_LAST_CONTACT, response.getLastContact());
+        message.setHeader(ConsulConstants.CONSUL_KNOWN_LEADER, response.isKnownLeader());
+
+        setBodyAndResult(message, response.getResponse());
+    }
+
+    protected void setBodyAndResult(Message message, Object body) throws Exception {
+        setBodyAndResult(message, body, body != null);
+    }
+
+    /** Always sets the result header; the body is replaced only when {@code body} is non-null. */
+    protected void setBodyAndResult(Message message, Object body, boolean result) throws Exception {
+        message.setHeader(ConsulConstants.CONSUL_RESULT, result);
+        if (body != null) {
+            message.setBody(body);
+        }
+    }
+
+    /** Adapts a client call into a Processor that publishes the call's result on the in message. */
+    protected Processor wrap(Function<C, Object> supplier) {
+        return exchange -> {
+            final Message in = exchange.getIn();
+            setBodyAndResult(in, supplier.apply(getClient()));
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
index 4d5c489..cb8b84a 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
@@ -17,14 +17,14 @@
 package org.apache.camel.component.consul.enpoint;
 
 import com.orbitz.consul.AgentClient;
-import org.apache.camel.component.consul.AbstractConsulProducer;
+import com.orbitz.consul.Consul;
 import org.apache.camel.component.consul.ConsulConfiguration;
 import org.apache.camel.component.consul.ConsulEndpoint;
 
-public class ConsulAgentProducer extends AbstractConsulProducer<AgentClient> {
+public final class ConsulAgentProducer extends AbstractConsulProducer<AgentClient> {
 
     public ConsulAgentProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration) {
-        super(endpoint, configuration, c -> c.agentClient());
+        super(endpoint, configuration, Consul::agentClient);
 
         bind(ConsulAgentActions.CHECKS, wrap(c -> c.getChecks()));
         bind(ConsulAgentActions.SERVICES, wrap(c -> c.getServices()));

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCatalogActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCatalogActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCatalogActions.java
new file mode 100644
index 0000000..3fda2a3
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCatalogActions.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulCatalogActions {
+    String REGISTER = "REGISTER"; // selects ConsulCatalogProducer#register
+    String DEREGISTER = "DEREGISTER"; // selects ConsulCatalogProducer#deregister
+    String LIST_DATACENTERS = "LIST_DATACENTERS"; // selects ConsulCatalogProducer#listDatacenters
+    String LIST_NODES = "LIST_NODES"; // selects ConsulCatalogProducer#listNodes
+    String LIST_SERVICES = "LIST_SERVICES"; // selects ConsulCatalogProducer#listServices
+    String GET_SERVICE = "GET_SERVICE"; // selects ConsulCatalogProducer#getService
+    String GET_NODE = "GET_NODE"; // selects ConsulCatalogProducer#getNode
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCatalogProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCatalogProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCatalogProducer.java
new file mode 100644
index 0000000..c083e3c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCatalogProducer.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.CatalogClient;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.model.catalog.CatalogDeregistration;
+import com.orbitz.consul.model.catalog.CatalogRegistration;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+import org.apache.camel.component.consul.ConsulEndpoint;
+
+/**
+ * Producer for the Consul catalog API; each action constant selects the method annotated with it.
+ */
+public final class ConsulCatalogProducer extends AbstractConsulProducer<CatalogClient> {
+
+    public ConsulCatalogProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, Consul::catalogClient);
+    }
+
+    /** Registers the mandatory CatalogRegistration body. NOTE(review): the null result leaves CONSUL_RESULT false - confirm intended. */
+    @InvokeOnHeader(ConsulCatalogActions.REGISTER)
+    protected void register(Message message) throws Exception {
+        getClient().register(message.getMandatoryBody(CatalogRegistration.class));
+        setBodyAndResult(message, null);
+    }
+
+    /** Deregisters the mandatory CatalogDeregistration body. NOTE(review): same CONSUL_RESULT caveat as register. */
+    @InvokeOnHeader(ConsulCatalogActions.DEREGISTER)
+    protected void deregister(Message message) throws Exception {
+        getClient().deregister(message.getMandatoryBody(CatalogDeregistration.class));
+        setBodyAndResult(message, null);
+    }
+
+    /** Publishes the result of the client's datacenter lookup as the body. */
+    @InvokeOnHeader(ConsulCatalogActions.LIST_DATACENTERS)
+    protected void listDatacenters(Message message) throws Exception {
+        setBodyAndResult(message, getClient().getDatacenters());
+    }
+
+    /** Publishes the catalog nodes, using query options built from headers and configuration. */
+    @InvokeOnHeader(ConsulCatalogActions.LIST_NODES)
+    protected void listNodes(Message message) throws Exception {
+        processConsulResponse(message, getClient().getNodes(buildQueryOptions(message, getConfiguration())));
+    }
+
+    /** Publishes the catalog services, using query options built from headers and configuration. */
+    @InvokeOnHeader(ConsulCatalogActions.LIST_SERVICES)
+    protected void listServices(Message message) throws Exception {
+        // must query services here: this action previously called getNodes() and returned the node list
+        processConsulResponse(message, getClient().getServices(buildQueryOptions(message, getConfiguration())));
+    }
+
+    /** Publishes the entries of the service named by the mandatory CONSUL_SERVICE header. */
+    @InvokeOnHeader(ConsulCatalogActions.GET_SERVICE)
+    protected void getService(Message message) throws Exception {
+        processConsulResponse(
+            message,
+            getClient().getService(
+                getMandatoryHeader(message, ConsulConstants.CONSUL_SERVICE, String.class),
+                buildQueryOptions(message, getConfiguration())
+            )
+        );
+    }
+
+    /** Publishes the catalog entry of the node named by the mandatory CONSUL_NODE header. */
+    @InvokeOnHeader(ConsulCatalogActions.GET_NODE)
+    protected void getNode(Message message) throws Exception {
+        processConsulResponse(
+            message,
+            getClient().getNode(
+                getMandatoryHeader(message, ConsulConstants.CONSUL_NODE, String.class),
+                buildQueryOptions(message, getConfiguration())
+            )
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCoordinatesActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCoordinatesActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCoordinatesActions.java
new file mode 100644
index 0000000..cab838e
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCoordinatesActions.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulCoordinatesActions {
+    String DATACENTERS = "DATACENTERS"; // selects ConsulCoordinatesProducer#datacenters
+    String NODES = "NODES"; // selects ConsulCoordinatesProducer#nodes
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCoordinatesProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCoordinatesProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCoordinatesProducer.java
new file mode 100644
index 0000000..ab7d3a2
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulCoordinatesProducer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.CoordinateClient;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+import org.apache.camel.component.consul.ConsulEndpoint;
+
+/**
+ * Producer backed by the Consul CoordinateClient; the action selects the annotated method to run.
+ */
+public final class ConsulCoordinatesProducer extends AbstractConsulProducer<CoordinateClient> {
+
+    public ConsulCoordinatesProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, Consul::coordinateClient);
+    }
+
+    /** Publishes the result of the client's datacenter lookup as the body. */
+    @InvokeOnHeader(ConsulCoordinatesActions.DATACENTERS)
+    protected void datacenters(Message message) throws Exception {
+        setBodyAndResult(message, getClient().getDatacenters());
+    }
+
+    /** Publishes the nodes of the datacenter taken from the CONSUL_DATACENTER header, else from the configuration. */
+    @InvokeOnHeader(ConsulCoordinatesActions.NODES)
+    protected void nodes(Message message) throws Exception {
+        final CoordinateClient coordinates = getClient();
+        final String datacenter = message.getHeader(ConsulConstants.CONSUL_DATACENTER, getConfiguration().getDatacenter(), String.class);
+
+        setBodyAndResult(message, coordinates.getNodes(datacenter));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
index b6b51fe..9c5f7a4 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.consul.enpoint;
 import java.math.BigInteger;
 import java.util.List;
 
+import com.orbitz.consul.Consul;
 import com.orbitz.consul.EventClient;
 import com.orbitz.consul.async.EventResponseCallback;
 import com.orbitz.consul.model.EventResponse;
@@ -27,15 +28,14 @@ import com.orbitz.consul.option.QueryOptions;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.component.consul.AbstractConsulConsumer;
 import org.apache.camel.component.consul.ConsulConfiguration;
 import org.apache.camel.component.consul.ConsulConstants;
 import org.apache.camel.component.consul.ConsulEndpoint;
 
-public class ConsulEventConsumer extends AbstractConsulConsumer<EventClient> {
+public final class ConsulEventConsumer extends AbstractConsulConsumer<EventClient> {
 
     public ConsulEventConsumer(ConsulEndpoint endpoint, ConsulConfiguration configuration, Processor processor) {
-        super(endpoint, configuration, processor, c -> c.eventClient());
+        super(endpoint, configuration, processor, Consul::eventClient);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
index 78b4c6c..df9e1eb 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
@@ -16,19 +16,20 @@
  */
 package org.apache.camel.component.consul.enpoint;
 
+import com.orbitz.consul.Consul;
 import com.orbitz.consul.EventClient;
 import com.orbitz.consul.option.EventOptions;
 import com.orbitz.consul.option.QueryOptions;
 import org.apache.camel.InvokeOnHeader;
 import org.apache.camel.Message;
-import org.apache.camel.component.consul.AbstractConsulProducer;
 import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
 import org.apache.camel.component.consul.ConsulEndpoint;
 
-public class ConsulEventProducer extends AbstractConsulProducer<EventClient> {
+public final class ConsulEventProducer extends AbstractConsulProducer<EventClient> {
 
     public ConsulEventProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration) {
-        super(endpoint, configuration, c -> c.eventClient());
+        super(endpoint, configuration, Consul::eventClient);
     }
 
     @InvokeOnHeader(ConsulEventActions.FIRE)
@@ -36,8 +37,8 @@ public class ConsulEventProducer extends AbstractConsulProducer<EventClient> {
         setBodyAndResult(
             message,
             getClient().fireEvent(
-                getMandatoryKey(message),
-                getOption(message, EventOptions.BLANK, EventOptions.class),
+                getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class),
+                message.getHeader(ConsulConstants.CONSUL_OPTIONS, EventOptions.BLANK, EventOptions.class),
                 message.getBody(String.class)
             )
         );
@@ -48,8 +49,8 @@ public class ConsulEventProducer extends AbstractConsulProducer<EventClient> {
         setBodyAndResult(
             message,
             getClient().listEvents(
-                getKey(message),
-                getOption(message, QueryOptions.BLANK, QueryOptions.class)
+                message.getHeader(ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class),
+                message.getHeader(ConsulConstants.CONSUL_OPTIONS, QueryOptions.BLANK, QueryOptions.class)
             )
         );
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulHealthActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulHealthActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulHealthActions.java
new file mode 100644
index 0000000..335ef37
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulHealthActions.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulHealthActions {
+    String CHECKS = "CHECKS"; // selects ConsulHealthProducer#checks
+    String NODE_CHECKS = "NODE_CHECKS"; // selects ConsulHealthProducer#nodeChecks
+    String SERVICE_CHECKS = "SERVICE_CHECKS"; // selects ConsulHealthProducer#serviceChecks
+    String SERVICE_INSTANCES = "SERVICE_INSTANCES"; // selects ConsulHealthProducer#serviceInstances
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulHealthProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulHealthProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulHealthProducer.java
new file mode 100644
index 0000000..d072f8e
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulHealthProducer.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.HealthClient;
+import com.orbitz.consul.model.State;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+import org.apache.camel.component.consul.ConsulEndpoint;
+
+/**
+ * Producer backed by the Consul HealthClient; every action publishes its result through processConsulResponse.
+ */
+public final class ConsulHealthProducer extends AbstractConsulProducer<HealthClient> {
+
+    public ConsulHealthProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, Consul::healthClient);
+    }
+
+    /**
+     * Publishes the checks of the node named by the CONSUL_NODE header.
+     * @throws org.apache.camel.NoSuchHeaderException when the node header is missing
+     */
+    @InvokeOnHeader(ConsulHealthActions.NODE_CHECKS)
+    protected void nodeChecks(Message message) throws Exception {
+        final HealthClient health = getClient();
+        final String node = getMandatoryHeader(message, ConsulConstants.CONSUL_NODE, String.class);
+
+        processConsulResponse(message, health.getNodeChecks(node, buildQueryOptions(message, getConfiguration())));
+    }
+
+    /**
+     * Publishes the checks of the service named by the CONSUL_SERVICE header.
+     * @throws org.apache.camel.NoSuchHeaderException when the service header is missing
+     */
+    @InvokeOnHeader(ConsulHealthActions.SERVICE_CHECKS)
+    protected void serviceChecks(Message message) throws Exception {
+        final HealthClient health = getClient();
+        final String service = getMandatoryHeader(message, ConsulConstants.CONSUL_SERVICE, String.class);
+
+        processConsulResponse(message, health.getServiceChecks(service, buildQueryOptions(message, getConfiguration())));
+    }
+
+    /**
+     * Publishes the instances of the CONSUL_SERVICE service; CONSUL_HEALTHY_ONLY (default false) picks the healthy-only query.
+     * @throws org.apache.camel.NoSuchHeaderException when the service header is missing
+     */
+    @InvokeOnHeader(ConsulHealthActions.SERVICE_INSTANCES)
+    protected void serviceInstances(Message message) throws Exception {
+        boolean healthyOnly = message.getHeader(ConsulConstants.CONSUL_HEALTHY_ONLY, false, boolean.class);
+        final HealthClient health = getClient();
+        final String service = getMandatoryHeader(message, ConsulConstants.CONSUL_SERVICE, String.class);
+
+        if (healthyOnly) {
+            processConsulResponse(message, health.getHealthyServiceInstances(service, buildQueryOptions(message, getConfiguration())));
+        } else {
+            processConsulResponse(message, health.getAllServiceInstances(service, buildQueryOptions(message, getConfiguration())));
+        }
+    }
+
+    /**
+     * Publishes the checks in the state given by the CONSUL_HEALTHY_STATE header.
+     * @throws org.apache.camel.NoSuchHeaderException when the state header is missing
+     */
+    @InvokeOnHeader(ConsulHealthActions.CHECKS)
+    protected void checks(Message message) throws Exception {
+        final HealthClient health = getClient();
+        final State state = getMandatoryHeader(message, ConsulConstants.CONSUL_HEALTHY_STATE, State.class);
+
+        processConsulResponse(message, health.getChecksByState(state, buildQueryOptions(message, getConfiguration())));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
index 09804af..e87900d 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.consul.enpoint;
 import java.util.List;
 
 import com.google.common.base.Optional;
+import com.orbitz.consul.Consul;
 import com.orbitz.consul.KeyValueClient;
 import com.orbitz.consul.async.ConsulResponseCallback;
 import com.orbitz.consul.model.ConsulResponse;
@@ -27,15 +28,14 @@ import com.orbitz.consul.option.QueryOptions;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.component.consul.AbstractConsulConsumer;
 import org.apache.camel.component.consul.ConsulConfiguration;
 import org.apache.camel.component.consul.ConsulConstants;
 import org.apache.camel.component.consul.ConsulEndpoint;
 
-public class ConsulKeyValueConsumer extends AbstractConsulConsumer<KeyValueClient> {
+public final class ConsulKeyValueConsumer extends AbstractConsulConsumer<KeyValueClient> {
 
     public ConsulKeyValueConsumer(ConsulEndpoint endpoint, ConsulConfiguration configuration, Processor processor) {
-        super(endpoint, configuration, processor, c -> c.keyValueClient());
+        super(endpoint, configuration, processor, Consul::keyValueClient);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
index 390b103..8eb7134 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
@@ -16,20 +16,20 @@
  */
 package org.apache.camel.component.consul.enpoint;
 
+import com.orbitz.consul.Consul;
 import com.orbitz.consul.KeyValueClient;
 import com.orbitz.consul.option.PutOptions;
 import com.orbitz.consul.option.QueryOptions;
 import org.apache.camel.InvokeOnHeader;
 import org.apache.camel.Message;
-import org.apache.camel.component.consul.AbstractConsulProducer;
 import org.apache.camel.component.consul.ConsulConfiguration;
 import org.apache.camel.component.consul.ConsulConstants;
 import org.apache.camel.component.consul.ConsulEndpoint;
 
-public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClient> {
+public final class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClient> {
 
     public ConsulKeyValueProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration) {
-        super(endpoint, configuration, c -> c.keyValueClient());
+        super(endpoint, configuration, Consul::keyValueClient);
     }
 
     @InvokeOnHeader(ConsulKeyValueActions.PUT)
@@ -37,10 +37,10 @@ public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClien
         message.setHeader(
             ConsulConstants.CONSUL_RESULT,
             getClient().putValue(
-                getMandatoryKey(message),
+                getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class),
                 message.getBody(String.class),
                 message.getHeader(ConsulConstants.CONSUL_FLAGS, 0L, Long.class),
-                getOption(message, PutOptions.BLANK, PutOptions.class)
+                message.getHeader(ConsulConstants.CONSUL_OPTIONS, PutOptions.BLANK, PutOptions.class)
             )
         );
     }
@@ -49,14 +49,15 @@ public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClien
     protected void getValue(Message message) throws Exception {
         Object result;
 
-        if (isValueAsString(message)) {
+        Boolean asString = message.getHeader(ConsulConstants.CONSUL_VALUE_AS_STRING, getConfiguration().isValueAsString(), Boolean.class);
+        if (asString) {
             result = getClient().getValueAsString(
-                getMandatoryKey(message)
+                getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class)
             ).orNull();
         } else {
             result = getClient().getValue(
-                getMandatoryKey(message),
-                getOption(message, QueryOptions.BLANK, QueryOptions.class)
+                getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class),
+                message.getHeader(ConsulConstants.CONSUL_OPTIONS, QueryOptions.BLANK, QueryOptions.class)
             ).orNull();
         }
 
@@ -67,14 +68,15 @@ public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClien
     protected void getValues(Message message) throws Exception {
         Object result;
 
-        if (isValueAsString(message)) {
+        Boolean asString = message.getHeader(ConsulConstants.CONSUL_VALUE_AS_STRING, getConfiguration().isValueAsString(), Boolean.class);
+        if (asString) {
             result = getClient().getValuesAsString(
-                getMandatoryKey(message)
+                getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class)
             );
         } else {
             result = getClient().getValues(
-                getMandatoryKey(message),
-                getOption(message, QueryOptions.BLANK, QueryOptions.class)
+                getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class),
+                message.getHeader(ConsulConstants.CONSUL_OPTIONS, QueryOptions.BLANK, QueryOptions.class)
             );
         }
 
@@ -83,23 +85,39 @@ public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClien
 
     @InvokeOnHeader(ConsulKeyValueActions.GET_KEYS)
     protected void getKeys(Message message) throws Exception {
-        setBodyAndResult(message, getClient().getKeys(getMandatoryKey(message)));
+        setBodyAndResult(
+            message,
+            getClient().getKeys(
+                getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class)
+            )
+        );
     }
 
     @InvokeOnHeader(ConsulKeyValueActions.GET_SESSIONS)
     protected void getSessions(Message message) throws Exception {
-        setBodyAndResult(message, getClient().getSession(getMandatoryKey(message)));
+        setBodyAndResult(
+            message,
+            getClient().getSession(
+                getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class)
+            )
+        );
     }
 
     @InvokeOnHeader(ConsulKeyValueActions.DELETE_KEY)
     protected void deleteKey(Message message) throws Exception {
-        getClient().deleteKey(getMandatoryKey(message));
+        getClient().deleteKey(
+            getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class)
+        );
+
         message.setHeader(ConsulConstants.CONSUL_RESULT, true);
     }
 
     @InvokeOnHeader(ConsulKeyValueActions.DELETE_KEYS)
     protected void deleteKeys(Message message) throws Exception {
-        getClient().deleteKeys(getMandatoryKey(message));
+        getClient().deleteKeys(
+            getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class)
+        );
+
         message.setHeader(ConsulConstants.CONSUL_RESULT, true);
     }
 
@@ -107,8 +125,8 @@ public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClien
     protected void lock(Message message) throws Exception {
         message.setHeader(ConsulConstants.CONSUL_RESULT,
             getClient().acquireLock(
-                getMandatoryKey(message),
-                getBody(message, null, String.class),
+                getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class),
+                message.getBody(String.class),
                 message.getHeader(ConsulConstants.CONSUL_SESSION, "", String.class)
             )
         );
@@ -118,7 +136,7 @@ public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClien
     protected void unlock(Message message) throws Exception {
         message.setHeader(ConsulConstants.CONSUL_RESULT,
             getClient().releaseLock(
-                getMandatoryKey(message),
+                getMandatoryHeader(message, ConsulConstants.CONSUL_KEY, getConfiguration().getKey(), String.class),
                 getMandatoryHeader(message, ConsulConstants.CONSUL_SESSION, String.class)
             )
         );

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulPreparedQueryActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulPreparedQueryActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulPreparedQueryActions.java
new file mode 100644
index 0000000..0266f6d
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulPreparedQueryActions.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulPreparedQueryActions {
+    String CREATE = "CREATE";
+    String GET = "GET";
+    String EXECUTE = "EXECUTE";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulPreparedQueryProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulPreparedQueryProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulPreparedQueryProducer.java
new file mode 100644
index 0000000..7e0cc21
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulPreparedQueryProducer.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.PreparedQueryClient;
+import com.orbitz.consul.model.query.PreparedQuery;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+import org.apache.camel.component.consul.ConsulEndpoint;
+import org.apache.camel.util.ObjectHelper;
+
+public final class ConsulPreparedQueryProducer extends AbstractConsulProducer<PreparedQueryClient> {
+
+    public ConsulPreparedQueryProducer(ConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, Consul::preparedQueryClient);
+    }
+
+    @InvokeOnHeader(ConsulPreparedQueryActions.CREATE)
+    protected void create(Message message) throws Exception {
+        setBodyAndResult(
+            message,
+            getClient().createPreparedQuery(
+                message.getMandatoryBody(PreparedQuery.class)
+            )
+        );
+    }
+
+    @InvokeOnHeader(ConsulPreparedQueryActions.GET)
+    protected void get(Message message) throws Exception {
+        String id = message.getHeader(ConsulConstants.CONSUL_PREPARED_QUERY_ID, String.class);
+
+        if (ObjectHelper.isEmpty(id)) {
+            setBodyAndResult(
+                message,
+                getClient().getPreparedQuery(
+                    message.getMandatoryBody(String.class)
+                )
+            );
+        } else {
+            setBodyAndResult(
+                message,
+                getClient().getPreparedQuery(id)
+            );
+        }
+    }
+
+    @InvokeOnHeader(ConsulPreparedQueryActions.EXECUTE)
+    protected void execute(Message message) throws Exception {
+        String id = message.getHeader(ConsulConstants.CONSUL_PREPARED_QUERY_ID, String.class);
+
+        if (ObjectHelper.isEmpty(id)) {
+            setBodyAndResult(
+                message,
+                getClient().execute(
+                    message.getMandatoryBody(String.class)
+                )
+            );
+        } else {
+            setBodyAndResult(
+                message,
+                getClient().execute(id)
+            );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e7063798/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulSessionActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulSessionActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulSessionActions.java
new file mode 100644
index 0000000..1df9f95
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulSessionActions.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulSessionActions {
+    String CREATE = "CREATE";
+    String DESTROY = "DESTROY";
+    String INFO = "INFO";
+    String LIST = "LIST";
+    String RENEW = "RENEW";
+}


[3/3] camel git commit: Add category to spring-boot and spring-cloud servicecall examples

Posted by lb...@apache.org.
Add category to spring-boot and spring-cloud servicecall examples


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33952630
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33952630
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33952630

Branch: refs/heads/master
Commit: 339526303a5c50b3edd13b5fa43d04004b2d923f
Parents: e706379
Author: lburgazzoli <lb...@gmail.com>
Authored: Wed Apr 5 07:05:28 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Wed Apr 5 07:05:28 2017 +0200

----------------------------------------------------------------------
 examples/README.adoc                                         | 8 ++++----
 .../camel-example-spring-boot-servicecall/consumer/pom.xml   | 2 +-
 examples/camel-example-spring-boot-servicecall/pom.xml       | 4 ++++
 .../camel-example-spring-boot-servicecall/services/pom.xml   | 2 +-
 .../camel-example-spring-cloud-servicecall/consumer/pom.xml  | 2 +-
 examples/camel-example-spring-cloud-servicecall/pom.xml      | 4 ++++
 .../camel-example-spring-cloud-servicecall/service/pom.xml   | 2 +-
 7 files changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/33952630/examples/README.adoc
----------------------------------------------------------------------
diff --git a/examples/README.adoc b/examples/README.adoc
index 4f1825c..033e8b8 100644
--- a/examples/README.adoc
+++ b/examples/README.adoc
@@ -17,10 +17,6 @@ Number of Examples: 92 (10 deprecated)
 |=======================================================================
 | Example | Category | Description
 
-| link:camel-example-spring-boot-servicecall/README.adoc[Spring Boot Servicecall] (camel-example-spring-boot-servicecall) |  | An example showing how to work with Camel ServiceCall EIP and Spring Boot
-
-| link:camel-example-spring-cloud-servicecall/README.adoc[Spring Cloud Servicecall] (camel-example-spring-cloud-servicecall) |  | An example showing how to work with Camel ServiceCall EIP and Spring Cloud
-
 | link:camel-example-cdi/README.md[CDI] (camel-example-cdi) | Beginner | An example showing how to work with Camel and CDI for dependency injection
 
 | link:camel-example-cdi-properties/README.md[CDI Properties] (camel-example-cdi-properties) | Beginner | DeltaSpike configuration properties CDI example
@@ -68,6 +64,10 @@ Number of Examples: 92 (10 deprecated)
 
 | link:camel-example-hazelcast-kubernetes/ReadMe.md[Hazelcast Kubernetes] (camel-example-hazelcast-kubernetes) | Cloud | An example with Camel and Hazelcast running on Kubernetes
 
+| link:camel-example-spring-boot-servicecall/README.adoc[Spring Boot Servicecall] (camel-example-spring-boot-servicecall) | Cloud | An example showing how to work with Camel ServiceCall EIP and Spring Boot
+
+| link:camel-example-spring-cloud-servicecall/README.adoc[Spring Cloud Servicecall] (camel-example-spring-cloud-servicecall) | Cloud | An example showing how to work with Camel ServiceCall EIP and Spring Cloud
+
 | link:camel-example-cdi-cassandraql/README.md[CDI Cassandra] (camel-example-cdi-cassandraql) | Database | Cassandraql CDI example
 
 | link:camel-example-jdbc/README.md[JDBC] (camel-example-jdbc) | Database | An example for showing Camel using JDBC component

http://git-wip-us.apache.org/repos/asf/camel/blob/33952630/examples/camel-example-spring-boot-servicecall/consumer/pom.xml
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-boot-servicecall/consumer/pom.xml b/examples/camel-example-spring-boot-servicecall/consumer/pom.xml
index db8ab42..7e9cdb1 100644
--- a/examples/camel-example-spring-boot-servicecall/consumer/pom.xml
+++ b/examples/camel-example-spring-boot-servicecall/consumer/pom.xml
@@ -32,7 +32,7 @@
   <description>An example showing how to work with Camel ServiceCall EIP and Spring Boot (Consumer)</description>
 
   <properties>
-    <category>Beginner</category>
+    <category>Cloud</category>
 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

http://git-wip-us.apache.org/repos/asf/camel/blob/33952630/examples/camel-example-spring-boot-servicecall/pom.xml
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-boot-servicecall/pom.xml b/examples/camel-example-spring-boot-servicecall/pom.xml
index 9546468..fb8891b 100644
--- a/examples/camel-example-spring-boot-servicecall/pom.xml
+++ b/examples/camel-example-spring-boot-servicecall/pom.xml
@@ -32,6 +32,10 @@
   <description>An example showing how to work with Camel ServiceCall EIP and Spring Boot</description>
   <packaging>pom</packaging>
 
+  <properties>
+    <category>Cloud</category>
+  </properties>
+
   <modules>
     <module>consumer</module>
     <module>services</module>

http://git-wip-us.apache.org/repos/asf/camel/blob/33952630/examples/camel-example-spring-boot-servicecall/services/pom.xml
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-boot-servicecall/services/pom.xml b/examples/camel-example-spring-boot-servicecall/services/pom.xml
index 4299bd3..4265c98 100644
--- a/examples/camel-example-spring-boot-servicecall/services/pom.xml
+++ b/examples/camel-example-spring-boot-servicecall/services/pom.xml
@@ -32,7 +32,7 @@
   <description>An example showing how to work with Camel ServiceCall EIP and Spring Boot (Services)</description>
 
   <properties>
-    <category>Beginner</category>
+    <category>Cloud</category>
 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

http://git-wip-us.apache.org/repos/asf/camel/blob/33952630/examples/camel-example-spring-cloud-servicecall/consumer/pom.xml
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-cloud-servicecall/consumer/pom.xml b/examples/camel-example-spring-cloud-servicecall/consumer/pom.xml
index a2fee1f..46f1390 100644
--- a/examples/camel-example-spring-cloud-servicecall/consumer/pom.xml
+++ b/examples/camel-example-spring-cloud-servicecall/consumer/pom.xml
@@ -32,7 +32,7 @@
   <description>An example showing how to work with Camel ServiceCall EIP and Spring Cloud (Consumer)</description>
 
   <properties>
-    <category>Beginner</category>
+    <category>Cloud</category>
 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

http://git-wip-us.apache.org/repos/asf/camel/blob/33952630/examples/camel-example-spring-cloud-servicecall/pom.xml
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-cloud-servicecall/pom.xml b/examples/camel-example-spring-cloud-servicecall/pom.xml
index 10c3517..d911316 100644
--- a/examples/camel-example-spring-cloud-servicecall/pom.xml
+++ b/examples/camel-example-spring-cloud-servicecall/pom.xml
@@ -32,6 +32,10 @@
   <description>An example showing how to work with Camel ServiceCall EIP and Spring Cloud</description>
   <packaging>pom</packaging>
 
+  <properties>
+    <category>Cloud</category>
+  </properties>
+
   <modules>
     <module>consumer</module>
     <module>service</module>

http://git-wip-us.apache.org/repos/asf/camel/blob/33952630/examples/camel-example-spring-cloud-servicecall/service/pom.xml
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-cloud-servicecall/service/pom.xml b/examples/camel-example-spring-cloud-servicecall/service/pom.xml
index ef8e7eb..d1e9ff8 100644
--- a/examples/camel-example-spring-cloud-servicecall/service/pom.xml
+++ b/examples/camel-example-spring-cloud-servicecall/service/pom.xml
@@ -32,7 +32,7 @@
   <description>An example showing how to work with Camel ServiceCall EIP and Spring Cloud (Service)</description>
 
   <properties>
-    <category>Beginner</category>
+    <category>Cloud</category>
 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>