You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/10 17:50:40 UTC

[3/3] camel git commit: CAMEL-10986: camel-zookeeper-master - Donation of the master component from fabric8 v1

CAMEL-10986: camel-zookeeper-master - Donation of the master component from fabric8 v1


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bf82a5ed
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bf82a5ed
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bf82a5ed

Branch: refs/heads/master
Commit: bf82a5ed1980b96eaf2cc012e8b97ef78ffd2229
Parents: 8e14a77
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Mar 10 18:49:53 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Mar 10 18:49:53 2017 +0100

----------------------------------------------------------------------
 apache-camel/pom.xml                            |   4 +
 .../src/main/descriptors/common-bin.xml         |   1 +
 components/camel-zookeeper-master/pom.xml       | 157 +++++
 .../main/docs/zookeeper-master-component.adoc   |  86 +++
 .../zookeepermaster/CamelNodeState.java         |  37 +
 .../zookeepermaster/MasterComponent.java        |  76 +++
 .../zookeepermaster/MasterConsumer.java         | 157 +++++
 .../zookeepermaster/MasterEndpoint.java         |  98 +++
 .../zookeepermaster/ZKComponentSupport.java     | 185 +++++
 .../component/zookeepermaster/group/Group.java  | 100 +++
 .../zookeepermaster/group/GroupFactory.java     |  31 +
 .../zookeepermaster/group/GroupListener.java    |  32 +
 .../zookeepermaster/group/MultiGroup.java       |  26 +
 .../zookeepermaster/group/NodeState.java        |  69 ++
 .../group/internal/ChildData.java               | 129 ++++
 .../group/internal/CompositeOperation.java      |  56 ++
 .../group/internal/DelegateZooKeeperGroup.java  | 180 +++++
 .../internal/DelegateZooKeeperMultiGroup.java   |  44 ++
 .../group/internal/EventOperation.java          |  39 ++
 .../group/internal/GetDataOperation.java        |  61 ++
 .../group/internal/ManagedGroupFactory.java     |  28 +
 .../internal/ManagedGroupFactoryBuilder.java    |  50 ++
 .../group/internal/Operation.java               |  23 +
 .../group/internal/RefreshOperation.java        |  61 ++
 .../group/internal/SequenceComparator.java      |  27 +
 .../internal/StaticManagedGroupFactory.java     |  66 ++
 .../group/internal/UpdateOperation.java         |  55 ++
 .../group/internal/ZooKeeperGroup.java          | 671 +++++++++++++++++++
 .../group/internal/ZooKeeperGroupFactory.java   |  56 ++
 .../group/internal/ZooKeeperMultiGroup.java     |  58 ++
 .../internal/osgi/OsgiManagedGroupFactory.java  | 183 +++++
 .../internal/osgi/TrackingZooKeeperGroup.java   |  65 ++
 .../src/main/resources/META-INF/LICENSE.txt     | 203 ++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 .../org/apache/camel/component/zookeeper-master |  17 +
 .../zookeepermaster/CuratorFactoryBean.java     |  88 +++
 .../MasterEndpointFailoverTest.java             | 147 ++++
 .../zookeepermaster/MasterEndpointTest.java     | 100 +++
 .../MasterQuartz2EndpointTest.java              |  76 +++
 .../zookeepermaster/ZKServerFactoryBean.java    | 225 +++++++
 .../zookeepermaster/group/GroupTest.java        | 388 +++++++++++
 .../group/internal/ZooKeeperGroupTest.java      | 232 +++++++
 .../src/test/resources/log4j2.properties        |  45 ++
 .../MasterEndpointTest-context.xml              |  43 ++
 .../MasterQuartz2EndpointTest-context.xml       |  45 ++
 components/readme.adoc                          |   7 +-
 docs/user-manual/en/SUMMARY.md                  |   1 +
 parent/pom.xml                                  |   5 +
 .../camel-zookeeper-master-starter/pom.xml      |  51 ++
 .../MasterComponentAutoConfiguration.java       | 110 +++
 .../MasterComponentConfiguration.java           | 111 +++
 .../src/main/resources/META-INF/LICENSE.txt     | 203 ++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 ...dditional-spring-configuration-metadata.json |  10 +
 .../main/resources/META-INF/spring.factories    |  19 +
 .../src/main/resources/META-INF/spring.provides |  18 +
 .../spring-boot/components-starter/pom.xml      |   1 +
 57 files changed, 5076 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/apache-camel/pom.xml
----------------------------------------------------------------------
diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index e05f958..db37cc5 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -1068,6 +1068,10 @@
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-zookeeper</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-zookeeper-master</artifactId>
+    </dependency>
 
     <!-- camel starters -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/apache-camel/src/main/descriptors/common-bin.xml
----------------------------------------------------------------------
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index 672ed0e..ad89366 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -278,6 +278,7 @@
         <include>org.apache.camel:camel-zipkin</include>
         <include>org.apache.camel:camel-zipkin-starter</include>
         <include>org.apache.camel:camel-zookeeper</include>
