You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2016/05/23 15:22:41 UTC

[1/2] camel git commit: CAMEL-9888: Create a camel-consul component

Repository: camel
Updated Branches:
  refs/heads/master dba22f93a -> 38d5374aa


http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/resources/META-INF/LICENSE.txt
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/resources/META-INF/LICENSE.txt b/components/camel-consul/src/main/resources/META-INF/LICENSE.txt
new file mode 100644
index 0000000..6b0b127
--- /dev/null
+++ b/components/camel-consul/src/main/resources/META-INF/LICENSE.txt
@@ -0,0 +1,203 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/resources/META-INF/NOTICE.txt
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/resources/META-INF/NOTICE.txt b/components/camel-consul/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000..2e215bf
--- /dev/null
+++ b/components/camel-consul/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,11 @@
+   =========================================================================
+   ==  NOTICE file corresponding to the section 4 d of                    ==
+   ==  the Apache License, Version 2.0,                                   ==
+   ==  in this case for the Apache Camel distribution.                    ==
+   =========================================================================
+
+   This product includes software developed by
+   The Apache Software Foundation (http://www.apache.org/).
+
+   Please read the different LICENSE files present in the licenses directory of
+   this distribution.

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/component/consul
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/component/consul b/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/component/consul
new file mode 100644
index 0000000..e1e36dc
--- /dev/null
+++ b/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/component/consul
@@ -0,0 +1 @@
+class=org.apache.camel.component.consul.ConsulComponent

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventTest.java
new file mode 100644
index 0000000..372b592
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul;
+
+import java.util.List;
+
+import com.orbitz.consul.model.EventResponse;
+import com.orbitz.consul.model.event.Event;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.enpoint.ConsulEventActions;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+
+public class ConsulEventTest extends ConsulTestSupport {
+
+    @Test
+    public void testFireEvent() throws Exception {
+        String key = generateRandomString();
+        String val = generateRandomString();
+
+        MockEndpoint mock = getMockEndpoint("mock:event");
+        mock.expectedMinimumMessageCount(1);
+        mock.expectedHeaderReceived(ConsulConstants.CONSUL_RESULT, true);
+
+        fluentTemplate()
+            .withHeader(ConsulConstants.CONSUL_ACTION, ConsulEventActions.FIRE)
+            .withHeader(ConsulConstants.CONSUL_KEY, key)
+            .withBody(val)
+            .to("direct:event")
+            .send();
+
+        mock.assertIsSatisfied();
+
+        EventResponse response = getConsul().eventClient().listEvents(key);
+        List<Event> events = response.getEvents();
+
+        assertFalse(events.isEmpty());
+        assertTrue(events.get(0).getPayload().isPresent());
+        assertEquals(val, events.get(0).getPayload().get());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:event")
+                    .to("consul:event")
+                        .to("mock:event");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventWatchTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventWatchTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventWatchTest.java
new file mode 100644
index 0000000..591c79c
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventWatchTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.List;
+
+import com.orbitz.consul.EventClient;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class ConsulEventWatchTest extends ConsulTestSupport {
+    private String key;
+    private EventClient client;
+
+    @Override
+    public void doPreSetup() {
+        key = generateRandomString();
+        client = getConsul().eventClient();
+    }
+
+    @Test
+    public void testWatchEvent() throws Exception {
+        List<String> values = generateRandomListOfStrings(3);
+
+        MockEndpoint mock = getMockEndpoint("mock:event-watch");
+        mock.expectedMessageCount(values.size());
+        mock.expectedBodiesReceived(values);
+        mock.expectedHeaderReceived(ConsulConstants.CONSUL_RESULT, true);
+
+        values.forEach(v -> client.fireEvent(key, v));
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                fromF("consul:event?key=%s", key)
+                    .to("log:org.apache.camel.component.consul?level=INFO&showAll=true")
+                        .to("mock:event-watch");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueTest.java
new file mode 100644
index 0000000..798acc7
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import com.google.common.base.Optional;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.enpoint.ConsulKeyValueActions;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class ConsulKeyValueTest extends ConsulTestSupport {
+
+    @Test
+    public void testKeyPut() throws Exception {
+        String key = generateKey();
+        String val = generateRandomString();
+
+        MockEndpoint mock = getMockEndpoint("mock:kv");
+        mock.expectedMinimumMessageCount(1);
+        mock.expectedBodiesReceived(val);
+        mock.expectedHeaderReceived(ConsulConstants.CONSUL_RESULT, true);
+
+        fluentTemplate()
+            .withHeader(ConsulConstants.CONSUL_ACTION, ConsulKeyValueActions.PUT)
+            .withHeader(ConsulConstants.CONSUL_KEY, key)
+            .withBody(val)
+            .to("direct:kv")
+            .send();
+
+        mock.assertIsSatisfied();
+
+        Optional<String> keyVal = getConsul().keyValueClient().getValueAsString(key);
+
+        assertTrue(keyVal.isPresent());
+        assertEquals(val, keyVal.get());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:kv")
+                    .to("consul:kv")
+                        .to("mock:kv");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueWatchTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueWatchTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueWatchTest.java
new file mode 100644
index 0000000..71dfbf5
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueWatchTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.List;
+import java.util.Random;
+
+import com.orbitz.consul.KeyValueClient;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class ConsulKeyValueWatchTest extends ConsulTestSupport {
+    private String key;
+    private KeyValueClient client;
+    private Random random;
+
+    @Override
+    public void doPreSetup() {
+        key = generateKey();
+        client = getConsul().keyValueClient();
+        random = new Random();
+    }
+
+    @Test
+    public void testWatchKey() throws Exception {
+        List<String> values = generateRandomListOfStrings(3);
+
+        MockEndpoint mock = getMockEndpoint("mock:kv-watch");
+        mock.expectedMessageCount(values.size());
+        mock.expectedBodiesReceived(values);
+        mock.expectedHeaderReceived(ConsulConstants.CONSUL_RESULT, true);
+
+        for (String val : values) {
+            client.putValue(key, val);
+            Thread.sleep(250 + random.nextInt(250));
+        }
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                fromF("consul:kv?key=%s&valueAsString=true", key)
+                    .to("log:org.apache.camel.component.consul?level=INFO&showAll=true")
+                        .to("mock:kv-watch");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulTestSupport.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulTestSupport.java
new file mode 100644
index 0000000..6cf3683
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulTestSupport.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsulTestSupport extends CamelTestSupport {
+    public static final Logger LOGGER = LoggerFactory.getLogger(ConsulTestSupport.class);
+    public static final String KV_PREFIX = "/camel";
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    protected Consul getConsul() {
+        return Consul.builder().build();
+    }
+
+    protected KeyValueClient getKeyValueClient() {
+        return getConsul().keyValueClient();
+    }
+
+    protected String generateRandomString() {
+        return UUID.randomUUID().toString();
+    }
+
+    protected String[] generateRandomArrayOfStrings(int size) {
+        String[] array = new String[size];
+        Arrays.setAll(array, i -> generateRandomString());
+
+        return array;
+    }
+
+    protected List<String> generateRandomListOfStrings(int size) {
+        return Arrays.asList(generateRandomArrayOfStrings(size));
+    }
+
+    protected String generateKey() {
+        return KV_PREFIX + "/" + testName.getMethodName() + "/" + generateRandomString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/resources/log4j.properties b/components/camel-consul/src/test/resources/log4j.properties
new file mode 100644
index 0000000..4103089
--- /dev/null
+++ b/components/camel-consul/src/test/resources/log4j.properties
@@ -0,0 +1,20 @@
+#
+# The logging properties used
+#
+log4j.rootLogger=INFO, file
+
+# uncomment the following line to turn on Camel debugging
+log4j.logger.org.apache.camel.component.consul=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-consul-test.log
+log4j.appender.file.append=true

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index 66ff56a..e84bf7f 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -86,6 +86,7 @@
     <module>camel-cmis</module>
     <module>camel-coap</module>
     <module>camel-cometd</module>
+    <module>camel-consul</module>
     <module>camel-context</module>
     <module>camel-couchdb</module>
     <module>camel-crypto</module>

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index d58e2d9..5aa1953 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -131,6 +131,8 @@
     <commons-pool2-version>2.4.2</commons-pool2-version>
     <commons-vfs2-version>2.0</commons-vfs2-version>
     <compress-lzf-version>1.0.3</compress-lzf-version>
+    <consul-client-version>0.12.0</consul-client-version>
+    <consul-client-bundle-version>0.12.0_1</consul-client-bundle-version>
     <cobertura-maven-plugin-version>2.7</cobertura-maven-plugin-version>
     <cxf-version>3.1.6</cxf-version>
     <cxf-version-range>[3.0,4.0)</cxf-version-range>
@@ -822,6 +824,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-consul</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-context</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/platforms/karaf/features/src/main/resources/bundles.properties
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/bundles.properties b/platforms/karaf/features/src/main/resources/bundles.properties
index affa23a..022290b 100644
--- a/platforms/karaf/features/src/main/resources/bundles.properties
+++ b/platforms/karaf/features/src/main/resources/bundles.properties
@@ -38,6 +38,7 @@ org.apache.servicemix.bundles/org.apache.servicemix.bundles.commons-csv/${common
 org.apache.servicemix.bundles/org.apache.servicemix.bundles.commons-dbcp/${commons-dbcp-bundle-version}/jar
 org.apache.servicemix.bundles/org.apache.servicemix.bundles.commons-httpclient/${commons-httpclient-bundle-version}/jar
 org.apache.servicemix.bundles/org.apache.servicemix.bundles.commons-io/${commons-io-bundle-version}/jar
+org.apache.servicemix.bundles/org.apache.servicemix.bundles.orbitz-consul-client/${consul-client-bundle-version}/jar
 org.apache.servicemix.bundles/org.apache.servicemix.bundles.dom4j/${dom4j-bundle-version}/jar
 org.apache.servicemix.bundles/org.apache.servicemix.bundles.flatpack/${flatpack-bundle-version}/jar
 org.apache.servicemix.bundles/org.apache.servicemix.bundles.fop/${fop-bundle-version}/jar

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 97f04e1..3c9b2de 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -328,6 +328,12 @@
     <bundle dependency='true'>mvn:org.cometd.java/cometd-java-common/${cometd-java-server}</bundle>
     <bundle>mvn:org.apache.camel/camel-cometd/${project.version}</bundle>
   </feature>
+  <feature name='camel-consul' version='${project.version}' resolver='(obr)' start-level='50'>
+    <feature version='${project.version}'>camel-core</feature>
+    <bundle dependency='true'>mvn:com.google.guava/guava/${google-guava-version}</bundle>
+    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.orbitz-consul-client/${consul-client-bundle-version}</bundle>
+    <bundle>mvn:org.apache.camel/camel-consul/${project.version}</bundle>
+  </feature>
   <feature name='camel-context' version='${project.version}' resolver='(obr)' start-level='50'>
     <feature version='${project.version}'>camel-core</feature>
     <bundle>mvn:org.apache.camel/camel-context/${project.version}</bundle>


[2/2] camel git commit: CAMEL-9888: Create a camel-consul component

Posted by lb...@apache.org.
CAMEL-9888: Create a camel-consul component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38d5374a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38d5374a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38d5374a

Branch: refs/heads/master
Commit: 38d5374aaf945ba587d7fcc06de8bac2ef4e0a48
Parents: dba22f9
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon May 23 17:15:34 2016 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Mon May 23 17:22:25 2016 +0200

----------------------------------------------------------------------
 apache-camel/pom.xml                            |   4 +
 .../src/main/descriptors/common-bin.xml         |   1 +
 components/camel-consul/pom.xml                 | 120 ++++++++
 .../consul/AbstractConsulConsumer.java          | 110 +++++++
 .../consul/AbstractConsulEndpoint.java          |  94 ++++++
 .../consul/AbstractConsulProducer.java          | 129 ++++++++
 .../camel/component/consul/ConsulComponent.java |  65 ++++
 .../component/consul/ConsulConfiguration.java   | 241 +++++++++++++++
 .../camel/component/consul/ConsulConstants.java |  42 +++
 .../component/consul/ConsulEndpointFactory.java |  25 ++
 .../consul/enpoint/ConsulAgentActions.java      |  25 ++
 .../consul/enpoint/ConsulAgentEndpoint.java     |  43 +++
 .../consul/enpoint/ConsulAgentProducer.java     |  33 ++
 .../consul/enpoint/ConsulEventActions.java      |  23 ++
 .../consul/enpoint/ConsulEventConsumer.java     | 131 ++++++++
 .../consul/enpoint/ConsulEventEndpoint.java     |  44 +++
 .../consul/enpoint/ConsulEventProducer.java     |  56 ++++
 .../consul/enpoint/ConsulKeyValueActions.java   |  30 ++
 .../consul/enpoint/ConsulKeyValueConsumer.java  | 128 ++++++++
 .../consul/enpoint/ConsulKeyValueEndpoint.java  |  43 +++
 .../consul/enpoint/ConsulKeyValueProducer.java  | 126 ++++++++
 .../consul/policy/ConsulRoutePolicy.java        | 308 +++++++++++++++++++
 .../src/main/resources/META-INF/LICENSE.txt     | 203 ++++++++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 .../services/org/apache/camel/component/consul  |   1 +
 .../camel/component/consul/ConsulEventTest.java |  68 ++++
 .../component/consul/ConsulEventWatchTest.java  |  60 ++++
 .../component/consul/ConsulKeyValueTest.java    |  62 ++++
 .../consul/ConsulKeyValueWatchTest.java         |  66 ++++
 .../component/consul/ConsulTestSupport.java     |  64 ++++
 .../src/test/resources/log4j.properties         |  20 ++
 components/pom.xml                              |   1 +
 parent/pom.xml                                  |   7 +
 .../src/main/resources/bundles.properties       |   1 +
 .../features/src/main/resources/features.xml    |   6 +
 35 files changed, 2391 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/apache-camel/pom.xml
----------------------------------------------------------------------
diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index 8f8c2f7..a556261 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -165,6 +165,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-consul</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-context</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/apache-camel/src/main/descriptors/common-bin.xml
----------------------------------------------------------------------
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index 3587254..5745702 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -54,6 +54,7 @@
         <include>org.apache.camel:camel-core</include>
         <include>org.apache.camel:camel-core-osgi</include>
         <include>org.apache.camel:camel-cometd</include>
+        <include>org.apache.camel:camel-consul</include>
         <include>org.apache.camel:camel-context</include>
         <include>org.apache.camel:camel-couchdb</include>
         <include>org.apache.camel:camel-crypto</include>

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml
new file mode 100644
index 0000000..9b9dca9
--- /dev/null
+++ b/components/camel-consul/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.18-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>camel-consul</artifactId>
+  <packaging>jar</packaging>
+  <name>Camel :: Consul</name>
+  <description>Camel Consul support</description>
+
+  <properties>
+    <camel.osgi.export.pkg>
+      org.apache.camel.component.consul.*,
+    </camel.osgi.export.pkg>
+    <camel.osgi.export.service>
+      org.apache.camel.spi.ComponentResolver;component=consul
+    </camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.orbitz.consul</groupId>
+      <artifactId>consul-client</artifactId>
+      <version>${consul-client-version}</version>
+    </dependency>
+
+    <!-- testing -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>java-hamcrest</artifactId>
+      <version>${hamcrest-version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+
+  <profiles>
+    <profile>
+      <id>consul-skip-tests</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>consul-tests</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>false</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+  </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
new file mode 100644
index 0000000..c3b6545
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * @author lburgazzoli
+ */
+public abstract class AbstractConsulConsumer<C> extends DefaultConsumer {
+    protected final AbstractConsulEndpoint endpoint;
+    protected final ConsulConfiguration configuration;
+    protected final String key;
+    protected final AtomicReference<BigInteger> index;
+
+    private final Function<Consul, C> clientSupplier;
+    private Runnable watcher;
+
+    protected AbstractConsulConsumer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration, Processor processor, Function<Consul, C> clientSupplier) {
+        super(endpoint, processor);
+
+        this.endpoint = endpoint;
+        this.configuration = configuration;
+        this.key = ObjectHelper.notNull(configuration.getKey(), ConsulConstants.CONSUL_KEY);
+        this.index = new AtomicReference<>(BigInteger.valueOf(configuration.getFirstIndex()));
+        this.clientSupplier = clientSupplier;
+        this.watcher = null;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        watcher = createWatcher(clientSupplier.apply(endpoint.getConsul()));
+        watcher.run();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        watcher = null;
+
+        super.doStop();
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    protected abstract Runnable createWatcher(C client) throws Exception;
+
+    // *************************************************************************
+    // Handlers
+    // *************************************************************************
+
+    protected abstract class AbstractWatcher implements Runnable {
+        private final C client;
+
+        public AbstractWatcher(C client) {
+            this.client = client;
+        }
+
+        protected void onError(Throwable throwable) {
+            if (isRunAllowed()) {
+                getExceptionHandler().handleException("Error watching for event " + key, throwable);
+            }
+        }
+
+        protected final void setIndex(BigInteger responseIndex) {
+            index.set(responseIndex);
+        }
+
+        @Override
+        public final void run() {
+            if (isRunAllowed()) {
+                watch(client);
+            }
+        }
+
+        protected final C client() {
+            return client;
+        }
+
+        protected final void watch() {
+            watch(client);
+        }
+
+        protected abstract void watch(C client);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java
new file mode 100644
index 0000000..981c314
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.ObjectHelper;
+
+public abstract class AbstractConsulEndpoint extends DefaultEndpoint {
+
+    @UriPath(description = "The consul configuration")
+    @Metadata(required = "true")
+    private final ConsulConfiguration configuration;
+
+    @UriParam(description = "The API endpoint")
+    @Metadata(required = "true")
+    private final String apiEndpoint;
+
+    private Consul consul;
+
+    protected AbstractConsulEndpoint(String apiEndpoint, String uri, ConsulComponent component, ConsulConfiguration configuration) {
+        super(uri, component);
+
+        this.configuration = configuration;
+        this.apiEndpoint = apiEndpoint;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    public ConsulConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    public String getApiEndpoint() {
+        return this.apiEndpoint;
+    }
+
+    public synchronized Consul getConsul() throws Exception {
+        if (consul == null) {
+            Consul.Builder builder = Consul.builder();
+            builder.withPing(configuration.isPingInstance());
+
+            if (ObjectHelper.isNotEmpty(configuration.getUrl())) {
+                builder.withUrl(configuration.getUrl());
+            }
+            if (ObjectHelper.isNotEmpty(configuration.getSslContextParameters())) {
+                builder.withSslContext(configuration.getSslContextParameters().createSSLContext(getCamelContext()));
+            }
+            if (ObjectHelper.isNotEmpty(configuration.getAclToken())) {
+                builder.withAclToken(configuration.getAclToken());
+            }
+            if (configuration.requiresBasicAuthentication()) {
+                builder.withBasicAuth(configuration.getUserName(), configuration.getPassword());
+            }
+            if (ObjectHelper.isNotEmpty(configuration.getConnectTimeoutMillis())) {
+                builder.withConnectTimeoutMillis(configuration.getConnectTimeoutMillis());
+            }
+            if (ObjectHelper.isNotEmpty(configuration.getReadTimeoutMillis())) {
+                builder.withReadTimeoutMillis(configuration.getReadTimeoutMillis());
+            }
+            if (ObjectHelper.isNotEmpty(configuration.getWriteTimeoutMillis())) {
+                builder.withWriteTimeoutMillis(configuration.getWriteTimeoutMillis());
+            }
+
+            consul = builder.build();
+        }
+
+        return consul;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
new file mode 100644
index 0000000..2be260c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.function.Function;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.Message;
+import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.HeaderSelectorProducer;
+
+
+public abstract class AbstractConsulProducer<C> extends HeaderSelectorProducer {
+    private final AbstractConsulEndpoint endpoint;
+    private final ConsulConfiguration configuration;
+    private final Function<Consul, C> clientSupplier;
+    private C client;
+
+    protected AbstractConsulProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration, Function<Consul, C> clientSupplier) {
+        super(endpoint, ConsulConstants.CONSUL_ACTION, configuration.getAction());
+
+        this.endpoint = endpoint;
+        this.configuration = configuration;
+        this.clientSupplier = clientSupplier;
+        this.client = null;
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    protected Consul getConsul() throws Exception {
+        return endpoint.getConsul();
+    }
+
+    protected C getClient() throws Exception {
+        if (client == null) {
+            client = clientSupplier.apply(getConsul());
+        }
+
+        return client;
+    }
+
+    protected ConsulConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    protected <D> D getHeader(Message message, String header, D defaultValue, Class<D> type) {
+        return message.getHeader(header, defaultValue, type);
+    }
+
+    protected <D> D getMandatoryHeader(Message message, String header, Class<D> type) throws Exception {
+        return getMandatoryHeader(message, header, null, type);
+    }
+
+    protected <D> D getMandatoryHeader(Message message, String header, D defaultValue, Class<D> type) throws Exception {
+        D value = getHeader(message, header, defaultValue, type);
+        if (value == null) {
+            throw new NoSuchHeaderException(message.getExchange(), header, type);
+        }
+
+        return value;
+    }
+
+    protected String getKey(Message message) {
+        return message.getHeader(
+            ConsulConstants.CONSUL_KEY,
+            configuration.getKey(),
+            String.class);
+    }
+
+    protected String getMandatoryKey(Message message) throws Exception {
+        return getMandatoryHeader(
+            message,
+            ConsulConstants.CONSUL_KEY,
+            configuration.getKey(),
+            String.class);
+    }
+
+    protected <T> T getOption(Message message, T defaultValue, Class<T> type) {
+        return message.getHeader(ConsulConstants.CONSUL_OPTIONS, defaultValue, type);
+    }
+
+    protected boolean isValueAsString(Message message) throws Exception {
+        return message.getHeader(
+            ConsulConstants.CONSUL_VALUE_AS_STRING,
+            configuration.isValueAsString(),
+            Boolean.class);
+    }
+
+    protected <T> T getBody(Message message, T defaultValue, Class<T> type) throws Exception {
+        T body = message.getBody(type);
+        if (body == null) {
+            body = defaultValue;
+        }
+
+        return  body;
+    }
+
+    protected void setBodyAndResult(Message message, Object body) throws Exception {
+        setBodyAndResult(message, body, body != null);
+    }
+
+    protected void setBodyAndResult(Message message, Object body, boolean result) throws Exception {
+        message.setHeader(ConsulConstants.CONSUL_RESULT, result);
+        if (body != null) {
+            message.setBody(body);
+        }
+    }
+
+    protected Processor wrap(Function<C, Object> supplier) {
+        return exchange -> setBodyAndResult(exchange.getIn(), supplier.apply(getClient()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
new file mode 100644
index 0000000..bbd6a1c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.consul.enpoint.ConsulAgentEndpoint;
+import org.apache.camel.component.consul.enpoint.ConsulEventEndpoint;
+import org.apache.camel.component.consul.enpoint.ConsulKeyValueEndpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+
+/**
+ * Represents the component that manages {@link AbstractConsulEndpoint}.
+ */
+public class ConsulComponent extends UriEndpointComponent {
+    
+    public ConsulComponent() {
+        super(AbstractConsulEndpoint.class);
+    }
+
+    public ConsulComponent(CamelContext context) {
+        super(context, AbstractConsulEndpoint.class);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        ConsulConfiguration configuration = new ConsulConfiguration();
+        setProperties(configuration, parameters);
+
+        return ConsulApiEndpoint.valueOf(remaining).create(uri, this, configuration);
+    }
+
+    private enum ConsulApiEndpoint implements ConsulEndpointFactory {
+        kv(ConsulKeyValueEndpoint::new),
+        event(ConsulEventEndpoint::new),
+        agent(ConsulAgentEndpoint::new);
+
+        private final ConsulEndpointFactory factory;
+
+        ConsulApiEndpoint(ConsulEndpointFactory factory) {
+            this.factory = factory;
+        }
+
+        @Override
+        public Endpoint create(String uri, ConsulComponent component, ConsulConfiguration configuration) throws Exception {
+            return factory.create(uri, component, configuration);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
new file mode 100644
index 0000000..2c0e576
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.jsse.SSLContextParameters;
+
+@UriParams
+public class ConsulConfiguration {
+    @UriParam
+    private String url;
+
+    @UriParam(label = "security")
+    private SSLContextParameters sslContextParameters;
+    @UriParam(label = "security")
+    private String aclToken;
+    @UriParam(label = "security")
+    private String userName;
+    @UriParam(label = "security")
+    private String password;
+
+    @UriParam
+    private Long connectTimeoutMillis;
+    @UriParam
+    private Long readTimeoutMillis;
+    @UriParam
+    private Long writeTimeoutMillis;
+    @UriParam(defaultValue = "true")
+    private boolean pingInstance = true;
+
+
+    @UriParam(label = "producer")
+    private String action;
+
+    @UriParam(label = "producer,kv", defaultValue = "false")
+    private boolean valueAsString;
+
+    @UriParam
+    private String key;
+
+    @UriParam(label = "consumer,watch", defaultValue = "10")
+    private Integer blockSeconds = 10;
+
+    @UriParam(label = "consumer,watch", defaultValue = "0")
+    private long firstIndex;
+
+    @UriParam(label = "consumer,watch", defaultValue = "false")
+    private boolean recursive;
+
+
+    public String getUrl() {
+        return url;
+    }
+
+    /**
+     * The Consul agent URL
+     */
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public SSLContextParameters getSslContextParameters() {
+        return sslContextParameters;
+    }
+
+    /**
+     * SSL configuration using an org.apache.camel.util.jsse.SSLContextParameters
+     * instance.
+     */
+    public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+        this.sslContextParameters = sslContextParameters;
+    }
+
+    public String getAclToken() {
+        return aclToken;
+    }
+
+    /**
+     * Sets the ACL token to be used with Consul
+     */
+    public void setAclToken(String aclToken) {
+        this.aclToken = aclToken;
+    }
+
+    public String getAction() {
+        return action;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    /**
+     * Sets the username to be used for basic authentication
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Sets the password to be used for basic authentication
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public boolean requiresBasicAuthentication() {
+        return ObjectHelper.isNotEmpty(userName) && ObjectHelper.isNotEmpty(password);
+    }
+
+    public Long getConnectTimeoutMillis() {
+        return connectTimeoutMillis;
+    }
+
+    /**
+     * Connect timeout for OkHttpClient
+     */
+    public void setConnectTimeoutMillis(Long connectTimeoutMillis) {
+        this.connectTimeoutMillis = connectTimeoutMillis;
+    }
+
+    public Long getReadTimeoutMillis() {
+        return readTimeoutMillis;
+    }
+
+    /**
+     * Read timeout for OkHttpClient
+     */
+    public void setReadTimeoutMillis(Long readTimeoutMillis) {
+        this.readTimeoutMillis = readTimeoutMillis;
+    }
+
+    public Long getWriteTimeoutMillis() {
+        return writeTimeoutMillis;
+    }
+
+    /**
+     * Write timeout for OkHttpClient
+     */
+    public void setWritTeimeoutMillis(Long writeTimeoutMillis) {
+        this.writeTimeoutMillis = writeTimeoutMillis;
+    }
+
+    public void setWriteTimeoutMillis(Long writeTimeoutMillis) {
+        this.writeTimeoutMillis = writeTimeoutMillis;
+    }
+
+    public boolean isPingInstance() {
+        return pingInstance;
+    }
+
+    /**
+     * Configure if the AgentClient should attempt a ping before returning the Consul instance
+     */
+    public void setPingInstance(boolean pingInstance) {
+        this.pingInstance = pingInstance;
+    }
+
+    /**
+     * The default action. Can be overridden by CamelConsulAction
+     */
+    public void setAction(String action) {
+        this.action = action;
+    }
+
+    public boolean isValueAsString() {
+        return valueAsString;
+    }
+
+    /**
+     * Default to transform values retrieved from Consul i.e. on KV endpoint to
+     * string.
+     */
+    public void setValueAsString(boolean valueAsString) {
+        this.valueAsString = valueAsString;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    /**
+     * The default action. Can be overridden by CamelConsulKey
+     */
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public Integer getBlockSeconds() {
+        return blockSeconds;
+    }
+
+    /**
+     * The second to wait for a watch event, default 10 seconds
+     */
+    public void setBlockSeconds(Integer blockSeconds) {
+        this.blockSeconds = blockSeconds;
+    }
+
+    public long getFirstIndex() {
+        return firstIndex;
+    }
+
+    /**
+     * The first index for watch for, default 0
+     */
+    public void setFirstIndex(long firstIndex) {
+        this.firstIndex = firstIndex;
+    }
+
+    public boolean isRecursive() {
+        return recursive;
+    }
+
+    /**
+     * Recursively watch, default false
+     */
+    public void setRecursive(boolean recursive) {
+        this.recursive = recursive;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
new file mode 100644
index 0000000..923e1d7
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+public interface ConsulConstants {
+    String CONSUL_ENDPOINT_KV = "kv";
+
+    String CONSUL_ACTION = "CamelConsulAction";
+    String CONSUL_KEY = "CamelConsulKey";
+    String CONSUL_EVENT_ID = "CamelConsulEventId";
+    String CONSUL_EVENT_NAME = "CamelConsulEventName";
+    String CONSUL_EVENT_LTIME = "CamelConsulEventLTime";
+    String CONSUL_NODE_FILTER = "CamelConsulNodeFilter";
+    String CONSUL_TAG_FILTER = "CamelConsulTagFilter";
+    String CONSUL_SERVICE_FILTER = "CamelConsulSessionFilter";
+    String CONSUL_VERSION = "CamelConsulVersion";
+    String CONSUL_VALUE = "CamelConsulValue";
+    String CONSUL_VALUES = "CamelConsulValues";
+    String CONSUL_FLAGS = "CamelConsulFlags";
+    String CONSUL_CREATE_INDEX = "CamelConsulCreateIndex";
+    String CONSUL_LOCK_INDEX = "CamelConsulCreateIndex";
+    String CONSUL_MODIFY_INDEX = "CamelConsulModifyIndex";
+    String CONSUL_OPTIONS = "CamelConsulOptions";
+    String CONSUL_RESULT = "CamelConsulResult";
+    String CONSUL_SESSION = "CamelConsulSession";
+    String CONSUL_OPERATION = "CamelConsulOperation";
+    String CONSUL_VALUE_AS_STRING = "CamelConsulValueAsString";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java
new file mode 100644
index 0000000..3401ff8
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+
+import org.apache.camel.Endpoint;
+
+@FunctionalInterface
+public interface ConsulEndpointFactory {
+    Endpoint create(String uri, ConsulComponent component, ConsulConfiguration configuration) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java
new file mode 100644
index 0000000..aff15ab
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulAgentActions {
+    String CHECKS = "CHECKS";
+    String SERVICES = "SERVICES";
+    String MEMBERS = "MEMBERS";
+    String AGENT = "AGENT";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java
new file mode 100644
index 0000000..9d5742d
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.ConsulComponent;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+
+@UriEndpoint(scheme = "consul", title = "Consul Agent", syntax = "consul://agent", producerOnly = true, label = "api,cloud")
+public class ConsulAgentEndpoint extends AbstractConsulEndpoint {
+    public ConsulAgentEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) {
+        super("agent", uri, component, configuration);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new ConsulAgentProducer(this, getConfiguration());
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new IllegalArgumentException("Not implemented");
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
new file mode 100644
index 0000000..940095a
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.AgentClient;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.AbstractConsulProducer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+
+public class ConsulAgentProducer extends AbstractConsulProducer<AgentClient> {
+    ConsulAgentProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, c -> c.agentClient());
+
+        bind(ConsulAgentActions.CHECKS, wrap(c -> c.getChecks()));
+        bind(ConsulAgentActions.SERVICES, wrap(c -> c.getServices()));
+        bind(ConsulAgentActions.MEMBERS, wrap(c -> c.getMembers()));
+        bind(ConsulAgentActions.AGENT, wrap(c -> c.getAgent()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java
new file mode 100644
index 0000000..76e63ec
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulEventActions {
+    String FIRE = "FIRE";
+    String LIST = "LIST";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
new file mode 100644
index 0000000..f6f658c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import com.orbitz.consul.EventClient;
+import com.orbitz.consul.async.EventResponseCallback;
+import com.orbitz.consul.model.EventResponse;
+import com.orbitz.consul.model.event.Event;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.consul.AbstractConsulConsumer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+
+public class ConsulEventConsumer extends AbstractConsulConsumer<EventClient> {
+    protected ConsulEventConsumer(ConsulEventEndpoint endpoint, ConsulConfiguration configuration, Processor processor) {
+        super(endpoint, configuration, processor, c -> c.eventClient());
+    }
+
+    @Override
+    protected Runnable createWatcher(EventClient client) throws Exception {
+        return new EventWatcher(client);
+    }
+
+    // *************************************************************************
+    // Watch
+    // *************************************************************************
+
+    private class EventWatcher extends AbstractWatcher implements EventResponseCallback {
+        EventWatcher(EventClient client) {
+            super(client);
+        }
+
+        @Override
+        public void watch(EventClient client) {
+            client.listEvents(
+                key,
+                QueryOptions.blockSeconds(configuration.getBlockSeconds(), index.get()).build(),
+                this
+            );
+        }
+
+        @Override
+        public void onComplete(EventResponse eventResponse) {
+            if (isRunAllowed()) {
+                List<Event> events = filterEvents(eventResponse.getEvents(), index.get());
+                events.forEach(this::onEvent);
+
+                setIndex(eventResponse.getIndex());
+
+                watch();
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            onError(throwable);
+        }
+
+        private void onEvent(Event event) {
+            final Exchange exchange = endpoint.createExchange();
+            final Message message = exchange.getIn();
+
+            message.setHeader(ConsulConstants.CONSUL_KEY, key);
+            message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+            message.setHeader(ConsulConstants.CONSUL_EVENT_ID, event.getId());
+            message.setHeader(ConsulConstants.CONSUL_EVENT_NAME, event.getName());
+            message.setHeader(ConsulConstants.CONSUL_EVENT_LTIME, event.getLTime());
+            message.setHeader(ConsulConstants.CONSUL_NODE_FILTER, event.getNodeFilter());
+            message.setHeader(ConsulConstants.CONSUL_SERVICE_FILTER, event.getServiceFilter());
+            message.setHeader(ConsulConstants.CONSUL_TAG_FILTER, event.getTagFilter());
+            message.setHeader(ConsulConstants.CONSUL_VERSION, event.getVersion());
+            message.setBody(event.getPayload().orNull());
+
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                getExceptionHandler().handleException("Error processing exchange", exchange, e);
+            }
+        }
+
+        /**
+         * from spring-cloud-consul (https://github.com/spring-cloud/spring-cloud-consul):
+         *     spring-cloud-consul-bus/src/main/java/org/springframework/cloud/consul/bus/EventService.java
+         */
+        private List<Event> filterEvents(List<Event> toFilter, BigInteger lastIndex) {
+            List<Event> events = toFilter;
+            if (lastIndex != null) {
+                for (int i = 0; i < events.size(); i++) {
+                    Event event = events.get(i);
+                    BigInteger eventIndex = getEventIndexFromId(event);
+                    if (eventIndex.equals(lastIndex)) {
+                        events = events.subList(i + 1, events.size());
+                        break;
+                    }
+                }
+            }
+            return events;
+        }
+
+        private BigInteger getEventIndexFromId(Event event) {
+            String eventId = event.getId();
+            String lower = eventId.substring(0, 8) + eventId.substring(9, 13) + eventId.substring(14, 18);
+            String upper = eventId.substring(19, 23) + eventId.substring(24, 36);
+
+            BigInteger lowVal = new BigInteger(lower, 16);
+            BigInteger highVal = new BigInteger(upper, 16);
+
+            return lowVal.xor(highVal);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java
new file mode 100644
index 0000000..df3254c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.ConsulComponent;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+
+
+@UriEndpoint(scheme = "consul", title = "Consul KeyValue", syntax = "consul://event", consumerClass = ConsulEventConsumer.class, label = "api,cloud")
+public class ConsulEventEndpoint extends AbstractConsulEndpoint {
+    public ConsulEventEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) {
+        super("event", uri, component, configuration);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new ConsulEventProducer(this, getConfiguration());
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new ConsulEventConsumer(this, getConfiguration(), processor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
new file mode 100644
index 0000000..26b6595
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.EventClient;
+import com.orbitz.consul.option.EventOptions;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.AbstractConsulProducer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+
+public class ConsulEventProducer extends AbstractConsulProducer<EventClient> {
+    ConsulEventProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, c -> c.eventClient());
+    }
+
+    @InvokeOnHeader(ConsulEventActions.FIRE)
+    protected void fire(Message message) throws Exception {
+        setBodyAndResult(
+            message,
+            getClient().fireEvent(
+                getMandatoryKey(message),
+                getOption(message, EventOptions.BLANK, EventOptions.class),
+                message.getBody(String.class)
+            )
+        );
+    }
+
+    @InvokeOnHeader(ConsulEventActions.LIST)
+    protected void list(Message message) throws Exception {
+        setBodyAndResult(
+            message,
+            getClient().listEvents(
+                getKey(message),
+                getOption(message, QueryOptions.BLANK, QueryOptions.class)
+            )
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java
new file mode 100644
index 0000000..d014bbd
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulKeyValueActions {
+    String PUT = "PUT";
+    String GET_VALUE = "GET_VALUE";
+    String GET_VALUES = "GET_VALUES";
+    String GET_KEYS = "GET_KEYS";
+    String GET_SESSIONS = "GET_SESSIONS";
+    String DELETE_KEY = "DELETE_KEY";
+    String DELETE_KEYS = "DELETE_KEYS";
+    String LOCK = "LOCK";
+    String UNLOCK = "UNLOCK";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
new file mode 100644
index 0000000..a90d8cb
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.async.ConsulResponseCallback;
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.consul.AbstractConsulConsumer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+
+public class ConsulKeyValueConsumer extends AbstractConsulConsumer<KeyValueClient> {
+
+    protected ConsulKeyValueConsumer(ConsulKeyValueEndpoint endpoint, ConsulConfiguration configuration, Processor processor) {
+        super(endpoint, configuration, processor, c -> c.keyValueClient());
+    }
+
+    @Override
+    protected Runnable createWatcher(KeyValueClient client) throws Exception {
+        return configuration.isRecursive() ? new RecursivePathWatcher(client) : new PathWatcher(client);
+    }
+
+    // *************************************************************************
+    // Watch
+    // *************************************************************************
+
+    private abstract class AbstractPathWatcher<T> extends AbstractWatcher implements ConsulResponseCallback<T> {
+        protected AbstractPathWatcher(KeyValueClient client) {
+            super(client);
+        }
+
+        protected QueryOptions queryOptions() {
+            return QueryOptions.blockSeconds(configuration.getBlockSeconds(), index.get()).build();
+        }
+
+        @Override
+        public void onComplete(ConsulResponse<T> consulResponse) {
+            if (isRunAllowed()) {
+                onResponse(consulResponse.getResponse());
+                setIndex(consulResponse.getIndex());
+                watch();
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            onError(throwable);
+        }
+
+        protected void onValue(Value value) {
+            final Exchange exchange = endpoint.createExchange();
+            final Message message = exchange.getIn();
+
+            message.setHeader(ConsulConstants.CONSUL_KEY, value.getKey());
+            message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+            message.setHeader(ConsulConstants.CONSUL_FLAGS, value.getFlags());
+            message.setHeader(ConsulConstants.CONSUL_CREATE_INDEX, value.getCreateIndex());
+            message.setHeader(ConsulConstants.CONSUL_LOCK_INDEX, value.getLockIndex());
+            message.setHeader(ConsulConstants.CONSUL_MODIFY_INDEX, value.getModifyIndex());
+            message.setHeader(ConsulConstants.CONSUL_SESSION, value.getSession().orNull());
+            message.setBody(configuration.isValueAsString() ? value.getValueAsString().orNull() : value.getValue().orNull());
+
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                getExceptionHandler().handleException("Error processing exchange", exchange, e);
+            }
+        }
+
+        protected abstract void onResponse(T consulResponse);
+    }
+
+    private class PathWatcher extends AbstractPathWatcher<Optional<Value>> {
+        PathWatcher(KeyValueClient client) {
+            super(client);
+        }
+
+        @Override
+        public void watch(KeyValueClient client) {
+            client.getValue(key, queryOptions(), this);
+        }
+
+        @Override
+        public void onResponse(Optional<Value> value) {
+            if (value.isPresent()) {
+                onValue(value.get());
+            }
+        }
+    }
+
+    private class RecursivePathWatcher extends AbstractPathWatcher<List<Value>> {
+        RecursivePathWatcher(KeyValueClient client) {
+            super(client);
+        }
+
+        @Override
+        public void watch(KeyValueClient client) {
+            client.getValues(key, queryOptions(), this);
+        }
+
+        @Override
+        public void onResponse(List<Value> values) {
+            values.forEach(this::onValue);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java
new file mode 100644
index 0000000..2910910
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.ConsulComponent;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+
+@UriEndpoint(scheme = "consul", title = "Consul KeyValue", syntax = "consul://kv", consumerClass = ConsulKeyValueConsumer.class, label = "api,cloud")
+public class ConsulKeyValueEndpoint extends AbstractConsulEndpoint {
+    public ConsulKeyValueEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) {
+        super("kv", uri, component, configuration);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new ConsulKeyValueProducer(this, getConfiguration());
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new ConsulKeyValueConsumer(this, getConfiguration(), processor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
new file mode 100644
index 0000000..9596f4c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.option.PutOptions;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.AbstractConsulProducer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+
+public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClient> {
+
+    ConsulKeyValueProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, c -> c.keyValueClient());
+    }
+
+    @InvokeOnHeader(ConsulKeyValueActions.PUT)
+    protected void put(Message message) throws Exception {
+        message.setHeader(
+            ConsulConstants.CONSUL_RESULT,
+            getClient().putValue(
+                getMandatoryKey(message),
+                message.getBody(String.class),
+                message.getHeader(ConsulConstants.CONSUL_FLAGS, 0L, Long.class),
+                getOption(message, PutOptions.BLANK, PutOptions.class)
+            )
+        );
+    }
+
+    @InvokeOnHeader(ConsulKeyValueActions.GET_VALUE)
+    protected void getValue(Message message) throws Exception {
+        Object result;
+
+        if (isValueAsString(message)) {
+            result = getClient().getValueAsString(
+                getMandatoryKey(message)
+            ).orNull();
+        } else {
+            result = getClient().getValue(
+                getMandatoryKey(message),
+                getOption(message, QueryOptions.BLANK, QueryOptions.class)
+            ).orNull();
+        }
+
+        setBodyAndResult(message, result);
+    }
+
+    @InvokeOnHeader(ConsulKeyValueActions.GET_VALUES)
+    protected void getValues(Message message) throws Exception {
+        Object result;
+
+        if (isValueAsString(message)) {
+            result = getClient().getValuesAsString(
+                getMandatoryKey(message)
+            );
+        } else {
+            result = getClient().getValues(
+                getMandatoryKey(message),
+                getOption(message, QueryOptions.BLANK, QueryOptions.class)
+            );
+        }
+
+        setBodyAndResult(message, result);
+    }
+
+    @InvokeOnHeader(ConsulKeyValueActions.GET_KEYS)
+    protected void getKeys(Message message) throws Exception {
+        setBodyAndResult(message, getClient().getKeys(getMandatoryKey(message)));
+    }
+
+    @InvokeOnHeader(ConsulKeyValueActions.GET_SESSIONS)
+    protected void getSessions(Message message) throws Exception {
+        setBodyAndResult(message, getClient().getSession(getMandatoryKey(message)));
+    }
+
+    @InvokeOnHeader(ConsulKeyValueActions.DELETE_KEY)
+    protected void deleteKey(Message message) throws Exception {
+        getClient().deleteKey(getMandatoryKey(message));
+        message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+    }
+
+    @InvokeOnHeader(ConsulKeyValueActions.DELETE_KEYS)
+    protected void deleteKeys(Message message) throws Exception {
+        getClient().deleteKeys(getMandatoryKey(message));
+        message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+    }
+
+    @InvokeOnHeader(ConsulKeyValueActions.LOCK)
+    protected void lock(Message message) throws Exception {
+        message.setHeader(ConsulConstants.CONSUL_RESULT,
+            getClient().acquireLock(
+                getMandatoryKey(message),
+                getBody(message, null, String.class),
+                message.getHeader(ConsulConstants.CONSUL_SESSION, "", String.class)
+            )
+        );
+    }
+
+    @InvokeOnHeader(ConsulKeyValueActions.UNLOCK)
+    protected void unlock(Message message) throws Exception {
+        message.setHeader(ConsulConstants.CONSUL_RESULT,
+            getClient().releaseLock(
+                getMandatoryKey(message),
+                getMandatoryHeader(message, ConsulConstants.CONSUL_SESSION, String.class)
+            )
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
new file mode 100644
index 0000000..0b44e86
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.policy;
+
+import java.math.BigInteger;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Optional;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.SessionClient;
+import com.orbitz.consul.async.ConsulResponseCallback;
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Route;
+import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsulRoutePolicy extends RoutePolicySupport implements NonManagedService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConsulRoutePolicy.class);
+
+    private final Object lock;
+    private final Consul consul;
+    private final SessionClient sessionClient;
+    private final KeyValueClient keyValueClient;
+    private final AtomicBoolean leader;
+    private final Set<Route> suspendedRoutes;
+    private final AtomicReference<BigInteger> index;
+
+    private String serviceName;
+    private String servicePath;
+    private int ttl;
+    private int lockDelay;
+    private ExecutorService executorService;
+    private boolean shouldStopConsumer;
+
+    private String sessionId;
+
+    public ConsulRoutePolicy() {
+        this(Consul.builder().build());
+    }
+
+    public ConsulRoutePolicy(Consul consul) {
+        this.consul = consul;
+        this.sessionClient = consul.sessionClient();
+        this.keyValueClient = consul.keyValueClient();
+        this.suspendedRoutes =  new HashSet<>();
+        this.leader = new AtomicBoolean(false);
+        this.lock = new Object();
+        this.index = new AtomicReference<>(BigInteger.valueOf(0));
+        this.serviceName = null;
+        this.servicePath = null;
+        this.ttl = 60;
+        this.lockDelay = 10;
+        this.executorService = null;
+        this.shouldStopConsumer = true;
+        this.sessionId = null;
+    }
+
+    @Override
+    public void onExchangeBegin(Route route, Exchange exchange)  {
+        if (leader.get()) {
+            if (shouldStopConsumer) {
+                startConsumer(route);
+            }
+        } else {
+            if (shouldStopConsumer) {
+                stopConsumer(route);
+            }
+
+            exchange.setException(new IllegalStateException(
+                "Consul based route policy prohibits processing exchanges, stopping route and failing the exchange")
+            );
+        }
+    }
+
+    @Override
+    public void onStop(Route route) {
+        synchronized (lock) {
+            suspendedRoutes.remove(route);
+        }
+    }
+
+    @Override
+    public synchronized void onSuspend(Route route) {
+        synchronized (lock) {
+            suspendedRoutes.remove(route);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (sessionId == null) {
+            sessionId = sessionClient.createSession(
+                ImmutableSession.builder()
+                    .name(serviceName)
+                    .ttl(ttl + "s")
+                    .lockDelay(lockDelay + "s")
+                    .build()
+                ).getId();
+
+            LOGGER.debug("SessionID = {}", sessionId);
+            if (executorService == null) {
+                executorService = Executors.newSingleThreadExecutor();
+            }
+
+            setLeader(keyValueClient.acquireLock(servicePath, sessionId));
+
+            executorService.submit(new Watcher());
+        }
+
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (sessionId != null) {
+            sessionClient.destroySession(sessionId);
+            sessionId = null;
+
+            if (executorService != null) {
+                executorService.shutdown();
+                executorService.awaitTermination(ttl / 3, TimeUnit.SECONDS);
+            }
+        }
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    protected void setLeader(boolean isLeader) {
+        if (isLeader && leader.compareAndSet(false, isLeader)) {
+            LOGGER.debug("Leadership taken ({}, {})", serviceName, sessionId);
+            startAllStoppedConsumers();
+        } else {
+            if (!leader.getAndSet(isLeader) && isLeader) {
+                LOGGER.debug("Leadership lost ({}, {})", serviceName, sessionId);
+            }
+        }
+    }
+
+    private void startConsumer(Route route) {
+        synchronized (lock) {
+            try {
+                if (suspendedRoutes.contains(route)) {
+                    startConsumer(route.getConsumer());
+                    suspendedRoutes.remove(route);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    private void stopConsumer(Route route) {
+        synchronized (lock) {
+            try {
+                if (!suspendedRoutes.contains(route)) {
+                    LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer());
+                    stopConsumer(route.getConsumer());
+                    suspendedRoutes.add(route);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    private void startAllStoppedConsumers() {
+        synchronized (lock) {
+            try {
+                for (Route route : suspendedRoutes) {
+                    LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer());
+                    startConsumer(route.getConsumer());
+                }
+
+                suspendedRoutes.clear();
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    // *************************************************************************
+    // Getter/Setters
+    // *************************************************************************
+
+    public Consul getConsul() {
+        return consul;
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public void setServiceName(String serviceName) {
+        this.serviceName = serviceName;
+        this.servicePath = String.format("/service/%s/leader", serviceName);
+    }
+
+    public int getTtl() {
+        return ttl;
+    }
+
+    public void setTtl(int ttl) {
+        this.ttl = ttl > 10 ? ttl : 10;
+    }
+
+    public int getLockDelay() {
+        return lockDelay;
+    }
+
+    public void setLockDelay(int lockDelay) {
+        this.lockDelay = lockDelay > 10 ? lockDelay : 10;
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    public boolean isShouldStopConsumer() {
+        return shouldStopConsumer;
+    }
+
+    public void setShouldStopConsumer(boolean shouldStopConsumer) {
+        this.shouldStopConsumer = shouldStopConsumer;
+    }
+
+    // *************************************************************************
+    // Watch
+    // *************************************************************************
+
+    private class Watcher implements Runnable, ConsulResponseCallback<Optional<Value>> {
+
+        @Override
+        public void onComplete(ConsulResponse<Optional<Value>> consulResponse) {
+            if (isRunAllowed()) {
+                Value response = consulResponse.getResponse().orNull();
+                if (response != null) {
+                    String sid = response.getSession().orNull();
+                    if (ObjectHelper.isEmpty(sid)) {
+                        // If the key is not held by any session, try acquire a
+                        // lock (become leader)
+                        LOGGER.debug("Try to take leadership ...");
+                        setLeader(keyValueClient.acquireLock(servicePath, sessionId));
+                    } else if (!sessionId.equals(sid) && leader.get()) {
+                        // Looks like I've lost leadership
+                        setLeader(false);
+                    }
+                }
+
+                index.set(consulResponse.getIndex());
+                run();
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            handleException(throwable);
+        }
+
+        @Override
+        public void run() {
+            if (isRunAllowed()) {
+                // Refresh session
+                sessionClient.renewSession(sessionId);
+
+                keyValueClient.getValue(
+                    servicePath,
+                    QueryOptions.blockSeconds(ttl / 3, index.get()).build(),
+                    this);
+            }
+        }
+    }
+}