You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/06/28 13:04:03 UTC
[1/2] camel git commit: Fix test name and logging
Repository: camel
Updated Branches:
refs/heads/master f9a911361 -> e6a7caecc
Fix test name and logging
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e59b00b2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e59b00b2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e59b00b2
Branch: refs/heads/master
Commit: e59b00b22d11fa8e2ac14b0911c4024b4cb33cf4
Parents: f9a9113
Author: lburgazzoli <lb...@gmail.com>
Authored: Wed Jun 28 10:35:00 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Wed Jun 28 15:03:32 2017 +0200
----------------------------------------------------------------------
.../ha/ZooKeeperClientRoutePolicyTest.java | 122 -------------------
.../ha/ZooKeeperClusteredRoutePolicyTest.java | 122 +++++++++++++++++++
.../src/test/resources/log4j2.properties | 2 +-
3 files changed, 123 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e59b00b2/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java
deleted file mode 100644
index 564940d..0000000
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.ha;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperServer;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
-import org.apache.camel.test.AvailablePortFinder;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class ZooKeeperClientRoutePolicyTest {
- private static final int PORT = AvailablePortFinder.getNextAvailable();
- private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClientRoutePolicyTest.class);
- private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
- private static final List<String> RESULTS = new ArrayList<>();
- private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2);
- private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size());
-
- // ************************************
- // Test
- // ************************************
-
- @Test
- public void test() throws Exception {
- TestZookeeperServer server = null;
-
- try {
- server = new TestZookeeperServer(PORT, true);
- ZooKeeperTestSupport.waitForServerUp("localhost:" + PORT, 1000);
-
- for (String id : CLIENTS) {
- SCHEDULER.submit(() -> run(id));
- }
-
- LATCH.await(1, TimeUnit.MINUTES);
- SCHEDULER.shutdownNow();
-
- Assert.assertEquals(CLIENTS.size(), RESULTS.size());
- Assert.assertTrue(RESULTS.containsAll(CLIENTS));
- } finally {
- if (server != null) {
- server.shutdown();
- }
- }
- }
-
- // ************************************
- // Run a Camel node
- // ************************************
-
- private static void run(String id) {
- try {
- CountDownLatch contextLatch = new CountDownLatch(1);
-
- ZooKeeperClusterService service = new ZooKeeperClusterService();
- service.setId("node-" + id);
- service.setNodes("localhost:" + PORT);
- service.setNamespace(null );
-
- DefaultCamelContext context = new DefaultCamelContext();
- context.disableJMX();
- context.setName("context-" + id);
- context.addService(service);
- context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("/my-ns"));
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("timer:zookeeper?delay=1s&period=1s&repeatCount=1")
- .routeId("route-" + id)
- .process(e -> {
- LOGGER.debug("Node {} done", id);
- RESULTS.add(id);
- // Shutdown the context later on to give a chance to
- // other members to catch-up
- SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
- });
- }
- });
-
- // Start the context after some random time so the startup order
- // changes for each test.
- Thread.sleep(ThreadLocalRandom.current().nextInt(500));
- context.start();
-
- contextLatch.await();
- context.stop();
-
- LATCH.countDown();
- } catch (Exception e) {
- LOGGER.warn("", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/e59b00b2/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
new file mode 100644
index 0000000..d73648b
--- /dev/null
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperServer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ZooKeeperClusteredRoutePolicyTest {
+ private static final int PORT = AvailablePortFinder.getNextAvailable();
+ private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClusteredRoutePolicyTest.class);
+ private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
+ private static final List<String> RESULTS = new ArrayList<>();
+ private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2);
+ private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size());
+
+ // ************************************
+ // Test
+ // ************************************
+
+ @Test
+ public void test() throws Exception {
+ TestZookeeperServer server = null;
+
+ try {
+ server = new TestZookeeperServer(PORT, true);
+ ZooKeeperTestSupport.waitForServerUp("localhost:" + PORT, 1000);
+
+ for (String id : CLIENTS) {
+ SCHEDULER.submit(() -> run(id));
+ }
+
+ LATCH.await(1, TimeUnit.MINUTES);
+ SCHEDULER.shutdownNow();
+
+ Assert.assertEquals(CLIENTS.size(), RESULTS.size());
+ Assert.assertTrue(RESULTS.containsAll(CLIENTS));
+ } finally {
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+ }
+
+ // ************************************
+ // Run a Camel node
+ // ************************************
+
+ private static void run(String id) {
+ try {
+ CountDownLatch contextLatch = new CountDownLatch(1);
+
+ ZooKeeperClusterService service = new ZooKeeperClusterService();
+ service.setId("node-" + id);
+ service.setNodes("localhost:" + PORT);
+ service.setNamespace(null);
+
+ DefaultCamelContext context = new DefaultCamelContext();
+ context.disableJMX();
+ context.setName("context-" + id);
+ context.addService(service);
+ context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("/my-ns"));
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("timer:zookeeper?delay=1s&period=1s&repeatCount=1")
+ .routeId("route-" + id)
+ .process(e -> {
+ LOGGER.debug("Node {} done", id);
+ RESULTS.add(id);
+ // Shutdown the context later on to give a chance to
+ // other members to catch-up
+ SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
+ });
+ }
+ });
+
+ // Start the context after some random time so the startup order
+ // changes for each test.
+ Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+ context.start();
+
+ contextLatch.await();
+ context.stop();
+
+ LATCH.countDown();
+ } catch (Exception e) {
+ LOGGER.warn("", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e59b00b2/components/camel-zookeeper/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/resources/log4j2.properties b/components/camel-zookeeper/src/test/resources/log4j2.properties
index 2e7cfa5..d7bb987 100644
--- a/components/camel-zookeeper/src/test/resources/log4j2.properties
+++ b/components/camel-zookeeper/src/test/resources/log4j2.properties
@@ -43,6 +43,6 @@ logger.springframework.name = org.springframework
logger.springframework.level = WARN
rootLogger.level = INFO
#rootLogger.appenderRef.stdout.ref = out
-rootLogger.appenderRef.file.ref = out
+rootLogger.appenderRef.file.ref = file
[2/2] camel git commit: CAMEL-10715: service-call : create ZooKeeper
based ServiceDiscovery
Posted by lb...@apache.org.
CAMEL-10715: service-call : create ZooKeeper based ServiceDiscovery
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e6a7caec
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e6a7caec
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e6a7caec
Branch: refs/heads/master
Commit: e6a7caecc50621de4c9368d70e17f9ab5eba01e4
Parents: e59b00b
Author: lburgazzoli <lb...@gmail.com>
Authored: Wed Jun 28 12:54:34 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Wed Jun 28 15:03:33 2017 +0200
----------------------------------------------------------------------
.../ServiceCallConfigurationDefinition.java | 20 +-
.../model/cloud/ServiceCallDefinition.java | 20 +-
...erviceCallServiceDiscoveryConfiguration.java | 193 +++++++++++++++
components/camel-zookeeper/pom.xml | 11 +-
.../ZooKeeperCuratorConfiguration.java | 247 +++++++++++++++++++
.../zookeeper/ZooKeeperCuratorHelper.java | 71 ++++++
.../cloud/ZooKeeperServiceDiscovery.java | 144 +++++++++++
.../cloud/ZooKeeperServiceDiscoveryFactory.java | 225 +++++++++++++++++
.../zookeeper/ha/ZooKeeperClusterService.java | 196 +++++++--------
.../zookeeper/ha/ZooKeeperClusterView.java | 7 +-
.../camel/cloud/zookeeper-service-discovery | 17 ++
.../SpringZooKeeperServiceCallRouteTest.java | 128 ++++++++++
.../cloud/ZooKeeperServiceCallRouteTest.java | 150 +++++++++++
.../cloud/ZooKeeperServiceDiscoveryTest.java | 111 +++++++++
.../ha/ZooKeeperClusteredRoutePolicyTest.java | 4 +-
.../SpringZooKeeperServiceCallRouteTest.xml | 65 +++++
parent/pom.xml | 2 +-
.../features/src/main/resources/features.xml | 3 +
18 files changed, 1493 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallConfigurationDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallConfigurationDefinition.java
index d74b3a8..abd1eb4 100644
--- a/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallConfigurationDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallConfigurationDefinition.java
@@ -74,7 +74,8 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType {
@XmlElement(name = "dnsServiceDiscovery", type = DnsServiceCallServiceDiscoveryConfiguration.class),
@XmlElement(name = "etcdServiceDiscovery", type = EtcdServiceCallServiceDiscoveryConfiguration.class),
@XmlElement(name = "kubernetesServiceDiscovery", type = KubernetesServiceCallServiceDiscoveryConfiguration.class),
- @XmlElement(name = "staticServiceDiscovery", type = StaticServiceCallServiceDiscoveryConfiguration.class)}
+ @XmlElement(name = "staticServiceDiscovery", type = StaticServiceCallServiceDiscoveryConfiguration.class),
+ @XmlElement(name = "zookeeperServiceDiscovery", type = ZooKeeperServiceCallServiceDiscoveryConfiguration.class)}
)
private ServiceCallServiceDiscoveryConfiguration serviceDiscoveryConfiguration;
@@ -562,6 +563,23 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType {
return this;
}
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration zookeeperServiceDiscovery() {
+ ZooKeeperServiceCallServiceDiscoveryConfiguration conf = new ZooKeeperServiceCallServiceDiscoveryConfiguration();
+ setServiceDiscoveryConfiguration(conf);
+
+ return conf;
+ }
+
+ public ServiceCallConfigurationDefinition zookeeperServiceDiscovery(String nodes, String basePath) {
+ ZooKeeperServiceCallServiceDiscoveryConfiguration conf = new ZooKeeperServiceCallServiceDiscoveryConfiguration();
+ conf.setNodes(nodes);
+ conf.setBasePath(basePath);
+
+ setServiceDiscoveryConfiguration(conf);
+
+ return this;
+ }
+
// *****************************
// Shortcuts - ServiceFilter
// *****************************
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallDefinition.java b/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallDefinition.java
index 283d701..74ee07d 100644
--- a/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallDefinition.java
@@ -103,7 +103,8 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit
@XmlElement(name = "dnsServiceDiscovery", type = DnsServiceCallServiceDiscoveryConfiguration.class),
@XmlElement(name = "etcdServiceDiscovery", type = EtcdServiceCallServiceDiscoveryConfiguration.class),
@XmlElement(name = "kubernetesServiceDiscovery", type = KubernetesServiceCallServiceDiscoveryConfiguration.class),
- @XmlElement(name = "staticServiceDiscovery", type = StaticServiceCallServiceDiscoveryConfiguration.class)}
+ @XmlElement(name = "staticServiceDiscovery", type = StaticServiceCallServiceDiscoveryConfiguration.class),
+ @XmlElement(name = "zookeeperServiceDiscovery", type = ZooKeeperServiceCallServiceDiscoveryConfiguration.class)}
)
private ServiceCallServiceDiscoveryConfiguration serviceDiscoveryConfiguration;
@@ -651,6 +652,23 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit
return conf;
}
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration zookeeperServiceDiscovery() {
+ ZooKeeperServiceCallServiceDiscoveryConfiguration conf = new ZooKeeperServiceCallServiceDiscoveryConfiguration(this);
+ setServiceDiscoveryConfiguration(conf);
+
+ return conf;
+ }
+
+ public ServiceCallDefinition zookeeperServiceDiscovery(String nodes, String basePath) {
+ ZooKeeperServiceCallServiceDiscoveryConfiguration conf = new ZooKeeperServiceCallServiceDiscoveryConfiguration(this);
+ conf.setNodes(nodes);
+ conf.setBasePath(basePath);
+
+ setServiceDiscoveryConfiguration(conf);
+
+ return this;
+ }
+
// *****************************
// Shortcuts - ServiceFilter
// *****************************
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/camel-core/src/main/java/org/apache/camel/model/cloud/ZooKeeperServiceCallServiceDiscoveryConfiguration.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/cloud/ZooKeeperServiceCallServiceDiscoveryConfiguration.java b/camel-core/src/main/java/org/apache/camel/model/cloud/ZooKeeperServiceCallServiceDiscoveryConfiguration.java
new file mode 100644
index 0000000..3bc9ecb
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/cloud/ZooKeeperServiceCallServiceDiscoveryConfiguration.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model.cloud;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.camel.spi.Metadata;
+
+@Metadata(label = "routing,cloud,service-discovery")
+@XmlRootElement(name = "zookeeperServiceDiscovery")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ZooKeeperServiceCallServiceDiscoveryConfiguration extends ServiceCallServiceDiscoveryConfiguration {
+ @XmlAttribute(required = true)
+ private String nodes;
+ @XmlAttribute
+ private String namespace;
+ @XmlAttribute
+ private String reconnectBaseSleepTime;
+ @XmlAttribute
+ private String reconnectMaxSleepTime;
+ @XmlAttribute
+ private Integer reconnectMaxRetries;
+ @XmlAttribute
+ private String sessionTimeout;
+ @XmlAttribute
+ private String connectionTimeout;
+ @XmlAttribute(required = true)
+ private String basePath;
+
+
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration() {
+ this(null);
+ }
+
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration(ServiceCallDefinition parent) {
+ super(parent, "zookeeper-service-discovery");
+ }
+
+ // *************************************************************************
+ // Getter/Setter
+ // *************************************************************************
+
+ public String getNodes() {
+ return nodes;
+ }
+
+ /**
+ * A comma separate list of servers to connect to in the form host:port
+ */
+ public void setNodes(String nodes) {
+ this.nodes = nodes;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ /**
+ * As ZooKeeper is a shared space, users of a given cluster should stay within
+ * a pre-defined namespace. If a namespace is set here, all paths will get pre-pended
+ * with the namespace
+ */
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public String getReconnectBaseSleepTime() {
+ return reconnectBaseSleepTime;
+ }
+
+ /**
+ * Initial amount of time to wait between retries.
+ */
+ public void setReconnectBaseSleepTime(String reconnectBaseSleepTime) {
+ this.reconnectBaseSleepTime = reconnectBaseSleepTime;
+ }
+
+ public String getReconnectMaxSleepTime() {
+ return reconnectMaxSleepTime;
+ }
+
+ /**
+ * Max time in ms to sleep on each retry
+ */
+ public void setReconnectMaxSleepTime(String reconnectMaxSleepTime) {
+ this.reconnectMaxSleepTime = reconnectMaxSleepTime;
+ }
+
+ public Integer getReconnectMaxRetries() {
+ return reconnectMaxRetries;
+ }
+
+ /**
+ * Max number of times to retry
+ */
+ public void setReconnectMaxRetries(Integer reconnectMaxRetries) {
+ this.reconnectMaxRetries = reconnectMaxRetries;
+ }
+
+ public String getSessionTimeout() {
+ return sessionTimeout;
+ }
+
+ /**
+ * Session timeout.
+ */
+ public void setSessionTimeout(String sessionTimeout) {
+ this.sessionTimeout = sessionTimeout;
+ }
+
+ public String getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ /**
+ * Connection timeout.
+ */
+ public void setConnectionTimeout(String connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ public String getBasePath() {
+ return basePath;
+ }
+
+ /**
+ * Set the base path to store in ZK
+ */
+ public void setBasePath(String basePath) {
+ this.basePath = basePath;
+ }
+
+ // *************************************************************************
+ // Fluent API
+ // *************************************************************************
+
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration nodes(String nodes) {
+ setNodes(nodes);
+ return this;
+ }
+
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration namespace(String namespace) {
+ setNamespace(namespace);
+ return this;
+ }
+
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration reconnectBaseSleepTime(String reconnectBaseSleepTime) {
+ setReconnectBaseSleepTime(reconnectBaseSleepTime);
+ return this;
+ }
+
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration reconnectMaxSleepTime(String reconnectMaxSleepTime) {
+ setReconnectMaxSleepTime(reconnectMaxSleepTime);
+ return this;
+ }
+
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration reconnectMaxRetries(int reconnectMaxRetries) {
+ setReconnectMaxRetries(reconnectMaxRetries);
+ return this;
+ }
+
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration sessionTimeout(String sessionTimeout) {
+ setSessionTimeout(sessionTimeout);
+ return this;
+ }
+
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration connectionTimeout(String connectionTimeout) {
+ setConnectionTimeout(connectionTimeout);
+ return this;
+ }
+
+ public ZooKeeperServiceCallServiceDiscoveryConfiguration basePath(String basePath) {
+ setBasePath(basePath);
+ return this;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/pom.xml b/components/camel-zookeeper/pom.xml
index a3278b3..afb1b16 100644
--- a/components/camel-zookeeper/pom.xml
+++ b/components/camel-zookeeper/pom.xml
@@ -81,11 +81,20 @@
<version>${curator-version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-x-discovery</artifactId>
+ <version>${curator-version}</version>
+ </dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-jetty</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-test-spring</artifactId>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorConfiguration.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorConfiguration.java
new file mode 100644
index 0000000..7b773d0
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorConfiguration.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+
+public class ZooKeeperCuratorConfiguration implements Cloneable {
+ private CuratorFramework curatorFramework;
+ private List<String> nodes;
+ private String namespace;
+ private long reconnectBaseSleepTime;
+ private TimeUnit reconnectBaseSleepTimeUnit;
+ private int reconnectMaxRetries;
+ private long reconnectMaxSleepTime;
+ private TimeUnit reconnectMaxSleepTimeUnit;
+ private long sessionTimeout;
+ private TimeUnit sessionTimeoutUnit;
+ private long connectionTimeout;
+ private TimeUnit connectionTimeoutUnit;
+ private List<AuthInfo> authInfoList;
+ private long maxCloseWait;
+ private TimeUnit maxCloseWaitUnit;
+ private RetryPolicy retryPolicy;
+ private String basePath;
+
+ public ZooKeeperCuratorConfiguration() {
+ this.reconnectBaseSleepTime = 1000;
+ this.reconnectBaseSleepTimeUnit = TimeUnit.MILLISECONDS;
+ this.reconnectMaxSleepTime = Integer.MAX_VALUE;
+ this.reconnectMaxSleepTimeUnit = TimeUnit.MILLISECONDS;
+ this.reconnectMaxRetries = 3;
+
+ // from org.apache.curator.framework.CuratorFrameworkFactory
+ this.sessionTimeout = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
+ this.sessionTimeoutUnit = TimeUnit.MILLISECONDS;
+
+ // from org.apache.curator.framework.CuratorFrameworkFactory
+ this.connectionTimeout = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
+ this.connectionTimeoutUnit = TimeUnit.MILLISECONDS;
+
+ // from org.apache.curator.framework.CuratorFrameworkFactory
+ this.maxCloseWait = 1000;
+ this.maxCloseWaitUnit = TimeUnit.MILLISECONDS;
+ }
+
+ // *******************************
+ // Properties
+ // *******************************
+
+ public CuratorFramework getCuratorFramework() {
+ return curatorFramework;
+ }
+
+ public void setCuratorFramework(CuratorFramework curatorFramework) {
+ this.curatorFramework = curatorFramework;
+ }
+
+ public List<String> getNodes() {
+ return nodes;
+ }
+
+ public void setNodes(String nodes) {
+ this.nodes = Collections.unmodifiableList(
+ Arrays.stream(nodes.split(",")).collect(Collectors.toList())
+ );
+ }
+
+ public void setNodes(List<String> nodes) {
+ this.nodes = Collections.unmodifiableList(new ArrayList<>(nodes));
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public long getReconnectBaseSleepTime() {
+ return reconnectBaseSleepTime;
+ }
+
+ public void setReconnectBaseSleepTime(long reconnectBaseSleepTime) {
+ this.reconnectBaseSleepTime = reconnectBaseSleepTime;
+ }
+
+ public void setReconnectBaseSleepTime(long reconnectBaseSleepTime, TimeUnit reconnectBaseSleepTimeUnit) {
+ this.reconnectBaseSleepTime = reconnectBaseSleepTime;
+ this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit;
+ }
+
+ public TimeUnit getReconnectBaseSleepTimeUnit() {
+ return reconnectBaseSleepTimeUnit;
+ }
+
+ public void setReconnectBaseSleepTimeUnit(TimeUnit reconnectBaseSleepTimeUnit) {
+ this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit;
+ }
+
+ public long getReconnectMaxSleepTime() {
+ return reconnectMaxSleepTime;
+ }
+
+ public void setReconnectMaxSleepTime(long reconnectMaxSleepTime) {
+ this.reconnectMaxSleepTime = reconnectMaxSleepTime;
+ }
+
+ public void setReconnectMaxSleepTime(long reconnectMaxSleepTime, TimeUnit reconnectBaseSleepTimeUnit) {
+ this.reconnectMaxSleepTime = reconnectMaxSleepTime;
+ this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit;
+ }
+
+ public TimeUnit getReconnectMaxSleepTimeUnit() {
+ return reconnectMaxSleepTimeUnit;
+ }
+
+ public void setReconnectMaxSleepTimeUnit(TimeUnit reconnectMaxSleepTimeUnit) {
+ this.reconnectMaxSleepTimeUnit = reconnectMaxSleepTimeUnit;
+ }
+
+ public int getReconnectMaxRetries() {
+ return reconnectMaxRetries;
+ }
+
+ public void setReconnectMaxRetries(int reconnectMaxRetries) {
+ this.reconnectMaxRetries = reconnectMaxRetries;
+ }
+
+ public long getSessionTimeout() {
+ return sessionTimeout;
+ }
+
+ public void setSessionTimeout(long sessionTimeout) {
+ this.sessionTimeout = sessionTimeout;
+ }
+
+ public void setSessionTimeout(long sessionTimeout, TimeUnit sessionTimeoutUnit) {
+ this.sessionTimeout = sessionTimeout;
+ this.sessionTimeoutUnit = sessionTimeoutUnit;
+ }
+
+ public TimeUnit getSessionTimeoutUnit() {
+ return sessionTimeoutUnit;
+ }
+
+ public void setSessionTimeoutUnit(TimeUnit sessionTimeoutUnit) {
+ this.sessionTimeoutUnit = sessionTimeoutUnit;
+ }
+
+ public long getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public void setConnectionTimeout(long connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ public void setConnectionTimeout(long connectionTimeout, TimeUnit connectionTimeotUnit) {
+ this.connectionTimeout = connectionTimeout;
+ this.connectionTimeoutUnit = connectionTimeotUnit;
+ }
+
+ public TimeUnit getConnectionTimeoutUnit() {
+ return connectionTimeoutUnit;
+ }
+
+ public void setConnectionTimeoutUnit(TimeUnit connectionTimeoutUnit) {
+ this.connectionTimeoutUnit = connectionTimeoutUnit;
+ }
+
+ public List<AuthInfo> getAuthInfoList() {
+ return authInfoList;
+ }
+
+ public void setAuthInfoList(List<AuthInfo> authInfoList) {
+ this.authInfoList = authInfoList;
+ }
+
+ public long getMaxCloseWait() {
+ return maxCloseWait;
+ }
+
+ public void setMaxCloseWait(long maxCloseWait) {
+ this.maxCloseWait = maxCloseWait;
+ }
+
+ public TimeUnit getMaxCloseWaitUnit() {
+ return maxCloseWaitUnit;
+ }
+
+ public void setMaxCloseWaitUnit(TimeUnit maxCloseWaitUnit) {
+ this.maxCloseWaitUnit = maxCloseWaitUnit;
+ }
+
+ public RetryPolicy getRetryPolicy() {
+ return retryPolicy;
+ }
+
+ public void setRetryPolicy(RetryPolicy retryPolicy) {
+ this.retryPolicy = retryPolicy;
+ }
+
+ public String getBasePath() {
+ return basePath;
+ }
+
+ public void setBasePath(String basePath) {
+ this.basePath = basePath;
+ }
+
+ // *******************************
+ // Clone
+ // *******************************
+
+ public ZooKeeperCuratorConfiguration copy() {
+ try {
+ return (ZooKeeperCuratorConfiguration)clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorHelper.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorHelper.java
new file mode 100644
index 0000000..2ee0385
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorHelper.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.Optional;
+
+import org.apache.camel.util.ObjectHelper;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
+
+public final class ZooKeeperCuratorHelper {
+ private ZooKeeperCuratorHelper() {
+ }
+
+ public static CuratorFramework createCurator(ZooKeeperCuratorConfiguration configuration) throws Exception {
+ CuratorFramework curator = configuration.getCuratorFramework();
+ if (curator == null) {
+ // Validate parameters
+ ObjectHelper.notNull(configuration.getNodes(), "ZooKeeper Nodes");
+
+ RetryPolicy retryPolicy = configuration.getRetryPolicy();
+ if (retryPolicy == null) {
+ retryPolicy = new ExponentialBackoffRetry(
+ (int)configuration.getReconnectBaseSleepTimeUnit().toMillis(configuration.getReconnectBaseSleepTime()),
+ (int)configuration.getReconnectMaxSleepTimeUnit().toMillis(configuration.getReconnectMaxSleepTime()),
+ configuration.getReconnectMaxRetries());
+ }
+
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+ .connectString(String.join(",", configuration.getNodes()))
+ .sessionTimeoutMs((int) configuration.getSessionTimeoutUnit().toMillis(configuration.getSessionTimeout()))
+ .connectionTimeoutMs((int) configuration.getConnectionTimeoutUnit().toMillis(configuration.getConnectionTimeout()))
+ .maxCloseWaitMs((int) configuration.getMaxCloseWaitUnit().toMillis(configuration.getMaxCloseWait()))
+ .retryPolicy(retryPolicy);
+
+ Optional.ofNullable(configuration.getNamespace()).ifPresent(builder::namespace);
+ Optional.ofNullable(configuration.getAuthInfoList()).ifPresent(builder::authorization);
+
+ curator = builder.build();
+ }
+
+ return curator;
+ }
+
+ public static <T> ServiceDiscovery<T> createServiceDiscovery(ZooKeeperCuratorConfiguration configuration, CuratorFramework curator, Class<T> payloadType) {
+ return ServiceDiscoveryBuilder.builder(payloadType)
+ .client(curator)
+ .basePath(configuration.getBasePath())
+ .serializer(new JsonInstanceSerializer<>(payloadType))
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscovery.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscovery.java
new file mode 100644
index 0000000..3a7dde9
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscovery.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.cloud;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.cloud.ServiceDefinition;
+import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration;
+import org.apache.camel.component.zookeeper.ZooKeeperCuratorHelper;
+import org.apache.camel.impl.cloud.DefaultServiceDefinition;
+import org.apache.camel.impl.cloud.DefaultServiceDiscovery;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.codehaus.jackson.map.annotate.JsonRootName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperServiceDiscovery extends DefaultServiceDiscovery {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperServiceDiscovery.class);
+
+ private final ZooKeeperCuratorConfiguration configuration;
+ private final boolean managedInstance;
+ private CuratorFramework curator;
+ private ServiceDiscovery<MetaData> serviceDiscovery;
+
+ public ZooKeeperServiceDiscovery(ZooKeeperCuratorConfiguration configuration) {
+ this.configuration = configuration;
+ this.curator = configuration.getCuratorFramework();
+ this.managedInstance = Objects.isNull(curator);
+ }
+
+ // *********************************************
+ // Lifecycle
+ // *********************************************
+
+ @Override
+ protected void doStart() throws Exception {
+ if (curator == null) {
+ // Validation
+ ObjectHelper.notNull(getCamelContext(), "Camel Context");
+ ObjectHelper.notNull(configuration.getBasePath(), "ZooKeeper base path");
+
+ LOGGER.debug("Starting ZooKeeper Curator with namespace '{}', nodes: '{}'",
+ configuration.getNamespace(),
+ String.join(",", configuration.getNodes())
+ );
+
+ curator = ZooKeeperCuratorHelper.createCurator(configuration);
+ curator.start();
+ }
+
+ if (serviceDiscovery == null) {
+ // Validation
+ ObjectHelper.notNull(configuration.getBasePath(), "ZooKeeper base path");
+
+ LOGGER.debug("Starting ZooKeeper ServiceDiscoveryBuilder with base path '{}'",
+ configuration.getBasePath()
+ );
+
+ serviceDiscovery = ZooKeeperCuratorHelper.createServiceDiscovery(configuration, curator, MetaData.class);
+ serviceDiscovery.start();
+ }
+
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+
+ if (serviceDiscovery != null) {
+ try {
+ serviceDiscovery.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error closing Curator ServiceDiscovery", e);
+ }
+ }
+
+ if (curator != null && managedInstance) {
+ curator.close();
+ }
+ }
+
+ // *********************************************
+ // Implementation
+ // *********************************************
+
+ @Override
+ public List<ServiceDefinition> getServices(String name) {
+ if (serviceDiscovery == null) {
+ return Collections.emptyList();
+ }
+
+ try {
+ return serviceDiscovery.queryForInstances(name).stream()
+ .map(si -> {
+ Map<String, String> meta = new HashMap<>();
+ ObjectHelper.ifNotEmpty(si.getPayload(), meta::putAll);
+
+ meta.put("service_name", si.getName());
+ meta.put("service_id", si.getId());
+ meta.put("service_type", si.getServiceType().name());
+
+ return new DefaultServiceDefinition(
+ si.getName(),
+ si.getAddress(),
+ si.getSslPort() != null ? si.getSslPort() : si.getPort(),
+ meta);
+ })
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+
+ // *********************************************
+ // Helpers
+ // *********************************************
+
+ @JsonRootName("meta")
+ public static final class MetaData extends HashMap<String, String> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryFactory.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryFactory.java
new file mode 100644
index 0000000..b3b0604
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryFactory.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.cloud;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.cloud.ServiceDiscovery;
+import org.apache.camel.cloud.ServiceDiscoveryFactory;
+import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+
+public class ZooKeeperServiceDiscoveryFactory implements ServiceDiscoveryFactory {
+
+ private ZooKeeperCuratorConfiguration configuration;
+
+ public ZooKeeperServiceDiscoveryFactory() {
+ this.configuration = new ZooKeeperCuratorConfiguration();
+ }
+
+ public ZooKeeperServiceDiscoveryFactory(ZooKeeperCuratorConfiguration configuration) {
+ this.configuration = configuration.copy();
+ }
+
+ // *********************************************
+ // Properties
+ // *********************************************
+
+ public ZooKeeperCuratorConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(ZooKeeperCuratorConfiguration configuration) {
+ this.configuration = configuration.copy();
+ }
+
+ public CuratorFramework getCuratorFramework() {
+ return configuration.getCuratorFramework();
+ }
+
+ public void setCuratorFramework(CuratorFramework curatorFramework) {
+ configuration.setCuratorFramework(curatorFramework);
+ }
+
+ public List<String> getNodes() {
+ return configuration.getNodes();
+ }
+
+ public void setNodes(String nodes) {
+ configuration.setNodes(nodes);
+ }
+
+ public void setNodes(List<String> nodes) {
+ configuration.setNodes(nodes);
+ }
+
+ public String getNamespace() {
+ return configuration.getNamespace();
+ }
+
+ public void setNamespace(String namespace) {
+ configuration.setNamespace(namespace);
+ }
+
+ public long getReconnectBaseSleepTime() {
+ return configuration.getReconnectBaseSleepTime();
+ }
+
+ public void setReconnectBaseSleepTime(long reconnectBaseSleepTime) {
+ configuration.setReconnectBaseSleepTime(reconnectBaseSleepTime);
+ }
+
+ public void setReconnectBaseSleepTime(long reconnectBaseSleepTime, TimeUnit reconnectBaseSleepTimeUnit) {
+ configuration.setReconnectBaseSleepTime(reconnectBaseSleepTime, reconnectBaseSleepTimeUnit);
+ }
+
+ public TimeUnit getReconnectBaseSleepTimeUnit() {
+ return configuration.getReconnectBaseSleepTimeUnit();
+ }
+
+ public void setReconnectBaseSleepTimeUnit(TimeUnit reconnectBaseSleepTimeUnit) {
+ configuration.setReconnectBaseSleepTimeUnit(reconnectBaseSleepTimeUnit);
+ }
+
+ public long getReconnectMaxSleepTime() {
+ return configuration.getReconnectMaxSleepTime();
+ }
+
+ public void setReconnectMaxSleepTime(long reconnectMaxSleepTime) {
+ configuration.setReconnectMaxSleepTime(reconnectMaxSleepTime);
+ }
+
+ public void setReconnectMaxSleepTime(long reconnectMaxSleepTime, TimeUnit reconnectBaseSleepTimeUnit) {
+ configuration.setReconnectMaxSleepTime(reconnectMaxSleepTime, reconnectBaseSleepTimeUnit);
+ }
+
+ public TimeUnit getReconnectMaxSleepTimeUnit() {
+ return configuration.getReconnectMaxSleepTimeUnit();
+ }
+
+ public void setReconnectMaxSleepTimeUnit(TimeUnit reconnectMaxSleepTimeUnit) {
+ configuration.setReconnectMaxSleepTimeUnit(reconnectMaxSleepTimeUnit);
+ }
+
+ public int getReconnectMaxRetries() {
+ return configuration.getReconnectMaxRetries();
+ }
+
+ public void setReconnectMaxRetries(int reconnectMaxRetries) {
+ configuration.setReconnectMaxRetries(reconnectMaxRetries);
+ }
+
+ public long getSessionTimeout() {
+ return configuration.getSessionTimeout();
+ }
+
+ public void setSessionTimeout(long sessionTimeout) {
+ configuration.setSessionTimeout(sessionTimeout);
+ }
+
+ public void setSessionTimeout(long sessionTimeout, TimeUnit sessionTimeoutUnit) {
+ configuration.setSessionTimeout(sessionTimeout, sessionTimeoutUnit);
+ }
+
+ public TimeUnit getSessionTimeoutUnit() {
+ return configuration.getSessionTimeoutUnit();
+ }
+
+ public void setSessionTimeoutUnit(TimeUnit sessionTimeoutUnit) {
+ configuration.setSessionTimeoutUnit(sessionTimeoutUnit);
+ }
+
+ public long getConnectionTimeout() {
+ return configuration.getConnectionTimeout();
+ }
+
+ public void setConnectionTimeout(long connectionTimeout) {
+ configuration.setConnectionTimeout(connectionTimeout);
+ }
+
+ public void setConnectionTimeout(long connectionTimeout, TimeUnit connectionTimeotUnit) {
+ configuration.setConnectionTimeout(connectionTimeout, connectionTimeotUnit);
+ }
+
+ public TimeUnit getConnectionTimeoutUnit() {
+ return configuration.getConnectionTimeoutUnit();
+ }
+
+ public void setConnectionTimeoutUnit(TimeUnit connectionTimeoutUnit) {
+ configuration.setConnectionTimeoutUnit(connectionTimeoutUnit);
+ }
+
+ public ZooKeeperCuratorConfiguration copy() {
+ return configuration.copy();
+ }
+
+ public List<AuthInfo> getAuthInfoList() {
+ return configuration.getAuthInfoList();
+ }
+
+ public void setAuthInfoList(List<AuthInfo> authInfoList) {
+ configuration.setAuthInfoList(authInfoList);
+ }
+
+ public long getMaxCloseWait() {
+ return configuration.getMaxCloseWait();
+ }
+
+ public void setMaxCloseWait(long maxCloseWait) {
+ configuration.setMaxCloseWait(maxCloseWait);
+ }
+
+ public TimeUnit getMaxCloseWaitUnit() {
+ return configuration.getMaxCloseWaitUnit();
+ }
+
+ public void setMaxCloseWaitUnit(TimeUnit maxCloseWaitUnit) {
+ configuration.setMaxCloseWaitUnit(maxCloseWaitUnit);
+ }
+
+ public RetryPolicy getRetryPolicy() {
+ return configuration.getRetryPolicy();
+ }
+
+ public void setRetryPolicy(RetryPolicy retryPolicy) {
+ configuration.setRetryPolicy(retryPolicy);
+ }
+
+ public String getBasePath() {
+ return configuration.getBasePath();
+ }
+
+ public void setBasePath(String basePath) {
+ configuration.setBasePath(basePath);
+ }
+
+ // *********************************************
+ // Factory
+ // *********************************************
+
+ @Override
+ public ServiceDiscovery newInstance(CamelContext context) throws Exception {
+ ZooKeeperServiceDiscovery discovery = new ZooKeeperServiceDiscovery(configuration);
+ discovery.setCamelContext(context);
+
+ return discovery;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java
index 389d83c..6af1269 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java
@@ -16,205 +16,182 @@
*/
package org.apache.camel.component.zookeeper.ha;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration;
+import org.apache.camel.component.zookeeper.ZooKeeperCuratorHelper;
import org.apache.camel.impl.ha.AbstractCamelClusterService;
import org.apache.camel.util.ObjectHelper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZooKeeperClusterService extends AbstractCamelClusterService<ZooKeeperClusterView> {
private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClusterService.class);
- private CuratorFramework client;
- private List<String> nodes;
- private String namespace;
- private long reconnectBaseSleepTime;
- private TimeUnit reconnectBaseSleepTimeUnit;
- private int reconnectMaxRetries;
- private long sessionTimeout;
- private TimeUnit sessionTimeoutUnit;
- private long connectionTimeout;
- private TimeUnit connectionTimeotUnit;
- private List<AuthInfo> authInfoList;
- private long maxCloseWait;
- private TimeUnit maxCloseWaitUnit;
- private boolean closeOnStop;
- private RetryPolicy retryPolicy;
+ private CuratorFramework curator;
+ private ZooKeeperCuratorConfiguration configuration;
+ private boolean managedInstance;
public ZooKeeperClusterService() {
- this.reconnectBaseSleepTime = 1000;
- this.reconnectBaseSleepTimeUnit = TimeUnit.MILLISECONDS;
- this.reconnectMaxRetries = 3;
- this.closeOnStop = true;
-
- // from org.apache.curator.framework.CuratorFrameworkFactory
- this.sessionTimeout = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
- this.sessionTimeoutUnit = TimeUnit.MILLISECONDS;
-
- // from org.apache.curator.framework.CuratorFrameworkFactory
- this.connectionTimeout = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
- this.connectionTimeotUnit = TimeUnit.MILLISECONDS;
+ this.configuration = new ZooKeeperCuratorConfiguration();
+ this.managedInstance = true;
+ }
- // from org.apache.curator.framework.CuratorFrameworkFactory
- this.maxCloseWait = 1000;
- this.maxCloseWaitUnit = TimeUnit.MILLISECONDS;
+ public ZooKeeperClusterService(ZooKeeperCuratorConfiguration configuration) {
+ this.configuration = configuration.copy();
+ this.managedInstance = true;
}
// *********************************************
// Properties
// *********************************************
- public CuratorFramework getClient() {
- return client;
+ public ZooKeeperCuratorConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(ZooKeeperCuratorConfiguration configuration) {
+ this.configuration = configuration.copy();
+ }
+
+ public CuratorFramework getCuratorFramework() {
+ return configuration.getCuratorFramework();
}
- public void setClient(CuratorFramework client) {
- this.client = client;
+ public void setCuratorFramework(CuratorFramework curatorFramework) {
+ configuration.setCuratorFramework(curatorFramework);
}
public List<String> getNodes() {
- return nodes;
+ return configuration.getNodes();
}
public void setNodes(String nodes) {
- this.nodes = Collections.unmodifiableList(
- Arrays.stream(nodes.split(",")).collect(Collectors.toList())
- );
+ configuration.setNodes(nodes);
}
public void setNodes(List<String> nodes) {
- this.nodes = Collections.unmodifiableList(new ArrayList<>(nodes));
+ configuration.setNodes(nodes);
}
public String getNamespace() {
- return namespace;
+ return configuration.getNamespace();
}
public void setNamespace(String namespace) {
- this.namespace = namespace;
+ configuration.setNamespace(namespace);
}
public long getReconnectBaseSleepTime() {
- return reconnectBaseSleepTime;
+ return configuration.getReconnectBaseSleepTime();
}
public void setReconnectBaseSleepTime(long reconnectBaseSleepTime) {
- this.reconnectBaseSleepTime = reconnectBaseSleepTime;
+ configuration.setReconnectBaseSleepTime(reconnectBaseSleepTime);
}
public void setReconnectBaseSleepTime(long reconnectBaseSleepTime, TimeUnit reconnectBaseSleepTimeUnit) {
- this.reconnectBaseSleepTime = reconnectBaseSleepTime;
- this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit;
+ configuration.setReconnectBaseSleepTime(reconnectBaseSleepTime, reconnectBaseSleepTimeUnit);
}
public TimeUnit getReconnectBaseSleepTimeUnit() {
- return reconnectBaseSleepTimeUnit;
+ return configuration.getReconnectBaseSleepTimeUnit();
}
public void setReconnectBaseSleepTimeUnit(TimeUnit reconnectBaseSleepTimeUnit) {
- this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit;
+ configuration.setReconnectBaseSleepTimeUnit(reconnectBaseSleepTimeUnit);
}
public int getReconnectMaxRetries() {
- return reconnectMaxRetries;
+ return configuration.getReconnectMaxRetries();
}
public void setReconnectMaxRetries(int reconnectMaxRetries) {
- this.reconnectMaxRetries = reconnectMaxRetries;
+ configuration.setReconnectMaxRetries(reconnectMaxRetries);
}
public long getSessionTimeout() {
- return sessionTimeout;
+ return configuration.getSessionTimeout();
}
public void setSessionTimeout(long sessionTimeout) {
- this.sessionTimeout = sessionTimeout;
+ configuration.setSessionTimeout(sessionTimeout);
}
public void setSessionTimeout(long sessionTimeout, TimeUnit sessionTimeoutUnit) {
- this.sessionTimeout = sessionTimeout;
- this.sessionTimeoutUnit = sessionTimeoutUnit;
+ configuration.setSessionTimeout(sessionTimeout, sessionTimeoutUnit);
}
public TimeUnit getSessionTimeoutUnit() {
- return sessionTimeoutUnit;
+ return configuration.getSessionTimeoutUnit();
}
public void setSessionTimeoutUnit(TimeUnit sessionTimeoutUnit) {
- this.sessionTimeoutUnit = sessionTimeoutUnit;
+ configuration.setSessionTimeoutUnit(sessionTimeoutUnit);
}
public long getConnectionTimeout() {
- return connectionTimeout;
+ return configuration.getConnectionTimeout();
}
public void setConnectionTimeout(long connectionTimeout) {
- this.connectionTimeout = connectionTimeout;
+ configuration.setConnectionTimeout(connectionTimeout);
}
public void setConnectionTimeout(long connectionTimeout, TimeUnit connectionTimeotUnit) {
- this.connectionTimeout = connectionTimeout;
- this.connectionTimeotUnit = connectionTimeotUnit;
+ configuration.setConnectionTimeout(connectionTimeout, connectionTimeotUnit);
}
public TimeUnit getConnectionTimeotUnit() {
- return connectionTimeotUnit;
+ return configuration.getConnectionTimeoutUnit();
}
public void setConnectionTimeotUnit(TimeUnit connectionTimeotUnit) {
- this.connectionTimeotUnit = connectionTimeotUnit;
+ configuration.setConnectionTimeoutUnit(connectionTimeotUnit);
}
public List<AuthInfo> getAuthInfoList() {
- return authInfoList;
+ return configuration.getAuthInfoList();
}
public void setAuthInfoList(List<AuthInfo> authInfoList) {
- this.authInfoList = authInfoList;
+ configuration.setAuthInfoList(authInfoList);
}
public long getMaxCloseWait() {
- return maxCloseWait;
+ return configuration.getMaxCloseWait();
}
public void setMaxCloseWait(long maxCloseWait) {
- this.maxCloseWait = maxCloseWait;
+ configuration.setMaxCloseWait(maxCloseWait);
}
public TimeUnit getMaxCloseWaitUnit() {
- return maxCloseWaitUnit;
+ return configuration.getMaxCloseWaitUnit();
}
public void setMaxCloseWaitUnit(TimeUnit maxCloseWaitUnit) {
- this.maxCloseWaitUnit = maxCloseWaitUnit;
+ configuration.setMaxCloseWaitUnit(maxCloseWaitUnit);
}
- public boolean isCloseOnStop() {
- return closeOnStop;
+ public RetryPolicy getRetryPolicy() {
+ return configuration.getRetryPolicy();
}
- public void setCloseOnStop(boolean closeOnStop) {
- this.closeOnStop = closeOnStop;
+ public void setRetryPolicy(RetryPolicy retryPolicy) {
+ configuration.setRetryPolicy(retryPolicy);
}
- public RetryPolicy getRetryPolicy() {
- return retryPolicy;
+ public String getBasePath() {
+ return configuration.getBasePath();
}
- public void setRetryPolicy(RetryPolicy retryPolicy) {
- this.retryPolicy = retryPolicy;
+ public void setBasePath(String basePath) {
+ configuration.setBasePath(basePath);
}
// *********************************************
@@ -223,13 +200,18 @@ public class ZooKeeperClusterService extends AbstractCamelClusterService<ZooKeep
@Override
protected ZooKeeperClusterView createView(String namespace) throws Exception {
- return new ZooKeeperClusterView(this, getOrCreateClient(), namespace);
+
+ // Validation
+ ObjectHelper.notNull(getCamelContext(), "Camel Context");
+ ObjectHelper.notNull(configuration.getBasePath(), "ZooKeeper base path");
+
+ return new ZooKeeperClusterView(this, configuration, getOrCreateCurator(), namespace);
}
@Override
protected void doStart() throws Exception {
// instantiate a new CuratorFramework
- getOrCreateClient();
+ getOrCreateCurator();
super.doStart();
}
@@ -238,42 +220,30 @@ public class ZooKeeperClusterService extends AbstractCamelClusterService<ZooKeep
protected void doStop() throws Exception {
super.doStop();
- if (client != null && closeOnStop) {
- client.close();
+ if (curator != null && managedInstance) {
+ curator.close();
}
}
- private CuratorFramework getOrCreateClient() throws Exception {
- if (client == null) {
- // Validate parameters
- ObjectHelper.notNull(getCamelContext(), "Camel Context");
- ObjectHelper.notNull(nodes, "ZooKeeper Nodes");
-
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
- .connectString(String.join(",", nodes))
- .sessionTimeoutMs((int)sessionTimeoutUnit.toMillis(sessionTimeout))
- .connectionTimeoutMs((int)connectionTimeotUnit.toMillis(connectionTimeout))
- .maxCloseWaitMs((int)maxCloseWaitUnit.toMillis(maxCloseWait))
- .retryPolicy(retryPolicy());
+ private CuratorFramework getOrCreateCurator() throws Exception {
+ if (curator == null) {
+ curator = configuration.getCuratorFramework();
- Optional.ofNullable(namespace).ifPresent(builder::namespace);
- Optional.ofNullable(authInfoList).ifPresent(builder::authorization);
+ if (curator == null) {
+ managedInstance = true;
- LOGGER.debug("Connect to ZooKeeper with namespace {}, nodes: {}", namespace, nodes);
- client = builder.build();
+ LOGGER.debug("Starting ZooKeeper Curator with namespace '{}', nodes: '{}'",
+ configuration.getNamespace(),
+ String.join(",", configuration.getNodes())
+ );
- LOGGER.debug("Starting ZooKeeper client");
- client.start();
+ curator = ZooKeeperCuratorHelper.createCurator(configuration);
+ curator.start();
+ } else {
+ managedInstance = false;
+ }
}
- return this.client;
- }
-
- private RetryPolicy retryPolicy() {
- return retryPolicy != null
- ? retryPolicy
- : new ExponentialBackoffRetry(
- (int)reconnectBaseSleepTimeUnit.toMillis(reconnectBaseSleepTime),
- reconnectMaxRetries);
+ return curator;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
index 86e530f..acb1ad8 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration;
import org.apache.camel.ha.CamelClusterMember;
import org.apache.camel.ha.CamelClusterService;
import org.apache.camel.impl.ha.AbstractCamelClusterView;
@@ -36,14 +37,16 @@ import org.slf4j.LoggerFactory;
final class ZooKeeperClusterView extends AbstractCamelClusterView {
private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClusterView.class);
+ private final ZooKeeperCuratorConfiguration configuration;
private final CuratorFramework client;
private final CuratorLocalMember localMember;
private LeaderSelector leaderSelector;
- public ZooKeeperClusterView(CamelClusterService cluster, CuratorFramework client, String namespace) {
+ public ZooKeeperClusterView(CamelClusterService cluster, ZooKeeperCuratorConfiguration configuration, CuratorFramework client, String namespace) {
super(cluster, namespace);
this.localMember = new CuratorLocalMember();
+ this.configuration = configuration;
this.client = client;
}
@@ -84,7 +87,7 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView {
@Override
protected void doStart() throws Exception {
if (leaderSelector == null) {
- leaderSelector = new LeaderSelector(client, getNamespace(), new CamelLeaderElectionListener());
+ leaderSelector = new LeaderSelector(client, configuration.getBasePath(), new CamelLeaderElectionListener());
leaderSelector.setId(getClusterService().getId());
leaderSelector.start();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/cloud/zookeeper-service-discovery
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/cloud/zookeeper-service-discovery b/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/cloud/zookeeper-service-discovery
new file mode 100644
index 0000000..6d8290de
--- /dev/null
+++ b/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/cloud/zookeeper-service-discovery
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+class=org.apache.camel.component.zookeeper.cloud.ZooKeeperServiceDiscoveryFactory
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.java
new file mode 100644
index 0000000..6f3ba10
--- /dev/null
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.cloud;
+
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class SpringZooKeeperServiceCallRouteTest extends CamelSpringTestSupport {
+ private static final int SERVER_PORT = 9001;
+ private static final String SERVICE_NAME = "http-service";
+ private static final String SERVICE_PATH = "/camel";
+
+ private ZooKeeperTestSupport.TestZookeeperServer server;
+ private CuratorFramework curator;
+ private ServiceDiscovery<ZooKeeperServiceDiscovery.MetaData> discovery;
+
+ // ***********************
+ // Setup / tear down
+ // ***********************
+
+ @Override
+ public void doPreSetup() throws Exception {
+ super.doPreSetup();
+
+ server = new ZooKeeperTestSupport.TestZookeeperServer(SERVER_PORT, true);
+ ZooKeeperTestSupport.waitForServerUp("127.0.0.1:" + SERVER_PORT, 1000);
+
+ curator = CuratorFrameworkFactory.builder()
+ .connectString("127.0.0.1:" + SERVER_PORT)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+ .build();
+
+ discovery = ServiceDiscoveryBuilder.builder(ZooKeeperServiceDiscovery.MetaData.class)
+ .client(curator)
+ .basePath(SERVICE_PATH)
+ .serializer(new JsonInstanceSerializer<>(ZooKeeperServiceDiscovery.MetaData.class))
+ .build();
+
+ curator.start();
+ discovery.start();
+
+ discovery.registerService(
+ ServiceInstance.<ZooKeeperServiceDiscovery.MetaData>builder()
+ .address("127.0.0.1")
+ .port(9011)
+ .name(SERVICE_NAME)
+ .id("service-1")
+ .build());
+
+ discovery.registerService(
+ ServiceInstance.<ZooKeeperServiceDiscovery.MetaData>builder()
+ .address("127.0.0.1")
+ .port(9012)
+ .name(SERVICE_NAME)
+ .id("service-2")
+ .build());
+
+ discovery.registerService(
+ ServiceInstance.<ZooKeeperServiceDiscovery.MetaData>builder()
+ .address("127.0.0.1")
+ .port(9013)
+ .name(SERVICE_NAME)
+ .id("service-3")
+ .build());
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ CloseableUtils.closeQuietly(discovery);
+ CloseableUtils.closeQuietly(curator);
+
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+
+ // ***********************
+ // Test
+ // ***********************
+
+ @Test
+ public void testServiceCall() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(3);
+ getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder("ping 9011", "ping 9012", "ping 9013");
+
+ template.sendBody("direct:start", "ping");
+ template.sendBody("direct:start", "ping");
+ template.sendBody("direct:start", "ping");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ // ***********************
+ // Routes
+ // ***********************
+
+ @Override
+ protected AbstractApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext("org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.xml");
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceCallRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceCallRouteTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceCallRouteTest.java
new file mode 100644
index 0000000..7e244a4
--- /dev/null
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceCallRouteTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.cloud;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
+import org.junit.Test;
+
+public class ZooKeeperServiceCallRouteTest extends CamelTestSupport {
+ private static final int SERVER_PORT = AvailablePortFinder.getNextAvailable();
+ private static final String SERVICE_NAME = "http-service";
+ private static final int SERVICE_COUNT = 5;
+ private static final String SERVICE_PATH = "/camel";
+
+ private ZooKeeperTestSupport.TestZookeeperServer server;
+ private CuratorFramework curator;
+ private ServiceDiscovery<ZooKeeperServiceDiscovery.MetaData> discovery;
+ private List<ServiceInstance<ZooKeeperServiceDiscovery.MetaData>> instances;
+ private List<String> expectedBodies;
+
+ // *************************************************************************
+ // Setup / tear down
+ // *************************************************************************
+
+ @Override
+ protected void doPreSetup() throws Exception {
+ super.doPreSetup();
+
+ server = new ZooKeeperTestSupport.TestZookeeperServer(SERVER_PORT, true);
+ ZooKeeperTestSupport.waitForServerUp("127.0.0.1:" + SERVER_PORT, 1000);
+
+ curator = CuratorFrameworkFactory.builder()
+ .connectString("127.0.0.1:" + SERVER_PORT)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+ .build();
+
+ discovery = ServiceDiscoveryBuilder.builder(ZooKeeperServiceDiscovery.MetaData.class)
+ .client(curator)
+ .basePath(SERVICE_PATH)
+ .serializer(new JsonInstanceSerializer<>(ZooKeeperServiceDiscovery.MetaData.class))
+ .build();
+
+ curator.start();
+ discovery.start();
+
+ instances = new ArrayList<>(SERVICE_COUNT);
+ expectedBodies = new ArrayList<>(SERVICE_COUNT);
+
+ for (int i = 0; i < SERVICE_COUNT; i++) {
+ ServiceInstance<ZooKeeperServiceDiscovery.MetaData> instance = ServiceInstance.<ZooKeeperServiceDiscovery.MetaData>builder()
+ .address("127.0.0.1")
+ .port(AvailablePortFinder.getNextAvailable())
+ .name(SERVICE_NAME)
+ .id("service-" + i)
+ .build();
+
+ discovery.registerService(instance);
+ instances.add(instance);
+ expectedBodies.add("ping on " + instance.getPort());
+ }
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ for (ServiceInstance<ZooKeeperServiceDiscovery.MetaData> instace : instances) {
+ try {
+ discovery.unregisterService(instace);
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+
+ CloseableUtils.closeQuietly(discovery);
+ CloseableUtils.closeQuietly(curator);
+
+ server.shutdown();
+ }
+
+ // *************************************************************************
+ // Test
+ // *************************************************************************
+
+ @Test
+ public void testServiceCall() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(SERVICE_COUNT);
+ getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder(expectedBodies);
+
+ instances.forEach(r -> template.sendBody("direct:start", "ping"));
+
+ assertMockEndpointsSatisfied();
+ }
+
+ // *************************************************************************
+ // Route
+ // *************************************************************************
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .serviceCall()
+ .name(SERVICE_NAME)
+ .component("jetty")
+ .defaultLoadBalancer()
+ .zookeeperServiceDiscovery("127.0.0.1:" + SERVER_PORT, SERVICE_PATH)
+ .end()
+ .to("log:org.apache.camel.component.zookeeper.cloud?level=INFO&showAll=true&multiline=true")
+ .to("mock:result");
+
+ instances.forEach(r ->
+ fromF("jetty:http://%s:%d", r.getAddress(), r.getPort())
+ .transform().simple("${in.body} on " + r.getPort())
+ );
+ }
+ };
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryTest.java
new file mode 100644
index 0000000..e6d5551
--- /dev/null
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.cloud;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.cloud.ServiceDefinition;
+import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration;
+import org.apache.camel.component.zookeeper.ZooKeeperCuratorHelper;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ZooKeeperServiceDiscoveryTest {
+
+ @Test
+ public void testServiceDiscovery() throws Exception {
+ ZooKeeperCuratorConfiguration configuration = new ZooKeeperCuratorConfiguration();
+ ServiceDiscovery<ZooKeeperServiceDiscovery.MetaData> zkDiscovery = null;
+ ZooKeeperTestSupport.TestZookeeperServer server = null;
+ int port = AvailablePortFinder.getNextAvailable();
+
+ try {
+ server = new ZooKeeperTestSupport.TestZookeeperServer(port, true);
+ ZooKeeperTestSupport.waitForServerUp("localhost:" + port, 1000);
+
+ configuration.setBasePath("/camel");
+ configuration.setCuratorFramework(CuratorFrameworkFactory.builder()
+ .connectString("localhost:" + port)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+ .build()
+ );
+
+ zkDiscovery = ZooKeeperCuratorHelper.createServiceDiscovery(
+ configuration,
+ configuration.getCuratorFramework(),
+ ZooKeeperServiceDiscovery.MetaData.class
+ );
+
+ configuration.getCuratorFramework().start();
+ zkDiscovery.start();
+
+ List<ServiceInstance<ZooKeeperServiceDiscovery.MetaData>> instances = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ ServiceInstance<ZooKeeperServiceDiscovery.MetaData> instance = ServiceInstance.<ZooKeeperServiceDiscovery.MetaData>builder()
+ .address("127.0.0.1")
+ .port(AvailablePortFinder.getNextAvailable())
+ .name("my-service")
+ .id("service-" + i)
+ .build();
+
+ zkDiscovery.registerService(instance);
+ instances.add(instance);
+ }
+
+ ZooKeeperServiceDiscovery discovery = new ZooKeeperServiceDiscovery(configuration);
+ discovery.start();
+
+ List<ServiceDefinition> services = discovery.getServices("my-service");
+ assertNotNull(services);
+ assertEquals(3, services.size());
+
+ for (ServiceDefinition service : services) {
+ Assert.assertEquals(
+ 1,
+ instances.stream()
+ .filter(
+ i -> {
+ return i.getPort() == service.getPort()
+ && i.getAddress().equals(service.getHost())
+ && i.getId().equals(service.getMetadata().get("service_id"))
+ && i.getName().equals(service.getName());
+ }
+ ).count()
+ );
+ }
+
+ } finally {
+ CloseableUtils.closeQuietly(zkDiscovery);
+ CloseableUtils.closeQuietly(configuration.getCuratorFramework());
+
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
index d73648b..1c32787 100644
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
@@ -84,13 +84,13 @@ public final class ZooKeeperClusteredRoutePolicyTest {
ZooKeeperClusterService service = new ZooKeeperClusterService();
service.setId("node-" + id);
service.setNodes("localhost:" + PORT);
- service.setNamespace(null);
+ service.setBasePath("/camel");
DefaultCamelContext context = new DefaultCamelContext();
context.disableJMX();
context.setName("context-" + id);
context.addService(service);
- context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("/my-ns"));
+ context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.xml b/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.xml
new file mode 100644
index 0000000..482ed76
--- /dev/null
+++ b/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring
+ http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+
+ <!-- ************************************* -->
+ <!-- Routes -->
+ <!-- ************************************* -->
+
+ <route id="scall">
+ <from uri="direct:start"/>
+ <serviceCall name="http-service" component="jetty">
+ <zookeeperServiceDiscovery nodes="localhost:9001" basePath="/camel"/>
+ </serviceCall>
+ <to uri="mock:result"/>
+ </route>
+
+ <route>
+ <from uri="jetty:http://localhost:9011"/>
+ <transform>
+ <simple>${body} 9011</simple>
+ </transform>
+ </route>
+
+ <route>
+ <from uri="jetty:http://localhost:9012"/>
+ <transform>
+ <simple>${body} 9012</simple>
+ </transform>
+ </route>
+
+ <route>
+ <from uri="jetty:http://localhost:9013"/>
+ <transform>
+ <simple>${body} 9013</simple>
+ </transform>
+ </route>
+
+ </camelContext>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index fa39932..a322298 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -167,7 +167,7 @@
<cobertura-maven-plugin-version>2.7</cobertura-maven-plugin-version>
<couchbase-client-version>1.4.13</couchbase-client-version>
<couchbase-client-bundle-version>1.4.13_1</couchbase-client-bundle-version>
- <curator-version>2.11.1</curator-version>
+ <curator-version>2.12.0</curator-version>
<cxf-version>3.1.11</cxf-version>
<cxf-version-range>[3.0,4.0)</cxf-version-range>
<cxf-xjc-plugin-version>3.0.5</cxf-xjc-plugin-version>
http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 82e57c0..b976d97 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -2197,7 +2197,10 @@
<bundle dependency='true'>mvn:org.apache.curator/curator-framework/${curator-version}</bundle>
<bundle dependency='true'>mvn:org.apache.curator/curator-client/${curator-version}</bundle>
<bundle dependency='true'>mvn:org.apache.curator/curator-recipes/${curator-version}</bundle>
+ <bundle dependency='true'>mvn:org.apache.curator/curator-x-discovery/${curator-version}</bundle>
<bundle dependency='true'>mvn:com.google.guava/guava/${zookeeper-guava-version}</bundle>
+ <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-core-asl/${jackson-version}</bundle>
+ <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-mapper-asl/${jackson-version}</bundle>
<bundle>mvn:org.apache.camel/camel-zookeeper/${project.version}</bundle>
</feature>
<feature name='camel-zookeeper-master' version='${project.version}' resolver='(obr)' start-level='50'>