+        <include>org.apache.camel:camel-zookeeper-master</include>
       </includes>
     </dependencySet>
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/pom.xml b/components/camel-zookeeper-master/pom.xml
new file mode 100644
index 0000000..493a266
--- /dev/null
+++ b/components/camel-zookeeper-master/pom.xml
@@ -0,0 +1,157 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>components</artifactId>
+    <groupId>org.apache.camel</groupId>
+    <version>2.19.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>camel-zookeeper-master</artifactId>
+  <packaging>jar</packaging>
+  <name>Camel :: Zookeeper Master</name>
+  <description>Camel Zookeeper Master Support</description>
+
+  <properties>
+    <camel.osgi.export.pkg>org.apache.camel.component.zookeepermaster.*</camel.osgi.export.pkg>
+    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=zookeeper-master</camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+
+    <!-- zookeeper -->
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper-version}</version>
+      <!-- Just exclude the old version of netty to avoid netty version conflict-->
+      <exclusions>
+        <exclusion>
+           <groupId>org.jboss.netty</groupId>
+           <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>${netty3-version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>${curator-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <version>${curator-version}</version>
+    </dependency>
+
+    <!-- json state -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <!-- optional OSGi support -->
+    <dependency>
+      <groupId>org.osgi</groupId>
+      <artifactId>org.osgi.core</artifactId>
+      <scope>provided</scope>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.osgi</groupId>
+      <artifactId>org.osgi.compendium</artifactId>
+      <scope>provided</scope>
+      <optional>true</optional>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-quartz2</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-spring</artifactId>
+      <scope>test</scope>
+    </dependency>  
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <childDelegation>false</childDelegation>
+          <useFile>true</useFile>
+          <forkCount>1</forkCount>
+          <reuseForks>true</reuseForks>
+          <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/docs/zookeeper-master-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/docs/zookeeper-master-component.adoc b/components/camel-zookeeper-master/src/main/docs/zookeeper-master-component.adoc
new file mode 100644
index 0000000..be3530f
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/docs/zookeeper-master-component.adoc
@@ -0,0 +1,86 @@
+## ZooKeeper Master Component
+
+*Available as of Camel version 2.19*
+
+The **zookeeper-master:** endpoint provides a way to ensure only a single consumer in a cluster consumes from a given endpoint;
+with automatic failover if that JVM dies.
+
+This can be very useful if you need to consume from some legacy back end which either doesn't support concurrent
+consumption or due to commercial or stability reasons you can only have a single connection at any point in time.
+
+### Using the master endpoint
+
+Just prefix any camel endpoint with **zookeeper-master:someName:** where _someName_ is a logical name and is
+used to acquire the master lock. e.g.
+
+```
+from("zookeeper-master:cheese:jms:foo").to("activemq:wine");
+```
+The above simulates the [Exclusive Consumers](http://activemq.apache.org/exclusive-consumer.html) type feature in
+ActiveMQ; but on any third party JMS provider which maybe doesn't support exclusive consumers.
+
+
+### URI format
+
+[source]
+----
+zookeeper-master:name:endpoint[?options]
+----
+
+Where endpoint is any Camel endpoint you want to run in master/slave mode.
+
+
+### Options
+
+// component options: START
+The ZooKeeper Master component supports 6 options which are listed below.
+
+
+
+[width="100%",cols="2,1,1m,1m,5",options="header"]
+|=======================================================================
+| Name | Group | Default | Java Type | Description
+| zkRoot | consumer | /camel/zookeepermaster/clusters/master | String | The root path to use in zookeeper where information is stored which nodes are master/slave etc. Will by default use: /camel/zookeepermaster/clusters/master
+| curator | advanced |  | CuratorFramework | To use a custom configured CuratorFramework as connection to zookeeper ensemble.
+| maximumConnectionTimeout | consumer | 10000 | int | Timeout in millis to use when connecting to the zookeeper ensemble
+| zooKeeperUrl | consumer | localhost:2181 | String | The url for the zookeeper ensemble
+| zooKeeperPassword | security |  | String | The password to use when connecting to the zookeeper ensemble
+| resolvePropertyPlaceholders | advanced | true | boolean | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders.
+|=======================================================================
+// component options: END
+
+// endpoint options: START
+The ZooKeeper Master endpoint is configured using URI syntax:
+
+    zookeeper-master:name:endpoint
+
+with the following path and query parameters:
+
+#### Path Parameters (2 parameters):
+
+[width="100%",cols="2,1,1m,6",options="header"]
+|=======================================================================
+| Name | Default | Java Type | Description
+| name |  | String | *Required* The name of the cluster group to use
+| endpoint |  | String | *Required* The Camel endpoint to use in master/slave mode
+|=======================================================================
+
+#### Query Parameters (4 parameters):
+
+[width="100%",cols="2,1,1m,1m,5",options="header"]
+|=======================================================================
+| Name | Group | Default | Java Type | Description
+| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored.
+| exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored.
+| exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange.
+| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
+|=======================================================================
+// endpoint options: END
+
+
+### See Also
+
+* link:configuring-camel.html[Configuring Camel]
+* link:component.html[Component]
+* link:endpoint.html[Endpoint]
+* link:getting-started.html[Getting Started]

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/CamelNodeState.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/CamelNodeState.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/CamelNodeState.java
new file mode 100644
index 0000000..d501e12
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/CamelNodeState.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.camel.component.zookeepermaster.group.NodeState;
+
+public class CamelNodeState extends NodeState {
+
+    @JsonProperty
+    String consumer;
+
+    @JsonProperty
+    boolean started;
+
+    public CamelNodeState() {
+    }
+
+    public CamelNodeState(String id) {
+        super(id);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterComponent.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterComponent.java
new file mode 100644
index 0000000..f2d5d63
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterComponent.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster;
+
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+
+/**
+ * The zookeeper-master camel component ensures that only a single endpoint in a cluster is active at any
+ * point in time with all other JVMs being hot standbys which wait until the master JVM dies before
+ * taking over to provide high availability of a single consumer.
+ */
+public class MasterComponent extends ZKComponentSupport {
+
+    @Metadata(defaultValue = "/camel/zookeepermaster/clusters/master")
+    private String zkRoot = "/camel/zookeepermaster/clusters/master";
+
+    public String getZkRoot() {
+        return zkRoot;
+    }
+
+    /**
+     * The root path to use in zookeeper where information is stored which nodes are master/slave etc.
+     * Will by default use: /camel/zookeepermaster/clusters/master
+     */
+    public void setZkRoot(String zkRoot) {
+        this.zkRoot = zkRoot;
+    }
+
+    //  Implementation methods
+    //-------------------------------------------------------------------------
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
+        int idx = remaining.indexOf(':');
+        if (idx <= 0) {
+            throw new IllegalArgumentException("Missing : in URI so cannot split the group name from the actual URI for '" + remaining + "'");
+        }
+        // we are registering a regular endpoint
+        String name = remaining.substring(0, idx);
+        String childUri = remaining.substring(idx + 1);
+        // we need to apply the params here
+        if (params != null && params.size() > 0) {
+            childUri = childUri + "?" + URISupport.createQueryString(params);
+        }
+        MasterEndpoint answer = new MasterEndpoint(uri, this, name, childUri);
+        return answer;
+    }
+
+    protected String getCamelClusterPath(String name) {
+        String path = name;
+        if (ObjectHelper.isNotEmpty(zkRoot)) {
+            path = zkRoot + "/" + name;
+        }
+        return path;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterConsumer.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterConsumer.java
new file mode 100644
index 0000000..ab3608c
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterConsumer.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.SuspendableService;
+import org.apache.camel.component.zookeepermaster.group.Group;
+import org.apache.camel.component.zookeepermaster.group.GroupListener;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A consumer which is only really active while it holds the master lock
+ */
+public class MasterConsumer extends DefaultConsumer implements GroupListener {
+    private static final transient Logger LOG = LoggerFactory.getLogger(MasterConsumer.class);
+
+    private final MasterEndpoint endpoint;
+    private final Processor processor;
+    private Consumer delegate;
+    private SuspendableService delegateService;
+    private final Group<CamelNodeState> singleton;
+
+    public MasterConsumer(MasterEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.processor = processor;
+        MasterComponent component = endpoint.getComponent();
+        String path = component.getCamelClusterPath(endpoint.getSingletonId());
+        this.singleton = component.createGroup(path);
+        this.singleton.add(this);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        singleton.start();
+        LOG.info("Attempting to become master for endpoint: " + endpoint + " in " + endpoint.getCamelContext() + " with singletonID: " + endpoint.getSingletonId());
+        singleton.update(createNodeState());
+    }
+
+    private CamelNodeState createNodeState() {
+        CamelNodeState state = new CamelNodeState(endpoint.getSingletonId());
+        state.consumer = endpoint.getChildEndpoint().getEndpointUri();
+        return state;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        try {
+            stopConsumer();
+        } finally {
+            singleton.close();
+        }
+    }
+
+    protected void stopConsumer() throws Exception {
+        ServiceHelper.stopAndShutdownServices(delegate);
+        ServiceHelper.stopAndShutdownServices(endpoint.getChildEndpoint());
+        delegate = null;
+        delegateService = null;
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        if (delegateService != null) {
+            delegateService.resume();
+        }
+        super.doResume();
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        if (delegateService != null) {
+            delegateService.suspend();
+        }
+        super.doSuspend();
+    }
+
+    @Override
+    public void groupEvent(Group group, GroupEvent event) {
+        switch (event) {
+        case CONNECTED:
+            break;
+        case CHANGED:
+            if (singleton.isConnected()) {
+                if (singleton.isMaster()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Master/Standby endpoint is Master for:  " + endpoint + " in " + endpoint.getCamelContext());
+                    }
+                    onLockOwned();
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Master/Standby endpoint is Standby for: " + endpoint + " in " + endpoint.getCamelContext());
+                    }
+                }
+            }
+            break;
+        case DISCONNECTED:
+            try {
+                LOG.info("Disconnecting as Master. Stopping consumer: {}", endpoint.getChildEndpoint());
+                stopConsumer();
+            } catch (Exception e) {
+                LOG.error("Failed to stop master consumer for: " + endpoint + ". Reason: " + e, e);
+            }
+            break;
+        default:
+            // noop
+        }
+
+    }
+
+    protected void onLockOwned() {
+        if (delegate == null) {
+            try {
+                // ensure endpoint is also started
+                LOG.info("Elected as master. Starting consumer: {}", endpoint.getChildEndpoint());
+                ServiceHelper.startService(endpoint.getChildEndpoint());
+
+                delegate = endpoint.getChildEndpoint().createConsumer(processor);
+                delegateService = null;
+                if (delegate instanceof SuspendableService) {
+                    delegateService = (SuspendableService) delegate;
+                }
+
+                // Lets show we are starting the consumer.
+                CamelNodeState nodeState = createNodeState();
+                nodeState.started = true;
+                singleton.update(nodeState);
+
+                ServiceHelper.startService(delegate);
+            } catch (Exception e) {
+                LOG.error("Failed to start master consumer for: " + endpoint + ". Reason: " + e, e);
+            }
+
+            LOG.info("Elected as master. Consumer started: {}", endpoint.getChildEndpoint());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterEndpoint.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterEndpoint.java
new file mode 100644
index 0000000..6e034ed
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterEndpoint.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.DelegateEndpoint;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriPath;
+
+/**
+ * Represents an endpoint which only becomes active when it obtains the master lock
+ */
+@UriEndpoint(firstVersion = "2.19.0", scheme = "zookeeper-master", syntax = "zookeeper-master:name:endpoint", consumerClass = MasterConsumer.class, consumerOnly = true,
+    title = "ZooKeeper Master", lenientProperties = true, label = "clustering")
+public class MasterEndpoint extends DefaultEndpoint implements DelegateEndpoint {
+
+    private final MasterComponent component;
+
+    private final Endpoint childEndpoint;
+
+    @UriPath(name = "name", description = "The name of the cluster group to use")
+    @Metadata(required = "true")
+    private final String singletonId;
+
+    @UriPath(name = "endpoint", description = "The Camel endpoint to use in master/slave mode")
+    @Metadata(required = "true")
+    private final String child;
+
+    public MasterEndpoint(String uri, MasterComponent component, String singletonId, String child) {
+        super(uri, component);
+        this.component = component;
+        this.singletonId = singletonId;
+        this.child = child;
+        this.childEndpoint = getCamelContext().getEndpoint(child);
+    }
+
+    public String getSingletonId() {
+        return singletonId;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("Cannot produce from this endpoint");
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new MasterConsumer(this, processor);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public boolean isLenientProperties() {
+        // to allow properties to be propagated to the child endpoint
+        return true;
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public MasterComponent getComponent() {
+        return component;
+    }
+
+    public String getChild() {
+        return child;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+    Endpoint getChildEndpoint() {
+        return childEndpoint;
+    }
+
+    public Endpoint getEndpoint() {
+        return getChildEndpoint();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZKComponentSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZKComponentSupport.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZKComponentSupport.java
new file mode 100644
index 0000000..30d9e04
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZKComponentSupport.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster;
+
+import java.util.concurrent.Callable;
+
+import org.apache.camel.component.zookeepermaster.group.Group;
+import org.apache.camel.component.zookeepermaster.group.internal.ManagedGroupFactory;
+import org.apache.camel.component.zookeepermaster.group.internal.ManagedGroupFactoryBuilder;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.spi.Metadata;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ZKComponentSupport extends DefaultComponent implements Callable<CuratorFramework>, ConnectionStateListener {
+    private static final transient Logger LOG = LoggerFactory.getLogger(ZKComponentSupport.class);
+
+    private static final String ZOOKEEPER_URL = "zookeeper.url";
+    private static final String ZOOKEEPER_PASSWORD = "zookeeper.password";
+    private static final String ZOOKEEPER_URL_ENV = "ZOOKEEPER_URL";
+    private static final String ZOOKEEPER_HOST_ENV = "ZK_CLIENT_SERVICE_HOST";
+    private static final String ZOOKEEPER_PORT_ENV = "ZK_CLIENT_SERVICE_PORT";
+
+    private ManagedGroupFactory managedGroupFactory;
+
+    @Metadata(label = "advanced")
+    private CuratorFramework curator;
+    @Metadata(defaultValue = "10000")
+    private int maximumConnectionTimeout = 10 * 1000;
+    @Metadata(defaultValue = "localhost:2181")
+    private String zooKeeperUrl;
+    @Metadata(label = "security", secret = true)
+    private String zooKeeperPassword;
+
+    public CuratorFramework getCurator() {
+        if (managedGroupFactory == null) {
+            throw new IllegalStateException("Component is not started");
+        }
+        return managedGroupFactory.getCurator();
+    }
+
+    public Group<CamelNodeState> createGroup(String path) {
+        if (managedGroupFactory == null) {
+            throw new IllegalStateException("Component is not started");
+        }
+        return managedGroupFactory.createGroup(path, CamelNodeState.class);
+    }
+
+    /**
+     * To use a custom configured CuratorFramework as connection to zookeeper ensemble.
+     */
+    public void setCurator(CuratorFramework curator) {
+        this.curator = curator;
+        registerAsListener();
+    }
+
+    public int getMaximumConnectionTimeout() {
+        return maximumConnectionTimeout;
+    }
+
+    /**
+     * Timeout in millis to use when connecting to the zookeeper ensemble
+     */
+    public void setMaximumConnectionTimeout(int maximumConnectionTimeout) {
+        this.maximumConnectionTimeout = maximumConnectionTimeout;
+    }
+
+    public String getZooKeeperUrl() {
+        return zooKeeperUrl;
+    }
+
+    /**
+     * The url for the zookeeper ensemble
+     */
+    public void setZooKeeperUrl(String zooKeeperUrl) {
+        this.zooKeeperUrl = zooKeeperUrl;
+    }
+
+    public String getZooKeeperPassword() {
+        return zooKeeperPassword;
+    }
+
+    /**
+     * The password to use when connecting to the zookeeper ensemble
+     */
+    public void setZooKeeperPassword(String zooKeeperPassword) {
+        this.zooKeeperPassword = zooKeeperPassword;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (curator == null) {
+            try {
+                CuratorFramework aCurator = (CuratorFramework) getCamelContext().getRegistry().lookupByName("curator");
+                if (aCurator != null) {
+                    setCurator(aCurator);
+                }
+                if (curator != null) {
+                    LOG.debug("Zookeeper client found in camel registry. " + curator);
+                }
+            } catch (Exception exception) {
+                // ignore
+            }
+        }
+
+        // will auto create curator if needed
+        managedGroupFactory = ManagedGroupFactoryBuilder.create(curator, getClass().getClassLoader(), getCamelContext().getClassResolver(), this);
+    }
+
+    public CuratorFramework call() throws Exception {
+        String connectString = getZooKeeperUrl();
+        if (connectString == null) {
+            connectString = System.getenv(ZOOKEEPER_URL_ENV);
+        }
+        if (connectString == null) {
+            String zkHost = System.getenv(ZOOKEEPER_HOST_ENV);
+            if (zkHost != null) {
+                String zkPort = System.getenv(ZOOKEEPER_PORT_ENV);
+                connectString = zkHost + ":" + (zkPort == null ? "2181" : zkPort);
+            }
+        }
+        if (connectString == null) {
+            connectString = System.getProperty(ZOOKEEPER_URL, "localhost:2181");
+        }
+        String password = getZooKeeperPassword();
+        if (password == null) {
+            System.getProperty(ZOOKEEPER_PASSWORD);
+        }
+        LOG.debug("CuratorFramework not found in Camel registry, creating new with connection " + connectString);
+        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+            .connectString(connectString)
+            .retryPolicy(new RetryOneTime(1000))
+            .connectionTimeoutMs(getMaximumConnectionTimeout());
+
+        if (password != null && !password.isEmpty()) {
+            builder.authorization("digest", ("fabric:" + password).getBytes());
+        }
+
+        curator = builder.build();
+        LOG.debug("Starting curator " + curator);
+        curator.start();
+        return curator;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (managedGroupFactory != null) {
+            managedGroupFactory.close();
+            managedGroupFactory = null;
+        }
+    }
+
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {
+        LOG.info("Curator Connection new state: " + newState);
+    }
+
+    protected void registerAsListener() {
+        if (curator != null) {
+            curator.getConnectionStateListenable().addListener(this);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/Group.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/Group.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/Group.java
new file mode 100644
index 0000000..647e8c1
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/Group.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public interface Group<T extends NodeState> extends Closeable {
+
+    /**
+     * Are we connected with the cluster?
+     */
+    boolean isConnected();
+
+    /**
+     * Start this member
+     */
+    void start();
+
+    /**
+     * A member should be closed to release acquired resources used
+     * to monitor the group membership.
+     *
+     * When the member is closed, any memberships registered via this
+     * Group will be removed from the group.
+     */
+    void close() throws IOException;
+
+    /**
+     * Registers a listener which will be called
+     * when the cluster membership changes or
+     * the group is connected or disconnected.
+     */
+    void add(GroupListener<T> listener);
+
+    /**
+     * Removes a previously added listener.
+     */
+    void remove(GroupListener<T> listener);
+
+    /**
+     * Update the state of this group member.
+     * If the state is null, the member will leave the group.
+     *
+     * This method can be called even if the group is not started,
+     * in which case the state will be stored and updated
+     * when the group becomes started.
+     *
+     * @param state the new state of this group member
+     */
+    void update(T state);
+
+    /**
+     * Get the list of members connected to this group.
+     */
+    Map<String, T> members();
+
+    /**
+     * Check if we are the master.
+     */
+    boolean isMaster();
+
+    /**
+     * Retrieve the master node.
+     */
+    T master();
+
+    /**
+     * Retrieve the list of slaves.
+     */
+    List<T> slaves();
+
+    /**
+     * Gets the last state.
+     * <p/>
+     * This can be used by clients to access that last state, such as when the clients is being added
+     * as a {@link #add(GroupListener) listener} but wants to retrieve the last state to be up to date when the
+     * client is added.
+     *
+     * @return the state, or <tt>null</tt> if no last state yet.
+     */
+    T getLastState();
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/GroupFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/GroupFactory.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/GroupFactory.java
new file mode 100644
index 0000000..2298e40
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/GroupFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group;
+
+import java.util.concurrent.ThreadFactory;
+
+public interface GroupFactory {
+
+    <T extends NodeState> Group<T> createGroup(String path, Class<T> clazz);
+
+    <T extends NodeState> Group<T> createGroup(String path, Class<T> clazz, ThreadFactory threadFactory);
+
+    <T extends NodeState> Group<T> createMultiGroup(String path, Class<T> clazz);
+
+    <T extends NodeState> Group<T> createMultiGroup(String path, Class<T> clazz, ThreadFactory threadFactory);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/GroupListener.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/GroupListener.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/GroupListener.java
new file mode 100644
index 0000000..7d98c6e
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/GroupListener.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group;
+
+/**
+ * Callback interface used to get notifications of changes to a cluster group.
+ */
+public interface GroupListener<T extends NodeState> {
+
+    enum GroupEvent {
+        CONNECTED,
+        CHANGED,
+        DISCONNECTED
+    }
+
+    void groupEvent(Group<T> group, GroupEvent event);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/MultiGroup.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/MultiGroup.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/MultiGroup.java
new file mode 100644
index 0000000..b7d94a5
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/MultiGroup.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group;
+
+public interface MultiGroup<T extends NodeState> extends Group<T> {
+
+    /**
+     * Check if we are the master.
+     */
+    boolean isMaster(String id);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/NodeState.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/NodeState.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/NodeState.java
new file mode 100644
index 0000000..7188201
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/NodeState.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group;
+
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class NodeState {
+
+    @JsonProperty
+    public String id;
+
+    @JsonProperty
+    public final String container;
+
+    @JsonProperty
+    public String uuid; // internal use to suppress duplicates
+
+    public NodeState() {
+        this(null);
+    }
+
+    public NodeState(String id) {
+        this(id, System.getProperty("runtime.id", UUID.randomUUID().toString()));
+    }
+
+    public NodeState(String id, String container) {
+        this.id = id;
+        this.container = container;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getContainer() {
+        return container;
+    }
+
+    @Override
+    public String toString() {
+        try {
+            return new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).writeValueAsString(this);
+        } catch (Exception e) {
+            return super.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ChildData.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ChildData.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ChildData.java
new file mode 100644
index 0000000..7874673
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ChildData.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+import java.util.Arrays;
+
+import org.apache.zookeeper.data.Stat;
+
+public class ChildData<T> implements Comparable<ChildData> {
+    private final String path;
+    private final Stat stat;
+    private final byte[] data;
+    private final T node;
+
+    ChildData(String path, Stat stat, byte[] data, T node) {
+        this.path = path;
+        this.stat = stat;
+        this.data = data;
+        this.node = node;
+    }
+
+    /**
+     * @inheritDoc
+     *
+     * Note: this class has a natural ordering that is inconsistent with equals.
+     */
+    @Override
+    public int compareTo(ChildData rhs) {
+        if (this == rhs) {
+            return 0;
+        }
+        if (rhs == null || getClass() != rhs.getClass()) {
+            return -1;
+        }
+
+        return path.compareTo(rhs.path);
+    }
+
+    @SuppressWarnings("RedundantIfStatement")
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ChildData childData = (ChildData) o;
+
+        if (!Arrays.equals(data, childData.data)) {
+            return false;
+        }
+        if (path != null ? !path.equals(childData.path) : childData.path != null) {
+            return false;
+        }
+        if (stat != null ? !stat.equals(childData.stat) : childData.stat != null) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = path != null ? path.hashCode() : 0;
+        result = 31 * result + (stat != null ? stat.hashCode() : 0);
+        result = 31 * result + (data != null ? Arrays.hashCode(data) : 0);
+        return result;
+    }
+
+    /**
+     * Returns the full path of the this child
+     *
+     * @return full path
+     */
+    public String getPath() {
+        return path;
+    }
+
+    /**
+     * Returns the stat data for this child
+     *
+     * @return stat or null
+     */
+    public Stat getStat() {
+        return stat;
+    }
+
+    /**
+     * <p>Returns the node data for this child when the cache mode is set to cache data.</p>
+     *
+     * <p><b>NOTE:</b> the byte array returned is the raw reference of this instance's field. If you change
+     * the values in the array any other callers to this method will see the change.</p>
+     *
+     * @return node data or null
+     */
+    public byte[] getData() {
+        return data;
+    }
+
+    /**
+     * <p>Returns the node for this group member.</p>
+     *
+     * @return the node or null
+     */
+    public T getNode() {
+        return node;
+    }
+
+    @Override
+    public String toString() {
+        return "ChildData{path='" + path + '\'' + ", stat=" + stat + ", data=" + Arrays.toString(data) + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/CompositeOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/CompositeOperation.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/CompositeOperation.java
new file mode 100644
index 0000000..7073264
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/CompositeOperation.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Operation that aggregates several {@link Operation operations} to be performed inside single task passed
+ * to thread executor
+ */
+public class CompositeOperation implements Operation {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CompositeOperation.class);
+
+    private Operation[] operations;
+
+    public CompositeOperation(Operation... operations) {
+        this.operations = operations;
+    }
+
+    @Override
+    public void invoke() throws Exception {
+        for (Operation op : operations) {
+            try {
+                op.invoke();
+                if (Thread.currentThread().isInterrupted()) {
+                    LOG.debug("Interrupting composite operation");
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            } catch (InterruptedException e) {
+                LOG.debug("Interrupting composite operation");
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/DelegateZooKeeperGroup.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/DelegateZooKeeperGroup.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/DelegateZooKeeperGroup.java
new file mode 100644
index 0000000..f0e3f64
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/DelegateZooKeeperGroup.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.component.zookeepermaster.group.Group;
+import org.apache.camel.component.zookeepermaster.group.GroupListener;
+import org.apache.camel.component.zookeepermaster.group.NodeState;
+import org.apache.curator.framework.CuratorFramework;
+
+import static org.apache.curator.utils.CloseableUtils.closeQuietly;
+
+public class DelegateZooKeeperGroup<T extends NodeState> implements Group<T> {
+
+    private final String path;
+    private final Class<T> clazz;
+    private final List<GroupListener<T>> listeners;
+    private Group<T> group;
+    private T state;
+    private AtomicBoolean started = new AtomicBoolean();
+
+    public DelegateZooKeeperGroup(String path, Class<T> clazz) {
+        this.listeners = new ArrayList<GroupListener<T>>();
+        this.path = path;
+        this.clazz = clazz;
+    }
+
+    public void useCurator(CuratorFramework curator) {
+        Group<T> group = this.group;
+        if (group != null) {
+            closeQuietly(group);
+        }
+        if (curator != null) {
+            group = createGroup(curator, path, clazz);
+            group.update(state);
+            for (GroupListener<T> listener : listeners) {
+                group.add(listener);
+            }
+            if (started.get()) {
+                group.start();
+            }
+            this.group = group;
+        }
+    }
+
+    protected Group<T> createGroup(CuratorFramework client, String path, Class<T> clazz) {
+        return new ZooKeeperGroup<T>(client, path, clazz);
+    }
+
+    @Override
+    public void add(GroupListener<T> listener) {
+        listeners.add(listener);
+        Group<T> group = this.group;
+        if (group != null) {
+            group.add(listener);
+        }
+    }
+
+    @Override
+    public void remove(GroupListener<T> listener) {
+        listeners.remove(listener);
+        Group<T> group = this.group;
+        if (group != null) {
+            group.remove(listener);
+        }
+    }
+
+    @Override
+    public boolean isConnected() {
+        Group<T> group = this.group;
+        if (group != null) {
+            return group.isConnected();
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public void start() {
+        if (started.compareAndSet(false, true)) {
+            doStart();
+        }
+    }
+
+    protected void doStart() {
+        if (group != null) {
+            group.start();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (started.compareAndSet(true, false)) {
+            doStop();
+        }
+    }
+
+    protected void doStop() throws IOException {
+        closeQuietly(group);
+    }
+
+    @Override
+    public void update(T state) {
+        this.state = state;
+        Group<T> group = this.group;
+        if (group != null) {
+            group.update(state);
+        }
+    }
+
+    @Override
+    public Map<String, T> members() {
+        Group<T> group = this.group;
+        if (group != null) {
+            return group.members();
+        } else {
+            return Collections.emptyMap();
+        }
+    }
+
+    @Override
+    public boolean isMaster() {
+        Group<T> group = this.group;
+        if (group != null) {
+            return group.isMaster();
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public T master() {
+        Group<T> group = this.group;
+        if (group != null) {
+            return group.master();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public List<T> slaves() {
+        Group<T> group = this.group;
+        if (group != null) {
+            return group.slaves();
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    public Group<T> getGroup() {
+        return group;
+    }
+
+    @Override
+    public T getLastState() {
+        return group != null ? group.getLastState() : null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/DelegateZooKeeperMultiGroup.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/DelegateZooKeeperMultiGroup.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/DelegateZooKeeperMultiGroup.java
new file mode 100644
index 0000000..16afc2d
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/DelegateZooKeeperMultiGroup.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+import org.apache.camel.component.zookeepermaster.group.Group;
+import org.apache.camel.component.zookeepermaster.group.MultiGroup;
+import org.apache.camel.component.zookeepermaster.group.NodeState;
+import org.apache.curator.framework.CuratorFramework;
+
+public class DelegateZooKeeperMultiGroup<T extends NodeState> extends DelegateZooKeeperGroup<T> implements MultiGroup<T> {
+
+    public DelegateZooKeeperMultiGroup(String path, Class<T> clazz) {
+        super(path, clazz);
+    }
+
+    protected Group<T> createGroup(CuratorFramework client, String path, Class<T> clazz) {
+        return new ZooKeeperMultiGroup<T>(client, path, clazz);
+    }
+
+    @Override
+    public boolean isMaster(String id) {
+        Group<T> group = this.getGroup();
+        if (group != null) {
+            return ((MultiGroup) group).isMaster(id);
+        } else {
+            return false;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/EventOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/EventOperation.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/EventOperation.java
new file mode 100644
index 0000000..72ad142
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/EventOperation.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+import org.apache.camel.component.zookeepermaster.group.GroupListener;
+
+class EventOperation implements Operation {
+    private final ZooKeeperGroup cache;
+    private final GroupListener.GroupEvent event;
+
+    EventOperation(ZooKeeperGroup cache, GroupListener.GroupEvent event) {
+        this.cache = cache;
+        this.event = event;
+    }
+
+    @Override
+    public void invoke() {
+        cache.callListeners(event);
+    }
+
+    @Override
+    public String toString() {
+        return "EventOperation{event=" + event + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/GetDataOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/GetDataOperation.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/GetDataOperation.java
new file mode 100644
index 0000000..fc7f610
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/GetDataOperation.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+class GetDataOperation implements Operation {
+
+    private final ZooKeeperGroup cache;
+    private final String fullPath;
+
+    GetDataOperation(ZooKeeperGroup cache, String fullPath) {
+        this.cache = cache;
+        this.fullPath = fullPath;
+    }
+
+    @Override
+    public void invoke() throws Exception {
+        cache.getDataAndStat(fullPath);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        GetDataOperation that = (GetDataOperation) o;
+
+        if (!fullPath.equals(that.fullPath)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return fullPath.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "GetDataOperation{fullPath='" + fullPath + '\'' + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ManagedGroupFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ManagedGroupFactory.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ManagedGroupFactory.java
new file mode 100644
index 0000000..b812209
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ManagedGroupFactory.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+import org.apache.camel.component.zookeepermaster.group.GroupFactory;
+import org.apache.curator.framework.CuratorFramework;
+
+public interface ManagedGroupFactory extends GroupFactory {
+
+    CuratorFramework getCurator();
+
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ManagedGroupFactoryBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ManagedGroupFactoryBuilder.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ManagedGroupFactoryBuilder.java
new file mode 100644
index 0000000..393c205
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ManagedGroupFactoryBuilder.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+import java.util.concurrent.Callable;
+
+import org.apache.camel.spi.ClassResolver;
+import org.apache.camel.util.IntrospectionSupport;
+import org.apache.curator.framework.CuratorFramework;
+
+public final class ManagedGroupFactoryBuilder {
+
+    private ManagedGroupFactoryBuilder() {
+    }
+
+    public static ManagedGroupFactory create(CuratorFramework curator,
+                                             ClassLoader loader,
+                                             ClassResolver resolver,
+                                             Callable<CuratorFramework> factory) throws Exception {
+        if (curator != null) {
+            return new StaticManagedGroupFactory(curator, false);
+        }
+        try {
+            Class<?> clazz = resolver.resolveClass("org.apache.camel.component.zookeepermaster.group.internal.osgi.OsgiManagedGroupFactory");
+            if (clazz != null) {
+                Object instance = clazz.newInstance();
+                IntrospectionSupport.setProperty(instance, "classLoader", loader);
+                return (ManagedGroupFactory) instance;
+            }
+        } catch (Throwable e) {
+            // Ignore if we'e not in OSGi
+        }
+        return new StaticManagedGroupFactory(factory.call(), true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/Operation.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/Operation.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/Operation.java
new file mode 100644
index 0000000..d50e51a
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/Operation.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+interface Operation {
+
+    void invoke() throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/RefreshOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/RefreshOperation.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/RefreshOperation.java
new file mode 100644
index 0000000..ab52530
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/RefreshOperation.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+class RefreshOperation implements Operation {
+    private final ZooKeeperGroup cache;
+    private final ZooKeeperGroup.RefreshMode mode;
+
+    RefreshOperation(ZooKeeperGroup cache, ZooKeeperGroup.RefreshMode mode) {
+        this.cache = cache;
+        this.mode = mode;
+    }
+
+    @Override
+    public void invoke() throws Exception {
+        cache.refresh(mode);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        RefreshOperation that = (RefreshOperation) o;
+
+        //noinspection RedundantIfStatement
+        if (mode != that.mode) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return mode.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "RefreshOperation(" + mode + "){}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/SequenceComparator.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/SequenceComparator.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/SequenceComparator.java
new file mode 100644
index 0000000..42b33a2
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/SequenceComparator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+import java.util.Comparator;
+
+public class SequenceComparator implements Comparator<ChildData> {
+
+    @Override
+    public int compare(ChildData left, ChildData right) {
+        return left.getPath().compareTo(right.getPath());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/StaticManagedGroupFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/StaticManagedGroupFactory.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/StaticManagedGroupFactory.java
new file mode 100644
index 0000000..3b91ccd
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/StaticManagedGroupFactory.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.camel.component.zookeepermaster.group.Group;
+import org.apache.camel.component.zookeepermaster.group.NodeState;
+import org.apache.curator.framework.CuratorFramework;
+
+public class StaticManagedGroupFactory implements ManagedGroupFactory {
+
+    private final CuratorFramework curator;
+    private final boolean shouldClose;
+
+    StaticManagedGroupFactory(CuratorFramework curator, boolean shouldClose) {
+        this.curator = curator;
+        this.shouldClose = shouldClose;
+    }
+
+    @Override
+    public CuratorFramework getCurator() {
+        return curator;
+    }
+
+    @Override
+    public <T extends NodeState> Group<T> createGroup(String path, Class<T> clazz) {
+        return new ZooKeeperGroup<T>(curator, path, clazz);
+    }
+
+    @Override
+    public <T extends NodeState> Group<T> createGroup(String path, Class<T> clazz, ThreadFactory threadFactory) {
+        return new ZooKeeperGroup<T>(curator, path, clazz, threadFactory);
+    }
+
+    @Override
+    public <T extends NodeState> Group<T> createMultiGroup(String path, Class<T> clazz) {
+        return new ZooKeeperMultiGroup<T>(curator, path, clazz);
+    }
+
+    @Override
+    public <T extends NodeState> Group<T> createMultiGroup(String path, Class<T> clazz, ThreadFactory threadFactory) {
+        return new ZooKeeperMultiGroup<T>(curator, path, clazz, threadFactory);
+    }
+
+    @Override
+    public void close() {
+        if (shouldClose) {
+            curator.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bf82a5ed/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/UpdateOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/UpdateOperation.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/UpdateOperation.java
new file mode 100644
index 0000000..687bfbf
--- /dev/null
+++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/UpdateOperation.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeepermaster.group.internal;
+
+import org.apache.camel.component.zookeepermaster.group.NodeState;
+
+class UpdateOperation<T extends NodeState> implements Operation {
+    private final ZooKeeperGroup<T> cache;
+    private final T node;
+
+    UpdateOperation(ZooKeeperGroup cache, T node) {
+        this.cache = cache;
+        this.node = node;
+    }
+
+    @Override
+    public void invoke() throws Exception {
+        cache.doUpdate(node);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "UpdateOperation{node='" + node + '\'' + '}';
+    }
+}