You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2016/05/23 15:22:41 UTC
[1/2] camel git commit: CAMEL-9888: Create a camel-consul component
Repository: camel
Updated Branches:
refs/heads/master dba22f93a -> 38d5374aa
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/resources/META-INF/LICENSE.txt
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/resources/META-INF/LICENSE.txt b/components/camel-consul/src/main/resources/META-INF/LICENSE.txt
new file mode 100644
index 0000000..6b0b127
--- /dev/null
+++ b/components/camel-consul/src/main/resources/META-INF/LICENSE.txt
@@ -0,0 +1,203 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/resources/META-INF/NOTICE.txt
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/resources/META-INF/NOTICE.txt b/components/camel-consul/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000..2e215bf
--- /dev/null
+++ b/components/camel-consul/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,11 @@
+ =========================================================================
+ == NOTICE file corresponding to the section 4 d of ==
+ == the Apache License, Version 2.0, ==
+ == in this case for the Apache Camel distribution. ==
+ =========================================================================
+
+ This product includes software developed by
+ The Apache Software Foundation (http://www.apache.org/).
+
+ Please read the different LICENSE files present in the licenses directory of
+ this distribution.
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/component/consul
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/component/consul b/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/component/consul
new file mode 100644
index 0000000..e1e36dc
--- /dev/null
+++ b/components/camel-consul/src/main/resources/META-INF/services/org/apache/camel/component/consul
@@ -0,0 +1 @@
+class=org.apache.camel.component.consul.ConsulComponent
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventTest.java
new file mode 100644
index 0000000..372b592
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul;
+
+import java.util.List;
+
+import com.orbitz.consul.model.EventResponse;
+import com.orbitz.consul.model.event.Event;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.enpoint.ConsulEventActions;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+
+public class ConsulEventTest extends ConsulTestSupport {
+
+ @Test
+ public void testFireEvent() throws Exception {
+ String key = generateRandomString();
+ String val = generateRandomString();
+
+ MockEndpoint mock = getMockEndpoint("mock:event");
+ mock.expectedMinimumMessageCount(1);
+ mock.expectedHeaderReceived(ConsulConstants.CONSUL_RESULT, true);
+
+ fluentTemplate()
+ .withHeader(ConsulConstants.CONSUL_ACTION, ConsulEventActions.FIRE)
+ .withHeader(ConsulConstants.CONSUL_KEY, key)
+ .withBody(val)
+ .to("direct:event")
+ .send();
+
+ mock.assertIsSatisfied();
+
+ EventResponse response = getConsul().eventClient().listEvents(key);
+ List<Event> events = response.getEvents();
+
+ assertFalse(events.isEmpty());
+ assertTrue(events.get(0).getPayload().isPresent());
+ assertEquals(val, events.get(0).getPayload().get());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:event")
+ .to("consul:event")
+ .to("mock:event");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventWatchTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventWatchTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventWatchTest.java
new file mode 100644
index 0000000..591c79c
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulEventWatchTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.List;
+
+import com.orbitz.consul.EventClient;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class ConsulEventWatchTest extends ConsulTestSupport {
+ private String key;
+ private EventClient client;
+
+ @Override
+ public void doPreSetup() {
+ key = generateRandomString();
+ client = getConsul().eventClient();
+ }
+
+ @Test
+ public void testWatchEvent() throws Exception {
+ List<String> values = generateRandomListOfStrings(3);
+
+ MockEndpoint mock = getMockEndpoint("mock:event-watch");
+ mock.expectedMessageCount(values.size());
+ mock.expectedBodiesReceived(values);
+ mock.expectedHeaderReceived(ConsulConstants.CONSUL_RESULT, true);
+
+ values.forEach(v -> client.fireEvent(key, v));
+
+ mock.assertIsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ fromF("consul:event?key=%s", key)
+ .to("log:org.apache.camel.component.consul?level=INFO&showAll=true")
+ .to("mock:event-watch");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueTest.java
new file mode 100644
index 0000000..798acc7
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import com.google.common.base.Optional;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.enpoint.ConsulKeyValueActions;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class ConsulKeyValueTest extends ConsulTestSupport {
+
+ @Test
+ public void testKeyPut() throws Exception {
+ String key = generateKey();
+ String val = generateRandomString();
+
+ MockEndpoint mock = getMockEndpoint("mock:kv");
+ mock.expectedMinimumMessageCount(1);
+ mock.expectedBodiesReceived(val);
+ mock.expectedHeaderReceived(ConsulConstants.CONSUL_RESULT, true);
+
+ fluentTemplate()
+ .withHeader(ConsulConstants.CONSUL_ACTION, ConsulKeyValueActions.PUT)
+ .withHeader(ConsulConstants.CONSUL_KEY, key)
+ .withBody(val)
+ .to("direct:kv")
+ .send();
+
+ mock.assertIsSatisfied();
+
+ Optional<String> keyVal = getConsul().keyValueClient().getValueAsString(key);
+
+ assertTrue(keyVal.isPresent());
+ assertEquals(val, keyVal.get());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:kv")
+ .to("consul:kv")
+ .to("mock:kv");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueWatchTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueWatchTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueWatchTest.java
new file mode 100644
index 0000000..71dfbf5
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulKeyValueWatchTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.List;
+import java.util.Random;
+
+import com.orbitz.consul.KeyValueClient;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class ConsulKeyValueWatchTest extends ConsulTestSupport {
+ private String key;
+ private KeyValueClient client;
+ private Random random;
+
+ @Override
+ public void doPreSetup() {
+ key = generateKey();
+ client = getConsul().keyValueClient();
+ random = new Random();
+ }
+
+ @Test
+ public void testWatchKey() throws Exception {
+ List<String> values = generateRandomListOfStrings(3);
+
+ MockEndpoint mock = getMockEndpoint("mock:kv-watch");
+ mock.expectedMessageCount(values.size());
+ mock.expectedBodiesReceived(values);
+ mock.expectedHeaderReceived(ConsulConstants.CONSUL_RESULT, true);
+
+ for (String val : values) {
+ client.putValue(key, val);
+ Thread.sleep(250 + random.nextInt(250));
+ }
+
+ mock.assertIsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ fromF("consul:kv?key=%s&valueAsString=true", key)
+ .to("log:org.apache.camel.component.consul?level=INFO&showAll=true")
+ .to("mock:kv-watch");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulTestSupport.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulTestSupport.java
new file mode 100644
index 0000000..6cf3683
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulTestSupport.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsulTestSupport extends CamelTestSupport {
+ public static final Logger LOGGER = LoggerFactory.getLogger(ConsulTestSupport.class);
+ public static final String KV_PREFIX = "/camel";
+
+ @Rule
+ public final TestName testName = new TestName();
+
+ protected Consul getConsul() {
+ return Consul.builder().build();
+ }
+
+ protected KeyValueClient getKeyValueClient() {
+ return getConsul().keyValueClient();
+ }
+
+ protected String generateRandomString() {
+ return UUID.randomUUID().toString();
+ }
+
+ protected String[] generateRandomArrayOfStrings(int size) {
+ String[] array = new String[size];
+ Arrays.setAll(array, i -> generateRandomString());
+
+ return array;
+ }
+
+ protected List<String> generateRandomListOfStrings(int size) {
+ return Arrays.asList(generateRandomArrayOfStrings(size));
+ }
+
+ protected String generateKey() {
+ return KV_PREFIX + "/" + testName.getMethodName() + "/" + generateRandomString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/resources/log4j.properties b/components/camel-consul/src/test/resources/log4j.properties
new file mode 100644
index 0000000..4103089
--- /dev/null
+++ b/components/camel-consul/src/test/resources/log4j.properties
@@ -0,0 +1,20 @@
+#
+# The logging properties used
+#
+log4j.rootLogger=INFO, file
+
+# uncomment the following line to turn on Camel debugging
+log4j.logger.org.apache.camel.component.consul=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-consul-test.log
+log4j.appender.file.append=true
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index 66ff56a..e84bf7f 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -86,6 +86,7 @@
<module>camel-cmis</module>
<module>camel-coap</module>
<module>camel-cometd</module>
+ <module>camel-consul</module>
<module>camel-context</module>
<module>camel-couchdb</module>
<module>camel-crypto</module>
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index d58e2d9..5aa1953 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -131,6 +131,8 @@
<commons-pool2-version>2.4.2</commons-pool2-version>
<commons-vfs2-version>2.0</commons-vfs2-version>
<compress-lzf-version>1.0.3</compress-lzf-version>
+ <consul-client-version>0.12.0</consul-client-version>
+ <consul-client-bundle-version>0.12.0_1</consul-client-bundle-version>
<cobertura-maven-plugin-version>2.7</cobertura-maven-plugin-version>
<cxf-version>3.1.6</cxf-version>
<cxf-version-range>[3.0,4.0)</cxf-version-range>
@@ -822,6 +824,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-consul</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-context</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/platforms/karaf/features/src/main/resources/bundles.properties
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/bundles.properties b/platforms/karaf/features/src/main/resources/bundles.properties
index affa23a..022290b 100644
--- a/platforms/karaf/features/src/main/resources/bundles.properties
+++ b/platforms/karaf/features/src/main/resources/bundles.properties
@@ -38,6 +38,7 @@ org.apache.servicemix.bundles/org.apache.servicemix.bundles.commons-csv/${common
org.apache.servicemix.bundles/org.apache.servicemix.bundles.commons-dbcp/${commons-dbcp-bundle-version}/jar
org.apache.servicemix.bundles/org.apache.servicemix.bundles.commons-httpclient/${commons-httpclient-bundle-version}/jar
org.apache.servicemix.bundles/org.apache.servicemix.bundles.commons-io/${commons-io-bundle-version}/jar
+org.apache.servicemix.bundles/org.apache.servicemix.bundles.orbitz-consul-client/${consul-client-bundle-version}/jar
org.apache.servicemix.bundles/org.apache.servicemix.bundles.dom4j/${dom4j-bundle-version}/jar
org.apache.servicemix.bundles/org.apache.servicemix.bundles.flatpack/${flatpack-bundle-version}/jar
org.apache.servicemix.bundles/org.apache.servicemix.bundles.fop/${fop-bundle-version}/jar
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 97f04e1..3c9b2de 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -328,6 +328,12 @@
<bundle dependency='true'>mvn:org.cometd.java/cometd-java-common/${cometd-java-server}</bundle>
<bundle>mvn:org.apache.camel/camel-cometd/${project.version}</bundle>
</feature>
+ <feature name='camel-consul' version='${project.version}' resolver='(obr)' start-level='50'>
+ <feature version='${project.version}'>camel-core</feature>
+ <bundle dependency='true'>mvn:com.google.guava/guava/${google-guava-version}</bundle>
+ <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.orbitz-consul-client/${consul-client-bundle-version}</bundle>
+ <bundle>mvn:org.apache.camel/camel-consul/${project.version}</bundle>
+ </feature>
<feature name='camel-context' version='${project.version}' resolver='(obr)' start-level='50'>
<feature version='${project.version}'>camel-core</feature>
<bundle>mvn:org.apache.camel/camel-context/${project.version}</bundle>
[2/2] camel git commit: CAMEL-9888: Create a camel-consul component
Posted by lb...@apache.org.
CAMEL-9888: Create a camel-consul component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38d5374a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38d5374a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38d5374a
Branch: refs/heads/master
Commit: 38d5374aaf945ba587d7fcc06de8bac2ef4e0a48
Parents: dba22f9
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon May 23 17:15:34 2016 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Mon May 23 17:22:25 2016 +0200
----------------------------------------------------------------------
apache-camel/pom.xml | 4 +
.../src/main/descriptors/common-bin.xml | 1 +
components/camel-consul/pom.xml | 120 ++++++++
.../consul/AbstractConsulConsumer.java | 110 +++++++
.../consul/AbstractConsulEndpoint.java | 94 ++++++
.../consul/AbstractConsulProducer.java | 129 ++++++++
.../camel/component/consul/ConsulComponent.java | 65 ++++
.../component/consul/ConsulConfiguration.java | 241 +++++++++++++++
.../camel/component/consul/ConsulConstants.java | 42 +++
.../component/consul/ConsulEndpointFactory.java | 25 ++
.../consul/enpoint/ConsulAgentActions.java | 25 ++
.../consul/enpoint/ConsulAgentEndpoint.java | 43 +++
.../consul/enpoint/ConsulAgentProducer.java | 33 ++
.../consul/enpoint/ConsulEventActions.java | 23 ++
.../consul/enpoint/ConsulEventConsumer.java | 131 ++++++++
.../consul/enpoint/ConsulEventEndpoint.java | 44 +++
.../consul/enpoint/ConsulEventProducer.java | 56 ++++
.../consul/enpoint/ConsulKeyValueActions.java | 30 ++
.../consul/enpoint/ConsulKeyValueConsumer.java | 128 ++++++++
.../consul/enpoint/ConsulKeyValueEndpoint.java | 43 +++
.../consul/enpoint/ConsulKeyValueProducer.java | 126 ++++++++
.../consul/policy/ConsulRoutePolicy.java | 308 +++++++++++++++++++
.../src/main/resources/META-INF/LICENSE.txt | 203 ++++++++++++
.../src/main/resources/META-INF/NOTICE.txt | 11 +
.../services/org/apache/camel/component/consul | 1 +
.../camel/component/consul/ConsulEventTest.java | 68 ++++
.../component/consul/ConsulEventWatchTest.java | 60 ++++
.../component/consul/ConsulKeyValueTest.java | 62 ++++
.../consul/ConsulKeyValueWatchTest.java | 66 ++++
.../component/consul/ConsulTestSupport.java | 64 ++++
.../src/test/resources/log4j.properties | 20 ++
components/pom.xml | 1 +
parent/pom.xml | 7 +
.../src/main/resources/bundles.properties | 1 +
.../features/src/main/resources/features.xml | 6 +
35 files changed, 2391 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/apache-camel/pom.xml
----------------------------------------------------------------------
diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index 8f8c2f7..a556261 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -165,6 +165,10 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-consul</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-context</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/apache-camel/src/main/descriptors/common-bin.xml
----------------------------------------------------------------------
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index 3587254..5745702 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -54,6 +54,7 @@
<include>org.apache.camel:camel-core</include>
<include>org.apache.camel:camel-core-osgi</include>
<include>org.apache.camel:camel-cometd</include>
+ <include>org.apache.camel:camel-consul</include>
<include>org.apache.camel:camel-context</include>
<include>org.apache.camel:camel-couchdb</include>
<include>org.apache.camel:camel-crypto</include>
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml
new file mode 100644
index 0000000..9b9dca9
--- /dev/null
+++ b/components/camel-consul/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>2.18-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-consul</artifactId>
+ <packaging>jar</packaging>
+ <name>Camel :: Consul</name>
+ <description>Camel Consul support</description>
+
+ <properties>
+ <camel.osgi.export.pkg>
+ org.apache.camel.component.consul.*,
+ </camel.osgi.export.pkg>
+ <camel.osgi.export.service>
+ org.apache.camel.spi.ComponentResolver;component=consul
+ </camel.osgi.export.service>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.orbitz.consul</groupId>
+ <artifactId>consul-client</artifactId>
+ <version>${consul-client-version}</version>
+ </dependency>
+
+ <!-- testing -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${mockito-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>java-hamcrest</artifactId>
+ <version>${hamcrest-version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+
+ <profiles>
+ <profile>
+ <id>consul-skip-tests</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>consul-tests</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>false</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ </profiles>
+
+</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
new file mode 100644
index 0000000..c3b6545
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * @author lburgazzoli
+ */
+public abstract class AbstractConsulConsumer<C> extends DefaultConsumer {
+ protected final AbstractConsulEndpoint endpoint;
+ protected final ConsulConfiguration configuration;
+ protected final String key;
+ protected final AtomicReference<BigInteger> index;
+
+ private final Function<Consul, C> clientSupplier;
+ private Runnable watcher;
+
+ protected AbstractConsulConsumer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration, Processor processor, Function<Consul, C> clientSupplier) {
+ super(endpoint, processor);
+
+ this.endpoint = endpoint;
+ this.configuration = configuration;
+ this.key = ObjectHelper.notNull(configuration.getKey(), ConsulConstants.CONSUL_KEY);
+ this.index = new AtomicReference<>(BigInteger.valueOf(configuration.getFirstIndex()));
+ this.clientSupplier = clientSupplier;
+ this.watcher = null;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ watcher = createWatcher(clientSupplier.apply(endpoint.getConsul()));
+ watcher.run();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ watcher = null;
+
+ super.doStop();
+ }
+
+ // *************************************************************************
+ //
+ // *************************************************************************
+
+ protected abstract Runnable createWatcher(C client) throws Exception;
+
+ // *************************************************************************
+ // Handlers
+ // *************************************************************************
+
+ protected abstract class AbstractWatcher implements Runnable {
+ private final C client;
+
+ public AbstractWatcher(C client) {
+ this.client = client;
+ }
+
+ protected void onError(Throwable throwable) {
+ if (isRunAllowed()) {
+ getExceptionHandler().handleException("Error watching for event " + key, throwable);
+ }
+ }
+
+ protected final void setIndex(BigInteger responseIndex) {
+ index.set(responseIndex);
+ }
+
+ @Override
+ public final void run() {
+ if (isRunAllowed()) {
+ watch(client);
+ }
+ }
+
+ protected final C client() {
+ return client;
+ }
+
+ protected final void watch() {
+ watch(client);
+ }
+
+ protected abstract void watch(C client);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java
new file mode 100644
index 0000000..981c314
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.ObjectHelper;
+
+public abstract class AbstractConsulEndpoint extends DefaultEndpoint {
+
+ @UriPath(description = "The consul configuration")
+ @Metadata(required = "true")
+ private final ConsulConfiguration configuration;
+
+ @UriParam(description = "The API endpoint")
+ @Metadata(required = "true")
+ private final String apiEndpoint;
+
+ private Consul consul;
+
+ protected AbstractConsulEndpoint(String apiEndpoint, String uri, ConsulComponent component, ConsulConfiguration configuration) {
+ super(uri, component);
+
+ this.configuration = configuration;
+ this.apiEndpoint = apiEndpoint;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ // *************************************************************************
+ //
+ // *************************************************************************
+
+ public ConsulConfiguration getConfiguration() {
+ return this.configuration;
+ }
+
+ public String getApiEndpoint() {
+ return this.apiEndpoint;
+ }
+
+ public synchronized Consul getConsul() throws Exception {
+ if (consul == null) {
+ Consul.Builder builder = Consul.builder();
+ builder.withPing(configuration.isPingInstance());
+
+ if (ObjectHelper.isNotEmpty(configuration.getUrl())) {
+ builder.withUrl(configuration.getUrl());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getSslContextParameters())) {
+ builder.withSslContext(configuration.getSslContextParameters().createSSLContext(getCamelContext()));
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getAclToken())) {
+ builder.withAclToken(configuration.getAclToken());
+ }
+ if (configuration.requiresBasicAuthentication()) {
+ builder.withBasicAuth(configuration.getUserName(), configuration.getPassword());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getConnectTimeoutMillis())) {
+ builder.withConnectTimeoutMillis(configuration.getConnectTimeoutMillis());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getReadTimeoutMillis())) {
+ builder.withReadTimeoutMillis(configuration.getReadTimeoutMillis());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getWriteTimeoutMillis())) {
+ builder.withWriteTimeoutMillis(configuration.getWriteTimeoutMillis());
+ }
+
+ consul = builder.build();
+ }
+
+ return consul;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
new file mode 100644
index 0000000..2be260c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.function.Function;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.Message;
+import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.HeaderSelectorProducer;
+
+
+public abstract class AbstractConsulProducer<C> extends HeaderSelectorProducer {
+ private final AbstractConsulEndpoint endpoint;
+ private final ConsulConfiguration configuration;
+ private final Function<Consul, C> clientSupplier;
+ private C client;
+
+ protected AbstractConsulProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration, Function<Consul, C> clientSupplier) {
+ super(endpoint, ConsulConstants.CONSUL_ACTION, configuration.getAction());
+
+ this.endpoint = endpoint;
+ this.configuration = configuration;
+ this.clientSupplier = clientSupplier;
+ this.client = null;
+ }
+
+ // *************************************************************************
+ //
+ // *************************************************************************
+
+ protected Consul getConsul() throws Exception {
+ return endpoint.getConsul();
+ }
+
+ protected C getClient() throws Exception {
+ if (client == null) {
+ client = clientSupplier.apply(getConsul());
+ }
+
+ return client;
+ }
+
+ protected ConsulConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ protected <D> D getHeader(Message message, String header, D defaultValue, Class<D> type) {
+ return message.getHeader(header, defaultValue, type);
+ }
+
+ protected <D> D getMandatoryHeader(Message message, String header, Class<D> type) throws Exception {
+ return getMandatoryHeader(message, header, null, type);
+ }
+
+ protected <D> D getMandatoryHeader(Message message, String header, D defaultValue, Class<D> type) throws Exception {
+ D value = getHeader(message, header, defaultValue, type);
+ if (value == null) {
+ throw new NoSuchHeaderException(message.getExchange(), header, type);
+ }
+
+ return value;
+ }
+
+ protected String getKey(Message message) {
+ return message.getHeader(
+ ConsulConstants.CONSUL_KEY,
+ configuration.getKey(),
+ String.class);
+ }
+
+ protected String getMandatoryKey(Message message) throws Exception {
+ return getMandatoryHeader(
+ message,
+ ConsulConstants.CONSUL_KEY,
+ configuration.getKey(),
+ String.class);
+ }
+
+ protected <T> T getOption(Message message, T defaultValue, Class<T> type) {
+ return message.getHeader(ConsulConstants.CONSUL_OPTIONS, defaultValue, type);
+ }
+
+ protected boolean isValueAsString(Message message) throws Exception {
+ return message.getHeader(
+ ConsulConstants.CONSUL_VALUE_AS_STRING,
+ configuration.isValueAsString(),
+ Boolean.class);
+ }
+
+ protected <T> T getBody(Message message, T defaultValue, Class<T> type) throws Exception {
+ T body = message.getBody(type);
+ if (body == null) {
+ body = defaultValue;
+ }
+
+ return body;
+ }
+
+ protected void setBodyAndResult(Message message, Object body) throws Exception {
+ setBodyAndResult(message, body, body != null);
+ }
+
+ protected void setBodyAndResult(Message message, Object body, boolean result) throws Exception {
+ message.setHeader(ConsulConstants.CONSUL_RESULT, result);
+ if (body != null) {
+ message.setBody(body);
+ }
+ }
+
+ protected Processor wrap(Function<C, Object> supplier) {
+ return exchange -> setBodyAndResult(exchange.getIn(), supplier.apply(getClient()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
new file mode 100644
index 0000000..bbd6a1c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.consul.enpoint.ConsulAgentEndpoint;
+import org.apache.camel.component.consul.enpoint.ConsulEventEndpoint;
+import org.apache.camel.component.consul.enpoint.ConsulKeyValueEndpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+
+/**
+ * Represents the component that manages {@link AbstractConsulEndpoint}.
+ */
+public class ConsulComponent extends UriEndpointComponent {
+
+ public ConsulComponent() {
+ super(AbstractConsulEndpoint.class);
+ }
+
+ public ConsulComponent(CamelContext context) {
+ super(context, AbstractConsulEndpoint.class);
+ }
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ ConsulConfiguration configuration = new ConsulConfiguration();
+ setProperties(configuration, parameters);
+
+ return ConsulApiEndpoint.valueOf(remaining).create(uri, this, configuration);
+ }
+
+ private enum ConsulApiEndpoint implements ConsulEndpointFactory {
+ kv(ConsulKeyValueEndpoint::new),
+ event(ConsulEventEndpoint::new),
+ agent(ConsulAgentEndpoint::new);
+
+ private final ConsulEndpointFactory factory;
+
+ ConsulApiEndpoint(ConsulEndpointFactory factory) {
+ this.factory = factory;
+ }
+
+ @Override
+ public Endpoint create(String uri, ConsulComponent component, ConsulConfiguration configuration) throws Exception {
+ return factory.create(uri, component, configuration);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
new file mode 100644
index 0000000..2c0e576
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.jsse.SSLContextParameters;
+
+@UriParams
+public class ConsulConfiguration {
+ @UriParam
+ private String url;
+
+ @UriParam(label = "security")
+ private SSLContextParameters sslContextParameters;
+ @UriParam(label = "security")
+ private String aclToken;
+ @UriParam(label = "security")
+ private String userName;
+ @UriParam(label = "security")
+ private String password;
+
+ @UriParam
+ private Long connectTimeoutMillis;
+ @UriParam
+ private Long readTimeoutMillis;
+ @UriParam
+ private Long writeTimeoutMillis;
+ @UriParam(defaultValue = "true")
+ private boolean pingInstance = true;
+
+
+ @UriParam(label = "producer")
+ private String action;
+
+ @UriParam(label = "producer,kv", defaultValue = "false")
+ private boolean valueAsString;
+
+ @UriParam
+ private String key;
+
+ @UriParam(label = "consumer,watch", defaultValue = "10")
+ private Integer blockSeconds = 10;
+
+ @UriParam(label = "consumer,watch", defaultValue = "0")
+ private long firstIndex;
+
+ @UriParam(label = "consumer,watch", defaultValue = "false")
+ private boolean recursive;
+
+
+ public String getUrl() {
+ return url;
+ }
+
+ /**
+ * The Consul agent URL
+ */
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public SSLContextParameters getSslContextParameters() {
+ return sslContextParameters;
+ }
+
+ /**
+ * SSL configuration using an org.apache.camel.util.jsse.SSLContextParameters
+ * instance.
+ */
+ public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+ this.sslContextParameters = sslContextParameters;
+ }
+
+ public String getAclToken() {
+ return aclToken;
+ }
+
+ /**
+ * Sets the ACL token to be used with Consul
+ */
+ public void setAclToken(String aclToken) {
+ this.aclToken = aclToken;
+ }
+
+ public String getAction() {
+ return action;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ /**
+ * Sets the username to be used for basic authentication
+ */
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * Sets the password to be used for basic authentication
+ */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public boolean requiresBasicAuthentication() {
+ return ObjectHelper.isNotEmpty(userName) && ObjectHelper.isNotEmpty(password);
+ }
+
+ public Long getConnectTimeoutMillis() {
+ return connectTimeoutMillis;
+ }
+
+ /**
+ * Connect timeout for OkHttpClient
+ */
+ public void setConnectTimeoutMillis(Long connectTimeoutMillis) {
+ this.connectTimeoutMillis = connectTimeoutMillis;
+ }
+
+ public Long getReadTimeoutMillis() {
+ return readTimeoutMillis;
+ }
+
+ /**
+ * Read timeout for OkHttpClient
+ */
+ public void setReadTimeoutMillis(Long readTimeoutMillis) {
+ this.readTimeoutMillis = readTimeoutMillis;
+ }
+
+ public Long getWriteTimeoutMillis() {
+ return writeTimeoutMillis;
+ }
+
+ /**
+ * Write timeout for OkHttpClient
+ */
+ public void setWritTeimeoutMillis(Long writeTimeoutMillis) {
+ this.writeTimeoutMillis = writeTimeoutMillis;
+ }
+
+ public void setWriteTimeoutMillis(Long writeTimeoutMillis) {
+ this.writeTimeoutMillis = writeTimeoutMillis;
+ }
+
+ public boolean isPingInstance() {
+ return pingInstance;
+ }
+
+ /**
+ * Configure if the AgentClient should attempt a ping before returning the Consul instance
+ */
+ public void setPingInstance(boolean pingInstance) {
+ this.pingInstance = pingInstance;
+ }
+
+ /**
+ * The default action. Can be overridden by CamelConsulAction
+ */
+ public void setAction(String action) {
+ this.action = action;
+ }
+
+ public boolean isValueAsString() {
+ return valueAsString;
+ }
+
+ /**
+ * Default to transform values retrieved from Consul i.e. on KV endpoint to
+ * string.
+ */
+ public void setValueAsString(boolean valueAsString) {
+ this.valueAsString = valueAsString;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * The default action. Can be overridden by CamelConsulKey
+ */
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public Integer getBlockSeconds() {
+ return blockSeconds;
+ }
+
+ /**
+ * The second to wait for a watch event, default 10 seconds
+ */
+ public void setBlockSeconds(Integer blockSeconds) {
+ this.blockSeconds = blockSeconds;
+ }
+
+ public long getFirstIndex() {
+ return firstIndex;
+ }
+
+ /**
+ * The first index for watch for, default 0
+ */
+ public void setFirstIndex(long firstIndex) {
+ this.firstIndex = firstIndex;
+ }
+
+ public boolean isRecursive() {
+ return recursive;
+ }
+
+ /**
+ * Recursively watch, default false
+ */
+ public void setRecursive(boolean recursive) {
+ this.recursive = recursive;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
new file mode 100644
index 0000000..923e1d7
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+public interface ConsulConstants {
+ String CONSUL_ENDPOINT_KV = "kv";
+
+ String CONSUL_ACTION = "CamelConsulAction";
+ String CONSUL_KEY = "CamelConsulKey";
+ String CONSUL_EVENT_ID = "CamelConsulEventId";
+ String CONSUL_EVENT_NAME = "CamelConsulEventName";
+ String CONSUL_EVENT_LTIME = "CamelConsulEventLTime";
+ String CONSUL_NODE_FILTER = "CamelConsulNodeFilter";
+ String CONSUL_TAG_FILTER = "CamelConsulTagFilter";
+ String CONSUL_SERVICE_FILTER = "CamelConsulSessionFilter";
+ String CONSUL_VERSION = "CamelConsulVersion";
+ String CONSUL_VALUE = "CamelConsulValue";
+ String CONSUL_VALUES = "CamelConsulValues";
+ String CONSUL_FLAGS = "CamelConsulFlags";
+ String CONSUL_CREATE_INDEX = "CamelConsulCreateIndex";
+ String CONSUL_LOCK_INDEX = "CamelConsulCreateIndex";
+ String CONSUL_MODIFY_INDEX = "CamelConsulModifyIndex";
+ String CONSUL_OPTIONS = "CamelConsulOptions";
+ String CONSUL_RESULT = "CamelConsulResult";
+ String CONSUL_SESSION = "CamelConsulSession";
+ String CONSUL_OPERATION = "CamelConsulOperation";
+ String CONSUL_VALUE_AS_STRING = "CamelConsulValueAsString";
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java
new file mode 100644
index 0000000..3401ff8
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+
+import org.apache.camel.Endpoint;
+
+@FunctionalInterface
+public interface ConsulEndpointFactory {
+ Endpoint create(String uri, ConsulComponent component, ConsulConfiguration configuration) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java
new file mode 100644
index 0000000..aff15ab
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulAgentActions {
+ String CHECKS = "CHECKS";
+ String SERVICES = "SERVICES";
+ String MEMBERS = "MEMBERS";
+ String AGENT = "AGENT";
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java
new file mode 100644
index 0000000..9d5742d
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.ConsulComponent;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+
+@UriEndpoint(scheme = "consul", title = "Consul Agent", syntax = "consul://agent", producerOnly = true, label = "api,cloud")
+public class ConsulAgentEndpoint extends AbstractConsulEndpoint {
+ public ConsulAgentEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) {
+ super("agent", uri, component, configuration);
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ return new ConsulAgentProducer(this, getConfiguration());
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ throw new IllegalArgumentException("Not implemented");
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
new file mode 100644
index 0000000..940095a
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.AgentClient;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.AbstractConsulProducer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+
+public class ConsulAgentProducer extends AbstractConsulProducer<AgentClient> {
+ ConsulAgentProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) {
+ super(endpoint, configuration, c -> c.agentClient());
+
+ bind(ConsulAgentActions.CHECKS, wrap(c -> c.getChecks()));
+ bind(ConsulAgentActions.SERVICES, wrap(c -> c.getServices()));
+ bind(ConsulAgentActions.MEMBERS, wrap(c -> c.getMembers()));
+ bind(ConsulAgentActions.AGENT, wrap(c -> c.getAgent()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java
new file mode 100644
index 0000000..76e63ec
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulEventActions {
+ String FIRE = "FIRE";
+ String LIST = "LIST";
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
new file mode 100644
index 0000000..f6f658c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import com.orbitz.consul.EventClient;
+import com.orbitz.consul.async.EventResponseCallback;
+import com.orbitz.consul.model.EventResponse;
+import com.orbitz.consul.model.event.Event;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.consul.AbstractConsulConsumer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+
+public class ConsulEventConsumer extends AbstractConsulConsumer<EventClient> {
+ protected ConsulEventConsumer(ConsulEventEndpoint endpoint, ConsulConfiguration configuration, Processor processor) {
+ super(endpoint, configuration, processor, c -> c.eventClient());
+ }
+
+ @Override
+ protected Runnable createWatcher(EventClient client) throws Exception {
+ return new EventWatcher(client);
+ }
+
+ // *************************************************************************
+ // Watch
+ // *************************************************************************
+
+ private class EventWatcher extends AbstractWatcher implements EventResponseCallback {
+ EventWatcher(EventClient client) {
+ super(client);
+ }
+
+ @Override
+ public void watch(EventClient client) {
+ client.listEvents(
+ key,
+ QueryOptions.blockSeconds(configuration.getBlockSeconds(), index.get()).build(),
+ this
+ );
+ }
+
+ @Override
+ public void onComplete(EventResponse eventResponse) {
+ if (isRunAllowed()) {
+ List<Event> events = filterEvents(eventResponse.getEvents(), index.get());
+ events.forEach(this::onEvent);
+
+ setIndex(eventResponse.getIndex());
+
+ watch();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ onError(throwable);
+ }
+
+ private void onEvent(Event event) {
+ final Exchange exchange = endpoint.createExchange();
+ final Message message = exchange.getIn();
+
+ message.setHeader(ConsulConstants.CONSUL_KEY, key);
+ message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+ message.setHeader(ConsulConstants.CONSUL_EVENT_ID, event.getId());
+ message.setHeader(ConsulConstants.CONSUL_EVENT_NAME, event.getName());
+ message.setHeader(ConsulConstants.CONSUL_EVENT_LTIME, event.getLTime());
+ message.setHeader(ConsulConstants.CONSUL_NODE_FILTER, event.getNodeFilter());
+ message.setHeader(ConsulConstants.CONSUL_SERVICE_FILTER, event.getServiceFilter());
+ message.setHeader(ConsulConstants.CONSUL_TAG_FILTER, event.getTagFilter());
+ message.setHeader(ConsulConstants.CONSUL_VERSION, event.getVersion());
+ message.setBody(event.getPayload().orNull());
+
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ }
+ }
+
+ /**
+ * from spring-cloud-consul (https://github.com/spring-cloud/spring-cloud-consul):
+ * spring-cloud-consul-bus/src/main/java/org/springframework/cloud/consul/bus/EventService.java
+ */
+ private List<Event> filterEvents(List<Event> toFilter, BigInteger lastIndex) {
+ List<Event> events = toFilter;
+ if (lastIndex != null) {
+ for (int i = 0; i < events.size(); i++) {
+ Event event = events.get(i);
+ BigInteger eventIndex = getEventIndexFromId(event);
+ if (eventIndex.equals(lastIndex)) {
+ events = events.subList(i + 1, events.size());
+ break;
+ }
+ }
+ }
+ return events;
+ }
+
+ private BigInteger getEventIndexFromId(Event event) {
+ String eventId = event.getId();
+ String lower = eventId.substring(0, 8) + eventId.substring(9, 13) + eventId.substring(14, 18);
+ String upper = eventId.substring(19, 23) + eventId.substring(24, 36);
+
+ BigInteger lowVal = new BigInteger(lower, 16);
+ BigInteger highVal = new BigInteger(upper, 16);
+
+ return lowVal.xor(highVal);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java
new file mode 100644
index 0000000..df3254c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.ConsulComponent;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+
+
+@UriEndpoint(scheme = "consul", title = "Consul KeyValue", syntax = "consul://event", consumerClass = ConsulEventConsumer.class, label = "api,cloud")
+public class ConsulEventEndpoint extends AbstractConsulEndpoint {
+ public ConsulEventEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) {
+ super("event", uri, component, configuration);
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ return new ConsulEventProducer(this, getConfiguration());
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return new ConsulEventConsumer(this, getConfiguration(), processor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
new file mode 100644
index 0000000..26b6595
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.EventClient;
+import com.orbitz.consul.option.EventOptions;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.AbstractConsulProducer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+
+public class ConsulEventProducer extends AbstractConsulProducer<EventClient> {
+ ConsulEventProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) {
+ super(endpoint, configuration, c -> c.eventClient());
+ }
+
+ @InvokeOnHeader(ConsulEventActions.FIRE)
+ protected void fire(Message message) throws Exception {
+ setBodyAndResult(
+ message,
+ getClient().fireEvent(
+ getMandatoryKey(message),
+ getOption(message, EventOptions.BLANK, EventOptions.class),
+ message.getBody(String.class)
+ )
+ );
+ }
+
+ @InvokeOnHeader(ConsulEventActions.LIST)
+ protected void list(Message message) throws Exception {
+ setBodyAndResult(
+ message,
+ getClient().listEvents(
+ getKey(message),
+ getOption(message, QueryOptions.BLANK, QueryOptions.class)
+ )
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java
new file mode 100644
index 0000000..d014bbd
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulKeyValueActions {
+ String PUT = "PUT";
+ String GET_VALUE = "GET_VALUE";
+ String GET_VALUES = "GET_VALUES";
+ String GET_KEYS = "GET_KEYS";
+ String GET_SESSIONS = "GET_SESSIONS";
+ String DELETE_KEY = "DELETE_KEY";
+ String DELETE_KEYS = "DELETE_KEYS";
+ String LOCK = "LOCK";
+ String UNLOCK = "UNLOCK";
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
new file mode 100644
index 0000000..a90d8cb
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.async.ConsulResponseCallback;
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.consul.AbstractConsulConsumer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+
+public class ConsulKeyValueConsumer extends AbstractConsulConsumer<KeyValueClient> {
+
+ protected ConsulKeyValueConsumer(ConsulKeyValueEndpoint endpoint, ConsulConfiguration configuration, Processor processor) {
+ super(endpoint, configuration, processor, c -> c.keyValueClient());
+ }
+
+ @Override
+ protected Runnable createWatcher(KeyValueClient client) throws Exception {
+ return configuration.isRecursive() ? new RecursivePathWatcher(client) : new PathWatcher(client);
+ }
+
+ // *************************************************************************
+ // Watch
+ // *************************************************************************
+
+ private abstract class AbstractPathWatcher<T> extends AbstractWatcher implements ConsulResponseCallback<T> {
+ protected AbstractPathWatcher(KeyValueClient client) {
+ super(client);
+ }
+
+ protected QueryOptions queryOptions() {
+ return QueryOptions.blockSeconds(configuration.getBlockSeconds(), index.get()).build();
+ }
+
+ @Override
+ public void onComplete(ConsulResponse<T> consulResponse) {
+ if (isRunAllowed()) {
+ onResponse(consulResponse.getResponse());
+ setIndex(consulResponse.getIndex());
+ watch();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ onError(throwable);
+ }
+
+ protected void onValue(Value value) {
+ final Exchange exchange = endpoint.createExchange();
+ final Message message = exchange.getIn();
+
+ message.setHeader(ConsulConstants.CONSUL_KEY, value.getKey());
+ message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+ message.setHeader(ConsulConstants.CONSUL_FLAGS, value.getFlags());
+ message.setHeader(ConsulConstants.CONSUL_CREATE_INDEX, value.getCreateIndex());
+ message.setHeader(ConsulConstants.CONSUL_LOCK_INDEX, value.getLockIndex());
+ message.setHeader(ConsulConstants.CONSUL_MODIFY_INDEX, value.getModifyIndex());
+ message.setHeader(ConsulConstants.CONSUL_SESSION, value.getSession().orNull());
+ message.setBody(configuration.isValueAsString() ? value.getValueAsString().orNull() : value.getValue().orNull());
+
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ }
+ }
+
+ protected abstract void onResponse(T consulResponse);
+ }
+
+ private class PathWatcher extends AbstractPathWatcher<Optional<Value>> {
+ PathWatcher(KeyValueClient client) {
+ super(client);
+ }
+
+ @Override
+ public void watch(KeyValueClient client) {
+ client.getValue(key, queryOptions(), this);
+ }
+
+ @Override
+ public void onResponse(Optional<Value> value) {
+ if (value.isPresent()) {
+ onValue(value.get());
+ }
+ }
+ }
+
+ private class RecursivePathWatcher extends AbstractPathWatcher<List<Value>> {
+ RecursivePathWatcher(KeyValueClient client) {
+ super(client);
+ }
+
+ @Override
+ public void watch(KeyValueClient client) {
+ client.getValues(key, queryOptions(), this);
+ }
+
+ @Override
+ public void onResponse(List<Value> values) {
+ values.forEach(this::onValue);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java
new file mode 100644
index 0000000..2910910
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.ConsulComponent;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+
+@UriEndpoint(scheme = "consul", title = "Consul KeyValue", syntax = "consul://kv", consumerClass = ConsulKeyValueConsumer.class, label = "api,cloud")
+public class ConsulKeyValueEndpoint extends AbstractConsulEndpoint {
+ public ConsulKeyValueEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) {
+ super("kv", uri, component, configuration);
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ return new ConsulKeyValueProducer(this, getConfiguration());
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return new ConsulKeyValueConsumer(this, getConfiguration(), processor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
new file mode 100644
index 0000000..9596f4c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.option.PutOptions;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.AbstractConsulProducer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+
+public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClient> {
+
+ ConsulKeyValueProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) {
+ super(endpoint, configuration, c -> c.keyValueClient());
+ }
+
+ @InvokeOnHeader(ConsulKeyValueActions.PUT)
+ protected void put(Message message) throws Exception {
+ message.setHeader(
+ ConsulConstants.CONSUL_RESULT,
+ getClient().putValue(
+ getMandatoryKey(message),
+ message.getBody(String.class),
+ message.getHeader(ConsulConstants.CONSUL_FLAGS, 0L, Long.class),
+ getOption(message, PutOptions.BLANK, PutOptions.class)
+ )
+ );
+ }
+
+ @InvokeOnHeader(ConsulKeyValueActions.GET_VALUE)
+ protected void getValue(Message message) throws Exception {
+ Object result;
+
+ if (isValueAsString(message)) {
+ result = getClient().getValueAsString(
+ getMandatoryKey(message)
+ ).orNull();
+ } else {
+ result = getClient().getValue(
+ getMandatoryKey(message),
+ getOption(message, QueryOptions.BLANK, QueryOptions.class)
+ ).orNull();
+ }
+
+ setBodyAndResult(message, result);
+ }
+
+ @InvokeOnHeader(ConsulKeyValueActions.GET_VALUES)
+ protected void getValues(Message message) throws Exception {
+ Object result;
+
+ if (isValueAsString(message)) {
+ result = getClient().getValuesAsString(
+ getMandatoryKey(message)
+ );
+ } else {
+ result = getClient().getValues(
+ getMandatoryKey(message),
+ getOption(message, QueryOptions.BLANK, QueryOptions.class)
+ );
+ }
+
+ setBodyAndResult(message, result);
+ }
+
+ @InvokeOnHeader(ConsulKeyValueActions.GET_KEYS)
+ protected void getKeys(Message message) throws Exception {
+ setBodyAndResult(message, getClient().getKeys(getMandatoryKey(message)));
+ }
+
+ @InvokeOnHeader(ConsulKeyValueActions.GET_SESSIONS)
+ protected void getSessions(Message message) throws Exception {
+ setBodyAndResult(message, getClient().getSession(getMandatoryKey(message)));
+ }
+
+ @InvokeOnHeader(ConsulKeyValueActions.DELETE_KEY)
+ protected void deleteKey(Message message) throws Exception {
+ getClient().deleteKey(getMandatoryKey(message));
+ message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+ }
+
+ @InvokeOnHeader(ConsulKeyValueActions.DELETE_KEYS)
+ protected void deleteKeys(Message message) throws Exception {
+ getClient().deleteKeys(getMandatoryKey(message));
+ message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+ }
+
+ @InvokeOnHeader(ConsulKeyValueActions.LOCK)
+ protected void lock(Message message) throws Exception {
+ message.setHeader(ConsulConstants.CONSUL_RESULT,
+ getClient().acquireLock(
+ getMandatoryKey(message),
+ getBody(message, null, String.class),
+ message.getHeader(ConsulConstants.CONSUL_SESSION, "", String.class)
+ )
+ );
+ }
+
+ @InvokeOnHeader(ConsulKeyValueActions.UNLOCK)
+ protected void unlock(Message message) throws Exception {
+ message.setHeader(ConsulConstants.CONSUL_RESULT,
+ getClient().releaseLock(
+ getMandatoryKey(message),
+ getMandatoryHeader(message, ConsulConstants.CONSUL_SESSION, String.class)
+ )
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
new file mode 100644
index 0000000..0b44e86
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.policy;
+
+import java.math.BigInteger;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Optional;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.SessionClient;
+import com.orbitz.consul.async.ConsulResponseCallback;
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Route;
+import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsulRoutePolicy extends RoutePolicySupport implements NonManagedService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsulRoutePolicy.class);
+
+ private final Object lock;
+ private final Consul consul;
+ private final SessionClient sessionClient;
+ private final KeyValueClient keyValueClient;
+ private final AtomicBoolean leader;
+ private final Set<Route> suspendedRoutes;
+ private final AtomicReference<BigInteger> index;
+
+ private String serviceName;
+ private String servicePath;
+ private int ttl;
+ private int lockDelay;
+ private ExecutorService executorService;
+ private boolean shouldStopConsumer;
+
+ private String sessionId;
+
+ public ConsulRoutePolicy() {
+ this(Consul.builder().build());
+ }
+
+ public ConsulRoutePolicy(Consul consul) {
+ this.consul = consul;
+ this.sessionClient = consul.sessionClient();
+ this.keyValueClient = consul.keyValueClient();
+ this.suspendedRoutes = new HashSet<>();
+ this.leader = new AtomicBoolean(false);
+ this.lock = new Object();
+ this.index = new AtomicReference<>(BigInteger.valueOf(0));
+ this.serviceName = null;
+ this.servicePath = null;
+ this.ttl = 60;
+ this.lockDelay = 10;
+ this.executorService = null;
+ this.shouldStopConsumer = true;
+ this.sessionId = null;
+ }
+
+ @Override
+ public void onExchangeBegin(Route route, Exchange exchange) {
+ if (leader.get()) {
+ if (shouldStopConsumer) {
+ startConsumer(route);
+ }
+ } else {
+ if (shouldStopConsumer) {
+ stopConsumer(route);
+ }
+
+ exchange.setException(new IllegalStateException(
+ "Consul based route policy prohibits processing exchanges, stopping route and failing the exchange")
+ );
+ }
+ }
+
+ @Override
+ public void onStop(Route route) {
+ synchronized (lock) {
+ suspendedRoutes.remove(route);
+ }
+ }
+
+ @Override
+ public synchronized void onSuspend(Route route) {
+ synchronized (lock) {
+ suspendedRoutes.remove(route);
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (sessionId == null) {
+ sessionId = sessionClient.createSession(
+ ImmutableSession.builder()
+ .name(serviceName)
+ .ttl(ttl + "s")
+ .lockDelay(lockDelay + "s")
+ .build()
+ ).getId();
+
+ LOGGER.debug("SessionID = {}", sessionId);
+ if (executorService == null) {
+ executorService = Executors.newSingleThreadExecutor();
+ }
+
+ setLeader(keyValueClient.acquireLock(servicePath, sessionId));
+
+ executorService.submit(new Watcher());
+ }
+
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+
+ if (sessionId != null) {
+ sessionClient.destroySession(sessionId);
+ sessionId = null;
+
+ if (executorService != null) {
+ executorService.shutdown();
+ executorService.awaitTermination(ttl / 3, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ // *************************************************************************
+ //
+ // *************************************************************************
+
+ protected void setLeader(boolean isLeader) {
+ if (isLeader && leader.compareAndSet(false, isLeader)) {
+ LOGGER.debug("Leadership taken ({}, {})", serviceName, sessionId);
+ startAllStoppedConsumers();
+ } else {
+ if (!leader.getAndSet(isLeader) && isLeader) {
+ LOGGER.debug("Leadership lost ({}, {})", serviceName, sessionId);
+ }
+ }
+ }
+
+ private void startConsumer(Route route) {
+ synchronized (lock) {
+ try {
+ if (suspendedRoutes.contains(route)) {
+ startConsumer(route.getConsumer());
+ suspendedRoutes.remove(route);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+ }
+
+ private void stopConsumer(Route route) {
+ synchronized (lock) {
+ try {
+ if (!suspendedRoutes.contains(route)) {
+ LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer());
+ stopConsumer(route.getConsumer());
+ suspendedRoutes.add(route);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+ }
+
+ private void startAllStoppedConsumers() {
+ synchronized (lock) {
+ try {
+ for (Route route : suspendedRoutes) {
+ LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer());
+ startConsumer(route.getConsumer());
+ }
+
+ suspendedRoutes.clear();
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+ }
+
+ // *************************************************************************
+ // Getter/Setters
+ // *************************************************************************
+
+ public Consul getConsul() {
+ return consul;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ this.servicePath = String.format("/service/%s/leader", serviceName);
+ }
+
+ public int getTtl() {
+ return ttl;
+ }
+
+ public void setTtl(int ttl) {
+ this.ttl = ttl > 10 ? ttl : 10;
+ }
+
+ public int getLockDelay() {
+ return lockDelay;
+ }
+
+ public void setLockDelay(int lockDelay) {
+ this.lockDelay = lockDelay > 10 ? lockDelay : 10;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public boolean isShouldStopConsumer() {
+ return shouldStopConsumer;
+ }
+
+ public void setShouldStopConsumer(boolean shouldStopConsumer) {
+ this.shouldStopConsumer = shouldStopConsumer;
+ }
+
+ // *************************************************************************
+ // Watch
+ // *************************************************************************
+
+ private class Watcher implements Runnable, ConsulResponseCallback<Optional<Value>> {
+
+ @Override
+ public void onComplete(ConsulResponse<Optional<Value>> consulResponse) {
+ if (isRunAllowed()) {
+ Value response = consulResponse.getResponse().orNull();
+ if (response != null) {
+ String sid = response.getSession().orNull();
+ if (ObjectHelper.isEmpty(sid)) {
+ // If the key is not held by any session, try acquire a
+ // lock (become leader)
+ LOGGER.debug("Try to take leadership ...");
+ setLeader(keyValueClient.acquireLock(servicePath, sessionId));
+ } else if (!sessionId.equals(sid) && leader.get()) {
+ // Looks like I've lost leadership
+ setLeader(false);
+ }
+ }
+
+ index.set(consulResponse.getIndex());
+ run();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ handleException(throwable);
+ }
+
+ @Override
+ public void run() {
+ if (isRunAllowed()) {
+ // Refresh session
+ sessionClient.renewSession(sessionId);
+
+ keyValueClient.getValue(
+ servicePath,
+ QueryOptions.blockSeconds(ttl / 3, index.get()).build(),
+ this);
+ }
+ }
+ }
+}