You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/06/26 15:30:19 UTC

[1/2] camel git commit: camel-atomix: makes cluster service simpler

Repository: camel
Updated Branches:
  refs/heads/master 2993c5145 -> 37a15c727


camel-atomix: makes cluster service simpler


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c7384488
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c7384488
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c7384488

Branch: refs/heads/master
Commit: c73844880d659267fef9f662a1459c0ca3b6dd4f
Parents: 2993c51
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Jun 26 15:26:25 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Mon Jun 26 15:26:25 2017 +0200

----------------------------------------------------------------------
 .../atomix/AtomixConfigurationAware.java        | 29 --------------------
 .../atomix/ha/AtomixClusterClientService.java   |  7 ++---
 .../atomix/ha/AtomixClusterService.java         |  7 ++---
 .../component/atomix/ha/AtomixClusterView.java  |  8 ++----
 4 files changed, 7 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c7384488/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java
deleted file mode 100644
index b13de80..0000000
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix;
-
-public interface AtomixConfigurationAware<C extends AtomixConfiguration> {
-    /**
-     * @return the configuration
-     */
-    C getConfiguration();
-
-    /**
-     * @param configuration the configuration
-     */
-    void setConfiguration(C configuration);
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7384488/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
index 00695c0..cc135ca 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
@@ -22,7 +22,6 @@ import io.atomix.AtomixClient;
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Transport;
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.atomix.AtomixConfigurationAware;
 import org.apache.camel.component.atomix.client.AtomixClientConfiguration;
 import org.apache.camel.component.atomix.client.AtomixClientHelper;
 import org.apache.camel.impl.ha.AbstractCamelClusterService;
@@ -30,7 +29,7 @@ import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class AtomixClusterClientService extends AbstractCamelClusterService<AtomixClusterView> implements AtomixConfigurationAware<AtomixClientConfiguration> {
+public final class AtomixClusterClientService extends AbstractCamelClusterService<AtomixClusterView> {
     private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClusterClientService.class);
     private AtomixClientConfiguration configuration;
     private AtomixClient atomix;
@@ -49,12 +48,10 @@ public final class AtomixClusterClientService extends AbstractCamelClusterServic
     // Properties
     // **********************************
 
-    @Override
     public AtomixClientConfiguration getConfiguration() {
         return configuration;
     }
 
-    @Override
     public void setConfiguration(AtomixClientConfiguration configuration) {
         this.configuration = configuration.copy();
     }
@@ -109,7 +106,7 @@ public final class AtomixClusterClientService extends AbstractCamelClusterServic
 
     @Override
     protected AtomixClusterView createView(String namespace) throws Exception {
-        return new AtomixClusterView(this, namespace, getOrCreateClient());
+        return new AtomixClusterView(this, namespace, getOrCreateClient(), configuration);
     }
 
     private AtomixClient getOrCreateClient() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/c7384488/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
index a5a8e51..a7901ce 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
@@ -23,13 +23,12 @@ import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Transport;
 import io.atomix.copycat.server.storage.StorageLevel;
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.atomix.AtomixConfigurationAware;
 import org.apache.camel.impl.ha.AbstractCamelClusterService;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class AtomixClusterService extends AbstractCamelClusterService<AtomixClusterView>  implements AtomixConfigurationAware<AtomixClusterConfiguration> {
+public final class AtomixClusterService extends AbstractCamelClusterService<AtomixClusterView> {
     private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClusterService.class);
 
     private Address address;
@@ -63,12 +62,10 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom
         this.address = address;
     }
 
-    @Override
     public AtomixClusterConfiguration getConfiguration() {
         return configuration;
     }
 
-    @Override
     public void setConfiguration(AtomixClusterConfiguration configuration) {
         this.configuration = configuration.copy();
     }
@@ -139,7 +136,7 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom
 
     @Override
     protected AtomixClusterView createView(String namespace) throws Exception {
-        return new AtomixClusterView(this, namespace, getOrCreateReplica());
+        return new AtomixClusterView(this, namespace, getOrCreateReplica(), configuration);
     }
 
     private AtomixReplica getOrCreateReplica() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/c7384488/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
index 62104a7..c7bbf2d 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
@@ -27,7 +27,6 @@ import io.atomix.group.DistributedGroup;
 import io.atomix.group.GroupMember;
 import io.atomix.group.LocalMember;
 import org.apache.camel.component.atomix.AtomixConfiguration;
-import org.apache.camel.component.atomix.AtomixConfigurationAware;
 import org.apache.camel.ha.CamelClusterMember;
 import org.apache.camel.ha.CamelClusterService;
 import org.apache.camel.impl.ha.AbstractCamelClusterView;
@@ -40,12 +39,14 @@ final class AtomixClusterView extends AbstractCamelClusterView {
 
     private final Atomix atomix;
     private final AtomixLocalMember localMember;
+    private final AtomixConfiguration<?> configuration;
     private DistributedGroup group;
 
-    AtomixClusterView(CamelClusterService cluster, String namespace, Atomix atomix) {
+    AtomixClusterView(CamelClusterService cluster, String namespace, Atomix atomix, AtomixConfiguration<?> configuration) {
         super(cluster, namespace);
 
         this.atomix = atomix;
+        this.configuration = configuration;
         this.localMember = new AtomixLocalMember();
     }
 
@@ -89,9 +90,6 @@ final class AtomixClusterView extends AbstractCamelClusterView {
         if (!localMember.hasJoined()) {
             LOGGER.debug("Get group {}", getNamespace());
 
-            final AtomixConfigurationAware service = AtomixConfigurationAware.class.cast(getClusterService());
-            final AtomixConfiguration<?> configuration = service.getConfiguration();
-
             group = this.atomix.getGroup(
                 getNamespace(),
                 new DistributedGroup.Config(configuration.getResourceConfig(getNamespace())),


[2/2] camel git commit: camel-zookeeper: create a zookeeper cluster-service

Posted by lb...@apache.org.
camel-zookeeper: create a zookeeper cluster-service


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/37a15c72
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/37a15c72
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/37a15c72

Branch: refs/heads/master
Commit: 37a15c72716dffe37562a4893b34d21a1dc8dd7f
Parents: c738448
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Jun 26 15:27:13 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Mon Jun 26 17:21:42 2017 +0200

----------------------------------------------------------------------
 .../zookeeper/ha/ZooKeeperClusterService.java   | 279 +++++++++++++++++++
 .../zookeeper/ha/ZooKeeperClusterView.java      | 166 +++++++++++
 .../zookeeper/ZooKeeperTestSupport.java         |   2 +-
 .../ha/ZooKeeperClientRoutePolicyTest.java      | 122 ++++++++
 .../src/test/resources/log4j2.properties        |   2 +-
 5 files changed, 569 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/37a15c72/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java
new file mode 100644
index 0000000..389d83c
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.ha;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.camel.impl.ha.AbstractCamelClusterService;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperClusterService extends AbstractCamelClusterService<ZooKeeperClusterView> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClusterService.class);
+
+    private CuratorFramework client;
+    private List<String> nodes;
+    private String namespace;
+    private long reconnectBaseSleepTime;
+    private TimeUnit reconnectBaseSleepTimeUnit;
+    private int reconnectMaxRetries;
+    private long sessionTimeout;
+    private TimeUnit sessionTimeoutUnit;
+    private long connectionTimeout;
+    private TimeUnit connectionTimeotUnit;
+    private List<AuthInfo> authInfoList;
+    private long maxCloseWait;
+    private TimeUnit maxCloseWaitUnit;
+    private boolean closeOnStop;
+    private RetryPolicy retryPolicy;
+
+    public ZooKeeperClusterService() {
+        this.reconnectBaseSleepTime = 1000;
+        this.reconnectBaseSleepTimeUnit = TimeUnit.MILLISECONDS;
+        this.reconnectMaxRetries = 3;
+        this.closeOnStop = true;
+
+        // from org.apache.curator.framework.CuratorFrameworkFactory
+        this.sessionTimeout = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
+        this.sessionTimeoutUnit =  TimeUnit.MILLISECONDS;
+
+        // from org.apache.curator.framework.CuratorFrameworkFactory
+        this.connectionTimeout = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
+        this.connectionTimeotUnit = TimeUnit.MILLISECONDS;
+
+        // from org.apache.curator.framework.CuratorFrameworkFactory
+        this.maxCloseWait = 1000;
+        this.maxCloseWaitUnit = TimeUnit.MILLISECONDS;
+    }
+
+    // *********************************************
+    // Properties
+    // *********************************************
+
+    public CuratorFramework getClient() {
+        return client;
+    }
+
+    public void setClient(CuratorFramework client) {
+        this.client = client;
+    }
+
+    public List<String> getNodes() {
+        return nodes;
+    }
+
+    public void setNodes(String nodes) {
+        this.nodes = Collections.unmodifiableList(
+            Arrays.stream(nodes.split(",")).collect(Collectors.toList())
+        );
+    }
+
+    public void setNodes(List<String> nodes) {
+        this.nodes = Collections.unmodifiableList(new ArrayList<>(nodes));
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public long getReconnectBaseSleepTime() {
+        return reconnectBaseSleepTime;
+    }
+
+    public void setReconnectBaseSleepTime(long reconnectBaseSleepTime) {
+        this.reconnectBaseSleepTime = reconnectBaseSleepTime;
+    }
+
+    public void setReconnectBaseSleepTime(long reconnectBaseSleepTime, TimeUnit reconnectBaseSleepTimeUnit) {
+        this.reconnectBaseSleepTime = reconnectBaseSleepTime;
+        this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit;
+    }
+
+    public TimeUnit getReconnectBaseSleepTimeUnit() {
+        return reconnectBaseSleepTimeUnit;
+    }
+
+    public void setReconnectBaseSleepTimeUnit(TimeUnit reconnectBaseSleepTimeUnit) {
+        this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit;
+    }
+
+    public int getReconnectMaxRetries() {
+        return reconnectMaxRetries;
+    }
+
+    public void setReconnectMaxRetries(int reconnectMaxRetries) {
+        this.reconnectMaxRetries = reconnectMaxRetries;
+    }
+
+    public long getSessionTimeout() {
+        return sessionTimeout;
+    }
+
+    public void setSessionTimeout(long sessionTimeout) {
+        this.sessionTimeout = sessionTimeout;
+    }
+
+    public void setSessionTimeout(long sessionTimeout, TimeUnit sessionTimeoutUnit) {
+        this.sessionTimeout = sessionTimeout;
+        this.sessionTimeoutUnit = sessionTimeoutUnit;
+    }
+
+    public TimeUnit getSessionTimeoutUnit() {
+        return sessionTimeoutUnit;
+    }
+
+    public void setSessionTimeoutUnit(TimeUnit sessionTimeoutUnit) {
+        this.sessionTimeoutUnit = sessionTimeoutUnit;
+    }
+
+    public long getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public void setConnectionTimeout(long connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    public void setConnectionTimeout(long connectionTimeout, TimeUnit connectionTimeotUnit) {
+        this.connectionTimeout = connectionTimeout;
+        this.connectionTimeotUnit = connectionTimeotUnit;
+    }
+
+    public TimeUnit getConnectionTimeotUnit() {
+        return connectionTimeotUnit;
+    }
+
+    public void setConnectionTimeotUnit(TimeUnit connectionTimeotUnit) {
+        this.connectionTimeotUnit = connectionTimeotUnit;
+    }
+
+    public List<AuthInfo> getAuthInfoList() {
+        return authInfoList;
+    }
+
+    public void setAuthInfoList(List<AuthInfo> authInfoList) {
+        this.authInfoList = authInfoList;
+    }
+
+    public long getMaxCloseWait() {
+        return maxCloseWait;
+    }
+
+    public void setMaxCloseWait(long maxCloseWait) {
+        this.maxCloseWait = maxCloseWait;
+    }
+
+    public TimeUnit getMaxCloseWaitUnit() {
+        return maxCloseWaitUnit;
+    }
+
+    public void setMaxCloseWaitUnit(TimeUnit maxCloseWaitUnit) {
+        this.maxCloseWaitUnit = maxCloseWaitUnit;
+    }
+
+    public boolean isCloseOnStop() {
+        return closeOnStop;
+    }
+
+    public void setCloseOnStop(boolean closeOnStop) {
+        this.closeOnStop = closeOnStop;
+    }
+
+    public RetryPolicy getRetryPolicy() {
+        return retryPolicy;
+    }
+
+    public void setRetryPolicy(RetryPolicy retryPolicy) {
+        this.retryPolicy = retryPolicy;
+    }
+
+    // *********************************************
+    //
+    // *********************************************
+
+    @Override
+    protected ZooKeeperClusterView createView(String namespace) throws Exception {
+        return new ZooKeeperClusterView(this, getOrCreateClient(), namespace);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // instantiate a new CuratorFramework
+        getOrCreateClient();
+
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (client != null && closeOnStop) {
+            client.close();
+        }
+    }
+
+    private CuratorFramework getOrCreateClient() throws Exception {
+        if (client == null) {
+            // Validate parameters
+            ObjectHelper.notNull(getCamelContext(), "Camel Context");
+            ObjectHelper.notNull(nodes, "ZooKeeper Nodes");
+
+            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+                .connectString(String.join(",", nodes))
+                .sessionTimeoutMs((int)sessionTimeoutUnit.toMillis(sessionTimeout))
+                .connectionTimeoutMs((int)connectionTimeotUnit.toMillis(connectionTimeout))
+                .maxCloseWaitMs((int)maxCloseWaitUnit.toMillis(maxCloseWait))
+                .retryPolicy(retryPolicy());
+
+            Optional.ofNullable(namespace).ifPresent(builder::namespace);
+            Optional.ofNullable(authInfoList).ifPresent(builder::authorization);
+
+            LOGGER.debug("Connect to ZooKeeper with namespace {},  nodes: {}", namespace, nodes);
+            client = builder.build();
+
+            LOGGER.debug("Starting ZooKeeper client");
+            client.start();
+        }
+
+        return this.client;
+    }
+
+    private RetryPolicy retryPolicy() {
+        return retryPolicy != null
+            ? retryPolicy
+            : new ExponentialBackoffRetry(
+                (int)reconnectBaseSleepTimeUnit.toMillis(reconnectBaseSleepTime),
+                reconnectMaxRetries);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/37a15c72/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
new file mode 100644
index 0000000..86e530f
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.ha;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.ha.CamelClusterMember;
+import org.apache.camel.ha.CamelClusterService;
+import org.apache.camel.impl.ha.AbstractCamelClusterView;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.recipes.leader.Participant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ZooKeeperClusterView extends AbstractCamelClusterView {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClusterView.class);
+
+    private final CuratorFramework client;
+    private final CuratorLocalMember localMember;
+    private LeaderSelector leaderSelector;
+
+    public ZooKeeperClusterView(CamelClusterService cluster, CuratorFramework client, String namespace) {
+        super(cluster, namespace);
+
+        this.localMember = new CuratorLocalMember();
+        this.client = client;
+    }
+
+    @Override
+    public CamelClusterMember getLocalMember() {
+        return this.localMember;
+    }
+
+    @Override
+    public Optional<CamelClusterMember> getMaster() {
+        if (leaderSelector == null) {
+            return Optional.empty();
+        }
+
+        try {
+            return Optional.of(new CuratorClusterMember(leaderSelector.getLeader()));
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+
+    @Override
+    public List<CamelClusterMember> getMembers() {
+        if (leaderSelector == null) {
+            return Collections.emptyList();
+        }
+
+        try {
+            return leaderSelector.getParticipants()
+                .stream()
+                .map(CuratorClusterMember::new)
+                .collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (leaderSelector == null) {
+            leaderSelector = new LeaderSelector(client, getNamespace(), new CamelLeaderElectionListener());
+            leaderSelector.setId(getClusterService().getId());
+            leaderSelector.start();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (leaderSelector != null) {
+            leaderSelector.interruptLeadership();
+            leaderSelector.close();
+            leaderSelector = null;
+        }
+    }
+
+    class CamelLeaderElectionListener extends LeaderSelectorListenerAdapter {
+        @Override
+        public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
+            localMember.setMaster(true);
+            fireLeadershipChangedEvent(localMember);
+
+            while (isRunAllowed()) {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    Thread.interrupted();
+                    break;
+                }
+            }
+            
+            localMember.setMaster(false);
+            getMaster().ifPresent(leader -> fireLeadershipChangedEvent(leader));
+        }
+    }
+
+    // ***********************************************
+    //
+    // ***********************************************
+
+    private final class CuratorLocalMember implements CamelClusterMember {
+        private AtomicBoolean master = new AtomicBoolean(false);
+
+        void setMaster(boolean master) {
+            this.master.set(master);
+        }
+
+        @Override
+        public boolean isMaster() {
+            return master.get();
+        }
+
+        @Override
+        public String getId() {
+            return getClusterService().getId();
+        }
+    }
+
+    private final class CuratorClusterMember implements CamelClusterMember {
+        private final Participant participant;
+
+        CuratorClusterMember(Participant participant) {
+            this.participant = participant;
+        }
+
+        @Override
+        public String getId() {
+            return participant.getId();
+        }
+
+        @Override
+        public boolean isMaster() {
+            try {
+                return leaderSelector.getLeader().equals(this.participant);
+            } catch (Exception e) {
+                LOGGER.debug("", e);
+                return false;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/37a15c72/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
index 6225e60..ae27c78 100644
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
@@ -225,7 +225,7 @@ public class ZooKeeperTestSupport extends CamelTestSupport {
 
     // Wait methods are taken directly from the Zookeeper tests. A tests jar
     // would be nice! Another good reason the keeper folks should move to maven.
-    private static boolean waitForServerUp(String hp, long timeout) {
+    public static boolean waitForServerUp(String hp, long timeout) {
         long start = System.currentTimeMillis();
         while (true) {
             try {

http://git-wip-us.apache.org/repos/asf/camel/blob/37a15c72/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java
new file mode 100644
index 0000000..564940d
--- /dev/null
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperServer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ZooKeeperClientRoutePolicyTest {
+    private static final int PORT = AvailablePortFinder.getNextAvailable();
+    private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClientRoutePolicyTest.class);
+    private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
+    private static final List<String> RESULTS = new ArrayList<>();
+    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2);
+    private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size());
+
+    // ************************************
+    // Test
+    // ************************************
+
+    @Test
+    public void test() throws Exception {
+        TestZookeeperServer server = null;
+
+        try {
+            server = new TestZookeeperServer(PORT, true);
+            ZooKeeperTestSupport.waitForServerUp("localhost:" + PORT, 1000);
+
+            for (String id : CLIENTS) {
+                SCHEDULER.submit(() -> run(id));
+            }
+
+            LATCH.await(1, TimeUnit.MINUTES);
+            SCHEDULER.shutdownNow();
+
+            Assert.assertEquals(CLIENTS.size(), RESULTS.size());
+            Assert.assertTrue(RESULTS.containsAll(CLIENTS));
+        } finally {
+            if (server != null) {
+                server.shutdown();
+            }
+        }
+    }
+
+    // ************************************
+    // Run a Camel node
+    // ************************************
+
+    private static void run(String id) {
+        try {
+            CountDownLatch contextLatch = new CountDownLatch(1);
+
+            ZooKeeperClusterService service = new ZooKeeperClusterService();
+            service.setId("node-" + id);
+            service.setNodes("localhost:" + PORT);
+            service.setNamespace(null );
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(service);
+            context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("/my-ns"));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("timer:zookeeper?delay=1s&period=1s&repeatCount=1")
+                        .routeId("route-" + id)
+                        .process(e -> {
+                            LOGGER.debug("Node {} done", id);
+                            RESULTS.add(id);
+                            // Shutdown the context later on to give a chance to
+                            // other members to catch-up
+                            SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
+                        });
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+            context.stop();
+
+            LATCH.countDown();
+        } catch (Exception e) {
+            LOGGER.warn("", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/37a15c72/components/camel-zookeeper/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/resources/log4j2.properties b/components/camel-zookeeper/src/test/resources/log4j2.properties
index d7bb987..2e7cfa5 100644
--- a/components/camel-zookeeper/src/test/resources/log4j2.properties
+++ b/components/camel-zookeeper/src/test/resources/log4j2.properties
@@ -43,6 +43,6 @@ logger.springframework.name = org.springframework
 logger.springframework.level = WARN
 rootLogger.level = INFO
 #rootLogger.appenderRef.stdout.ref = out
-rootLogger.appenderRef.file.ref = file
+rootLogger.appenderRef.file.ref = out