You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2011/08/25 00:31:39 UTC

svn commit: r1161306 [2/3] - in /camel/trunk: components/ components/camel-zookeeper/ components/camel-zookeeper/src/ components/camel-zookeeper/src/main/ components/camel-zookeeper/src/main/java/ components/camel-zookeeper/src/main/java/org/ component...

Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetChildrenOperation.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetChildrenOperation.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetChildrenOperation.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetChildrenOperation.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import java.util.List;
+
+import static java.lang.String.format;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * <code>GetChildrenOperation</code> is a basic ZooKeeper operation used to
+ * retrieve the list of children belonging to a given ZooKeeper node.
+ */
+public class GetChildrenOperation extends ZooKeeperOperation<List<String>> {
+
+    public GetChildrenOperation(ZooKeeper connection, String node) {
+        super(connection, node);
+    }
+
+    /**
+     * Asks the connection for the children of the configured node, passing
+     * <code>true</code> as the second argument of <code>getChildren</code>.
+     *
+     * @return a result holding the child list and the node statistics, or a
+     *         result wrapping the exception if anything in the call failed
+     */
+    public OperationResult<List<String>> getResult() {
+        OperationResult<List<String>> outcome;
+        try {
+            Stat nodeStat = new Stat();
+            List<String> childNames = connection.getChildren(node, true, nodeStat);
+            logChildrenReceived(nodeStat);
+            outcome = new OperationResult<List<String>>(childNames, nodeStat);
+        } catch (Exception failure) {
+            outcome = new OperationResult<List<String>>(failure);
+        }
+        return outcome;
+    }
+
+    /**
+     * Reports the successful call; the statistics are only included when
+     * trace logging is enabled.
+     */
+    private void logChildrenReceived(Stat nodeStat) {
+        if (!LOG.isDebugEnabled()) {
+            return;
+        }
+        if (LOG.isTraceEnabled()) {
+            LOG.trace(format("Received children from '%s' path with statistics '%s'", node, nodeStat));
+        } else {
+            LOG.debug(format("Received children from '%s' path ", node));
+        }
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetDataOperation.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetDataOperation.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetDataOperation.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetDataOperation.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import static java.lang.String.format;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * <code>GetDataOperation</code> is a basic operation to immediately retrieve
+ * the data associated with a given ZooKeeper node.
+ */
+public class GetDataOperation extends ZooKeeperOperation<byte[]> {
+
+    public GetDataOperation(ZooKeeper connection, String node) {
+       super(connection, node);
+    }
+
+    public OperationResult<byte[]> getResult() {
+        try {
+            Stat statistics = new Stat();
+
+            if (LOG.isDebugEnabled()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(format("Received data from '%s' path with statistics '%s'", node, statistics));
+                } else {
+                    LOG.debug(format("Received data from '%s' path ", node));
+                }
+            }
+            return new OperationResult<byte[]>(connection.getData(node, true, statistics), statistics);
+        } catch (Exception e) {
+            return new OperationResult<byte[]>(e);
+        }
+    }
+
+}

Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/OperationResult.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/OperationResult.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/OperationResult.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/OperationResult.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * <code>OperationResult</code> is used to encapsulate the results of executing a
+ * {@link ZooKeeperOperation}
+ */
+public class OperationResult<ResultType> {
+    private Stat statistics;
+    private ResultType result;
+    private Exception exception;
+    private boolean ok;
+
+    public OperationResult(ResultType result, Stat statistics) {
+        this(result, statistics, true);
+    }
+
+    public OperationResult(ResultType result, Stat statistics, boolean ok) {
+        this.result = result;
+        this.statistics = statistics;
+        this.ok = ok;
+    }
+
+    public OperationResult(Exception exception) {
+        this.exception = exception;
+        ok = false;
+    }
+
+    public Exception getException() {
+        return exception;
+    }
+
+    public Stat getStatistics() {
+        return statistics;
+    }
+
+    public ResultType getResult() {
+        return result;
+    }
+
+    public boolean isOk() {
+        return ok;
+    }
+
+    public boolean failedDueTo(Code... codes) {
+        if (exception instanceof KeeperException) {
+
+            for (Code code : codes) {
+                if (code.equals(((KeeperException)exception).code())) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/SetDataOperation.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/SetDataOperation.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/SetDataOperation.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/SetDataOperation.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import static java.lang.String.format;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * <code>SetDataOperation</code> sets the content of a ZooKeeper node. An optional version
+ * may be specified that the node must currently have for the operation to succeed.
+ * @see ZooKeeper#setData(String, byte[], int)
+ */
+public class SetDataOperation extends ZooKeeperOperation<byte[]> {
+
+    private byte[] data;
+
+    // -1 means 'any version', see createCopy()
+    private int version = -1;
+
+    /**
+     * @param connection the ZooKeeper connection used to set the data
+     * @param node       the path of the node to write to
+     * @param data       the bytes to store; <code>null</code> is logged as 0 bytes
+     */
+    public SetDataOperation(ZooKeeper connection, String node, byte[] data) {
+        super(connection, node);
+        this.data = data;
+    }
+
+    /**
+     * Writes the data to the node using the configured version.
+     *
+     * @return a result holding the data that was passed in and the statistics
+     *         returned by <code>setData</code>, or a result wrapping the
+     *         exception if anything in the call failed
+     */
+    public OperationResult<byte[]> getResult() {
+        try {
+            Stat statistics = connection.setData(node, data, version);
+            if (LOG.isDebugEnabled()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(format("Set data of node '%s'  with '%d' bytes of data, retrieved statistics '%s' ",
+                                     node, data != null ? data.length : 0, statistics));
+                } else {
+                    LOG.debug(format("Set data of node '%s' with '%d' bytes of data", node, data != null ? data.length : 0));
+                }
+            }
+            return new OperationResult<byte[]>(data, statistics);
+        } catch (Exception e) {
+            return new OperationResult<byte[]>(e);
+        }
+    }
+
+    /**
+     * @param version the version passed to <code>setData</code>; -1 for 'any version'
+     */
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * Creates a new operation for the same connection, node and data whose
+     * version is -1 ('any version').
+     * <p>
+     * The copy is built directly rather than through the reflective
+     * implementation in the base class: that one looks up a
+     * <code>(ZooKeeper, String)</code> constructor, which this class does not
+     * declare, so delegating to it fails with a
+     * <code>NoSuchMethodException</code>.
+     *
+     * @return the copy, sharing this operation's data array
+     */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public ZooKeeperOperation createCopy() throws Exception {
+        // a freshly constructed instance already has version == -1
+        return new SetDataOperation(connection, node, data);
+    }
+
+}

Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/ZooKeeperOperation.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/ZooKeeperOperation.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/ZooKeeperOperation.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/ZooKeeperOperation.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * <code>ZooKeeperOperation</code> is the base class for wrapping various
+ * ZooKeeper API instructions and callbacks into callable and composable operation
+ * objects.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class ZooKeeperOperation<ResultType> {
+
+    protected static final transient Log LOG = LogFactory.getLog(ZooKeeperOperation.class);
+
+    protected static final Class[] CONSTRUCTOR_ARGS = {ZooKeeper.class, String.class};
+
+    protected String node;
+
+    protected ZooKeeper connection;
+
+    protected Set<Thread> waitingThreads = new CopyOnWriteArraySet<Thread>();
+    
+    protected OperationResult<ResultType> result;
+    
+    private boolean producesExchange;
+
+    private boolean cancelled;
+
+    public ZooKeeperOperation(ZooKeeper connection, String node) {
+        this(connection, node, true);
+    }
+
+    public ZooKeeperOperation(ZooKeeper connection, String node, boolean producesExchange) {
+        this.connection = connection;
+        this.node = node;
+        this.producesExchange = producesExchange;
+    }
+
+    /**
+     * Gets the result of this zookeeper operation, i.e. some data and the
+     * associated node stats
+     *
+     * @return
+     */
+    public abstract OperationResult<ResultType> getResult();
+
+    public OperationResult<ResultType> get() throws InterruptedException, ExecutionException {
+        waitingThreads.add(Thread.currentThread());
+        result = getResult();
+        return result;
+    }
+
+    public OperationResult<ResultType> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        return get(); // TODO ; perhaps set a timer here ....
+    }
+
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        if (mayInterruptIfRunning) {
+            for (Thread waiting : waitingThreads) {
+                waiting.interrupt();
+            }
+            cancelled = true;
+        }
+        return mayInterruptIfRunning;
+    }
+
+    public boolean isCancelled() {
+        return cancelled;
+    }
+
+    public boolean isDone() {
+        return result != null;
+    }
+
+    public String getNode() {
+        return node;
+    }
+
+    public boolean shouldProduceExchange() {
+        return producesExchange;
+    }
+
+
+    // TODO: slightly different to a clone as it uses the constructor
+    public ZooKeeperOperation createCopy() throws Exception {
+        return getClass().getConstructor(CONSTRUCTOR_ARGS).newInstance(new Object[] {connection, node});
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.lang.String.format;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.Route;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.zookeeper.SequenceComparator;
+import org.apache.camel.component.zookeeper.ZooKeeperEndpoint;
+import org.apache.camel.component.zookeeper.ZooKeeperMessage;
+import org.apache.camel.impl.JavaUuidGenerator;
+import org.apache.camel.impl.RoutePolicySupport;
+import org.apache.camel.spi.UuidGenerator;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.zookeeper.CreateMode;
+
+/**
+ * <code>ZooKeeperRoutePolicy</code> uses the leader election capabilities of a
+ * ZooKeeper cluster to control how routes are enabled. It is typically used in
+ * fail-over scenarios controlling identical instances of a route across a
+ * cluster of Camel based servers.
+ * <p>
+ * The policy is configured with a 'top n' number of routes that should be
+ * allowed to start, for a master/slave scenario this would be 1. Each instance
+ * of the policy will execute the election algorithm to obtain its position in
+ * the hierarchy of servers, if it is within the 'top n' servers then the policy
+ * is enabled and exchanges can be processed by the route. If not it waits for a
+ * change in the leader hierarchy and then reruns this scenario to see if it is
+ * now in the top n.
+ * <p>
+ * All instances of the policy must also be configured with the same path on the
+ * ZooKeeper cluster where the election will be carried out. It is good practice
+ * for this to indicate the application e.g. /someapplication/someroute/ note
+ * that these nodes should exist before using the policy.
+ * <p>
+ * See <a href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">the
+ * ZooKeeper leader election recipe</a> for more on how leader election is
+ * achieved with ZooKeeper.
+ * 
+ */
+public class ZooKeeperRoutePolicy extends RoutePolicySupport {
+
+    private String uri;
+
+    private int enabledCount;
+
+    private String candidateName;
+
+    private final Lock lock = new ReentrantLock();
+
+    private final CountDownLatch electionComplete = new CountDownLatch(1);
+
+    private Set<Route> suspendedRoutes = new CopyOnWriteArraySet<Route>();
+
+    private AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
+
+    private ProducerTemplate template;
+
+    private boolean shouldStopConsumer = true;
+
+    private UuidGenerator uuidGenerator = new JavaUuidGenerator();
+
+    private boolean isCandidateCreated;
+
+    public ZooKeeperRoutePolicy(String uri, int enabledCount) throws Exception {
+        this.uri = uri;
+        this.enabledCount = enabledCount;
+        createCandidateName();
+    }
+
+    private void createCandidateName() throws Exception {
+        /** UUID would be enough, also using hostname for human readability */
+        StringBuilder b = new StringBuilder(InetAddress.getLocalHost().getCanonicalHostName());
+        b.append("-").append(uuidGenerator.generateUuid());
+        this.candidateName = b.toString();
+    }
+
+    @Override
+    public void onExchangeBegin(Route route, Exchange exchange) {
+        testAndCreateCandidateNode(route);
+
+        awaitElectionResults();
+        if (!shouldProcessExchanges.get()) {
+            if (shouldStopConsumer) {
+                stopConsumer(route);
+            }
+
+            IllegalStateException e = new IllegalStateException("Zookeeper based route policy prohibits processing exchanges, stopping route and failing the exchange");
+            exchange.setException(e);
+
+        } else {
+            if (shouldStopConsumer) {
+                startConsumer(route);
+            }
+        }
+    }
+
+    private void testAndCreateCandidateNode(Route route) {
+        try {
+            lock.lock();
+            if (!isCandidateCreated) {
+                createCandidateNode(route.getRouteContext().getCamelContext());
+                isCandidateCreated = true;
+            }
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void awaitElectionResults() {
+        while (electionComplete.getCount() > 0) {
+            try {
+                electionComplete.await();
+            } catch (InterruptedException e1) {
+            }
+        }
+    }
+
+    private void startConsumer(Route route) {
+        try {
+            lock.lock();
+            if (suspendedRoutes.contains(route)) {
+                startConsumer(route.getConsumer());
+                suspendedRoutes.remove(route);
+            }
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void stopConsumer(Route route) {
+        try {
+            lock.lock();
+            // check that we should still suspend once the lock is acquired
+            if (!suspendedRoutes.contains(route) && !shouldProcessExchanges.get()) {
+                stopConsumer(route.getConsumer());
+                suspendedRoutes.add(route);
+            }
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void startAllStoppedConsumers() {
+        try {
+            lock.lock();
+            if (!suspendedRoutes.isEmpty()) {
+                if (log.isDebugEnabled()) {
+                    log.debug(format("'%d' have been stopped previously by poilcy, restarting.", suspendedRoutes.size()));
+                }
+                for (Route suspended : suspendedRoutes) {
+                    startConsumer(suspended.getConsumer());
+                }
+                suspendedRoutes.clear();
+            }
+
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean isShouldStopConsumer() {
+        return shouldStopConsumer;
+    }
+
+    public void setShouldStopConsumer(boolean shouldStopConsumer) {
+        this.shouldStopConsumer = shouldStopConsumer;
+    }
+
+    private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) {
+        this.template = camelContext.createProducerTemplate();
+        if (log.isInfoEnabled()) {
+            log.info(format("Initializing ZookeeperRoutePolicy with uri '%s'", uri));
+        }
+        ZooKeeperEndpoint zep = (ZooKeeperEndpoint)camelContext.getEndpoint(uri);
+        zep.getConfiguration().setCreate(true);
+        String fullpath = createFullPathToCandidate(zep);
+        Exchange e = zep.createExchange();
+        e.setPattern(ExchangePattern.InOut);
+        e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_NODE, fullpath);
+        e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, CreateMode.EPHEMERAL_SEQUENTIAL);
+        template.send(zep, e);
+
+        if (e.isFailed()) {
+            log.error("Error setting up election node " + fullpath, e.getException());
+        } else {
+            if (log.isInfoEnabled()) {
+                log.info(format("Candidate node '%s' has been created", fullpath));
+            }
+            try {
+                if (zep != null) {
+                    camelContext.addRoutes(new ElectoralMonitorRoute(zep));
+                }
+            } catch (Exception ex) {
+                log.error("Error configuring ZookeeperRoutePolicy", ex);
+            }
+        }
+        return zep;
+
+    }
+
+    private String createFullPathToCandidate(ZooKeeperEndpoint zep) {
+        String fullpath = zep.getConfiguration().getPath();
+        if (!fullpath.endsWith("/")) {
+            fullpath += "/";
+        }
+        fullpath += candidateName;
+        return fullpath;
+    }
+
+    private class ElectoralMonitorRoute extends RouteBuilder {
+
+        private SequenceComparator comparator = new SequenceComparator();
+        
+        private ZooKeeperEndpoint zep;
+
+        public ElectoralMonitorRoute(ZooKeeperEndpoint zep) {
+            this.zep = zep;
+            zep.getConfiguration().setListChildren(true);
+            zep.getConfiguration().setRepeat(true);
+        }
+
+        @Override
+        public void configure() throws Exception {
+
+            /**
+             * TODO: this is cheap cheerful but suboptimal; it suffers from the
+             * 'herd effect' that on any change to the candidates list every
+             * policy instance will ask for the entire candidate list again.
+             * This is fine for small numbers of nodes (for scenarios
+             * like Master-Slave it is perfect) but could get noisy if
+             * large numbers of nodes were involved.
+             * <p>
+             * Better would be to find the position of this node in the list and
+             * watch the node in the position ahead node ahead of this and only
+             * request the candidate list when its status changes. This will
+             * require enhancing the consumer to allow custom operation lists.
+             */
+            from(zep).sort(body(), comparator).process(new Processor() {
+
+                @SuppressWarnings("unchecked")
+                public void process(Exchange e) throws Exception {
+                    List<String> candidates = (List<String>)ExchangeHelper.getMandatoryInBody(e);
+
+                    int location = Math.abs(Collections.binarySearch(candidates, candidateName));
+                    /**
+                     * check if the item at this location starts with this nodes
+                     * candidate name
+                     */
+                    if (isOurCandidateAtLocationInCandidatesList(candidates, location)) {
+
+                        shouldProcessExchanges.set(location <= enabledCount);
+                        if (log.isDebugEnabled()) {
+                            log.debug(format("This node is number '%d' on the candidate list, route is configured for the top '%d'. Exchange processing will be %s", location,
+                                             enabledCount, shouldProcessExchanges.get() ? "enabled" : "disabled"));
+                        }
+                        startAllStoppedConsumers();
+                    }
+                    electionComplete.countDown();
+                }
+
+                private boolean isOurCandidateAtLocationInCandidatesList(List<String> candidates, int location) {
+                    return location <= candidates.size() && candidates.get(location - 1).startsWith(candidateName);
+                }
+            });
+        }
+    }
+
+}

Added: camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zoo
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zoo?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zoo (added)
+++ camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zoo Wed Aug 24 22:31:37 2011
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+class=org.apache.camel.component.zookeeper.ZooKeeperComponent

Added: camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zookeeper
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zookeeper?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zookeeper (added)
+++ camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zookeeper Wed Aug 24 22:31:37 2011
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+class=org.apache.camel.component.zookeeper.ZooKeeperComponent

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zookeeper.NaturalSortComparator.Order;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+/**
+ * Consumes child listings of the <code>/grimm</code> node and checks that a
+ * listing is delivered for each change made by the test client.
+ */
+@SuppressWarnings("unchecked")
+public class ConsumeChildrenTest extends ZooKeeperTestSupport {
+
+    @Override
+    protected RouteBuilder[] createRouteBuilders() throws Exception {
+        return new RouteBuilder[] {new RouteBuilder() {
+            public void configure() throws Exception {
+                // every body is sorted in descending natural order before it reaches the mock
+                from("zoo://localhost:39913/grimm?repeat=true&listChildren=true").sort(body(), new NaturalSortComparator(Order.Descending)).to("mock:zookeeper-data");
+            }
+        }};
+    }
+
+    @Test
+    public void shouldAwaitCreationAndGetDataNotification() throws Exception {
+
+        MockEndpoint mock = getMockEndpoint("mock:zookeeper-data");
+        mock.expectedMessageCount(5);
+
+        // one notification is expected per change below
+        client.createPersistent("/grimm", "parent");
+        client.create("/grimm/hansel", "child");
+        client.create("/grimm/gretel", "child");
+        client.delete("/grimm/hansel");
+        client.delete("/grimm/gretel");
+
+        mock.await(5, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+
+        validateExchangesContainListings(mock, createChildListing(), createChildListing("hansel"), createChildListing("hansel", "gretel"), createChildListing("gretel"),
+                                         createChildListing());
+
+    }
+
+    /**
+     * Asserts that the mock received exactly the expected listings, in order,
+     * and that the child count changed between consecutive exchanges.
+     *
+     * @param mock the endpoint holding the received exchanges
+     * @param expected the expected child listings, one per exchange
+     * @throws InvalidPayloadException if a received body is not a list
+     */
+    private void validateExchangesContainListings(MockEndpoint mock, List<String>... expected) throws InvalidPayloadException {
+        List<Exchange> received = mock.getReceivedExchanges();
+        // guard against indexing past either end and against silently unchecked extras
+        assertEquals("Unexpected number of child listings", expected.length, received.size());
+        for (int index = 0; index < expected.length; index++) {
+            List<String> actual = ExchangeHelper.getMandatoryInBody(received.get(index), List.class);
+            assertEquals(expected[index], actual);
+        }
+        // the whole sequence only needs to be checked once
+        validateChildrenCountChangesEachTime(mock);
+    }
+
+    /**
+     * Asserts that the number of children reported in the statistics header
+     * differs between each exchange and the one before it.
+     *
+     * @param mock the endpoint holding the received exchanges
+     */
+    protected void validateChildrenCountChangesEachTime(MockEndpoint mock) {
+        int lastChildCount = -1;
+        List<Exchange> received = mock.getReceivedExchanges();
+        for (int x = 0; x < received.size(); x++) {
+            Message zkm = received.get(x).getIn();
+            int childCount = ((Stat)zkm.getHeader(ZooKeeperMessage.ZOOKEEPER_STATISTICS)).getNumChildren();
+            // compare the primitives: assertNotSame would box both ints and compare references
+            assertTrue("Num of children did not change", lastChildCount != childCount);
+            lastChildCount = childCount;
+        }
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.junit.Test;
+
+public class ConsumeDataTest extends ZooKeeperTestSupport {
+
+    @Override
+    protected RouteBuilder[] createRouteBuilders() throws Exception {
+        return new RouteBuilder[] {new RouteBuilder() {
+            public void configure() throws Exception {
+                from("zoo://localhost:39913/camel?repeat=true").to("mock:zookeeper-data");
+            }
+        }};
+    }
+
+    @Test
+    public void shouldAwaitCreationAndGetDataNotification() throws Exception {
+
+        MockEndpoint mock = getMockEndpoint("mock:zookeeper-data");
+        mock.expectedMinimumMessageCount(10);
+
+        createCamelNode();
+        updateNode(10);
+
+        mock.await(5, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+
+        validateExchangesReceivedInOrderWithIncreasingVersion(mock);
+    }
+
+    @Test
+    public void deletionOfAwaitedNodeCausesNoFailure() throws Exception {
+
+        MockEndpoint mock = getMockEndpoint("mock:zookeeper-data");
+        mock.expectedMessageCount(11);
+        createCamelNode();
+
+        delay(200);
+
+        // by now we are back waiting for a change so delete the node
+        client.delete("/camel");
+
+        // recreate and update a number of times.
+        createCamelNode();
+        updateNode(10);
+
+        mock.await(5, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+    }
+
+    private void updateNode(int times) throws InterruptedException, Exception {
+        for (int x = 1; x < times; x++) {
+            delay(200);
+            client.setData("/camel", testPayload + "_" + x, -1);
+        }
+    }
+
+    private void createCamelNode() throws InterruptedException, Exception {
+        try {
+            delay(1000);
+            client.create("/camel", testPayload + "_0");
+        } catch (NodeExistsException e) {
+        }
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/NaturalSortComparatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/NaturalSortComparatorTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/NaturalSortComparatorTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/NaturalSortComparatorTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.camel.component.zookeeper.NaturalSortComparator.Order;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks that {@link NaturalSortComparator} restores a known ordering from a
+ * shuffled list, in both the default and the descending direction.
+ */
+public class NaturalSortComparatorTest {
+
+    @Test
+    public void testSortOrder() throws Exception {
+
+        List<String> sorted = Arrays
+            .asList("0", "1", "3", "4.0", "11", "30", "55", "225", "333", "camel-2.1.0", "camel-2.1.1", "camel-2.1.1-SNAPSHOT", "camel-2.2.0");
+
+        List<String> unsorted = new ArrayList<String>(sorted);
+        Collections.shuffle(unsorted);
+        Collections.sort(unsorted, new NaturalSortComparator());
+        compareLists(sorted, unsorted);
+
+        Collections.shuffle(unsorted);
+        Collections.sort(unsorted, new NaturalSortComparator(Order.Descending));
+        // the expected list is reversed in place to become the descending expectation
+        Collections.reverse(sorted);
+        compareLists(sorted, unsorted);
+    }
+
+    /**
+     * Asserts that both lists have the same length and equal elements at
+     * every position.
+     *
+     * @param sorted the expected ordering
+     * @param unsorted the list produced by the comparator under test
+     */
+    private void compareLists(List<String> sorted, List<String> unsorted) {
+        // without this a shorter actual list would pass unnoticed
+        assertEquals("List sizes differ", sorted.size(), unsorted.size());
+        for (int x = 0; x < unsorted.size(); x++) {
+            assertEquals("Mismatch at index " + x, sorted.get(x), unsorted.get(x));
+        }
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperEndpointTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperEndpointTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperEndpointTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+import javax.management.Attribute;
+import javax.management.ObjectName;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperClient;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperServer;
+import org.apache.camel.management.JmxInstrumentationUsingDefaultsTest;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.springframework.jmx.support.JmxUtils;
+
+/**
+ * Checks that the managed attributes and operations of the ZooKeeper endpoint
+ * can be driven through JMX. The inherited counter and registration tests are
+ * overridden with empty bodies.
+ */
+@SuppressWarnings("all")
+public class ZooKeeperEndpointTest extends JmxInstrumentationUsingDefaultsTest {
+
+    // counts completed tearDown calls; the test server is stopped after the third
+    private static int teardownAfter;
+
+    @Override
+    protected void setUp() throws Exception {
+        // start the shared test server only before the first test of this class
+        if (teardownAfter == 0) {
+            ZooKeeperTestSupport.setupTestServer();
+        }
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (++teardownAfter == 3) {
+            ZooKeeperTestSupport.shutdownServer();
+        }
+    }
+
+    public synchronized void testEnpointConfigurationCanBeSetViaJMX() throws Exception {
+        Set s = mbsc.queryNames(new ObjectName(domainName + ":type=endpoints,*"), null);
+        assertEquals("Could not find  endpoints: " + s, 2, s.size());
+        // NOTE(review): assumes the zoo endpoint is the first name in the set; set order is not guaranteed - confirm
+        ObjectName zepName = new ArrayList<ObjectName>(s).get(0);
+
+        verifyManagedAttribute(zepName, "Path", "/someotherpath");
+        verifyManagedAttribute(zepName, "Create", true);
+        verifyManagedAttribute(zepName, "Repeat", true);
+        verifyManagedAttribute(zepName, "ListChildren", true);
+        verifyManagedAttribute(zepName, "AwaitExistence", true);
+        verifyManagedAttribute(zepName, "Timeout", 12345);
+        verifyManagedAttribute(zepName, "Backoff", 12345L);
+
+        // no-arg lookup instead of passing null as the varargs parameter types
+        mbsc.invoke(zepName, "clearServers", null, JmxUtils.getMethodSignature(ZooKeeperEndpoint.class.getMethod("clearServers")));
+        mbsc.invoke(zepName, "addServer", new Object[] {"someserver:12345"},
+                    JmxUtils.getMethodSignature(ZooKeeperEndpoint.class.getMethod("addServer", new Class[] {String.class})));
+
+    }
+
+    /**
+     * Sets a managed attribute through the MBean server connection and asserts
+     * that reading it back yields the same value.
+     *
+     * @param zepName the object name of the managed endpoint
+     * @param attributeName the name of the attribute to write and read
+     * @param attributeValue the value to set; primitives are passed boxed
+     * @throws Exception if the attribute cannot be written or read
+     */
+    private void verifyManagedAttribute(ObjectName zepName, String attributeName, Object attributeValue) throws Exception {
+        mbsc.setAttribute(zepName, new Attribute(attributeName, attributeValue));
+        assertEquals(attributeValue, mbsc.getAttribute(zepName, attributeName));
+    }
+
+    @Override
+    public void testCounters() throws Exception {
+        // intentionally empty: overrides the inherited test
+    }
+
+    @Override
+    public void testMBeansRegistered() throws Exception {
+        // intentionally empty: overrides the inherited test
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("zoo://localhost:39913/node").to("mock:test");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.FileUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class ZooKeeperTestSupport extends CamelTestSupport {
+
+    protected static TestZookeeperServer server;
+    
+    protected static TestZookeeperClient client;
+
+    private static final Logger LOG = Logger.getLogger(ZooKeeperTestSupport.class);
+ 
+    protected String testPayload = "This is a test";
+
+    protected byte[] testPayloadBytes = testPayload.getBytes();
+        
+    @BeforeClass
+    public static void setupTestServer() throws Exception {
+        LOG.info("Starting Zookeeper Test Infrastructure");
+        server = new TestZookeeperServer(getServerPort(), clearServerData());
+        waitForServerUp("localhost:" + getServerPort(), 1000);
+        client = new TestZookeeperClient(getServerPort(), getTestClientSessionTimeout());
+        LOG.info("Started Zookeeper Test Infrastructure on port " + getServerPort());
+    }
+
+    public ZooKeeper getConnection() {
+        return client.getConnection();
+    }
+
+    /**
+     * Closes the shared test client and stops the test server. The server is
+     * stopped even when closing the client fails, and either step is skipped
+     * when the corresponding object was never created.
+     *
+     * @throws Exception if closing the client or stopping the server fails
+     */
+    @AfterClass
+    public static void shutdownServer() throws Exception {
+        LOG.info("Stopping Zookeeper Test Infrastructure");
+        try {
+            // client is null when setup failed before it was created
+            if (client != null) {
+                client.shutdown();
+            }
+        } finally {
+            if (server != null) {
+                server.shutdown();
+                if (!waitForServerDown("localhost:" + getServerPort(), 1000)) {
+                    LOG.warn("Zookeeper test server still answering on port " + getServerPort() + " after 1000ms");
+                }
+            }
+        }
+        LOG.info("Stopped Zookeeper Test Infrastructure");
+    }
+
+    protected static int getServerPort() {
+        return 39913;
+    }
+
+    protected static int getTestClientSessionTimeout() {
+        return 100000;
+    }
+
+    protected static boolean clearServerData() {
+        return true;
+    }
+
+    /**
+     * A ZooKeeper server for tests that keeps its files under
+     * <code>./target/zookeeper</code>.
+     */
+    public static class TestZookeeperServer {
+        private NIOServerCnxn.Factory connectionFactory;
+        private ZooKeeperServer zkServer;
+
+        /**
+         * Configures the server and starts it on the given port.
+         *
+         * @param clientPort the port, on localhost, that clients connect to
+         * @param clearServerData whether to delete the working directory first
+         * @throws Exception if the working directory cannot be removed or the
+         *             server cannot be started
+         */
+        public TestZookeeperServer(int clientPort, boolean clearServerData) throws Exception {
+            if (clearServerData) {
+                File workingDir = new File("./target/zookeeper");
+                deleteDir(workingDir);
+                if (workingDir.exists()) {
+                    throw new Exception("Could not delete Test Zookeeper Server working dir ./target/zookeeper");
+                }
+            }
+
+            zkServer = new ZooKeeperServer();
+            File logPath = new File("./target/zookeeper/log");
+            File dataPath = new File("./target/zookeeper/data");
+            zkServer.setTxnLogFactory(new FileTxnSnapLog(logPath, dataPath));
+            zkServer.setTickTime(1000);
+
+            InetSocketAddress address = new InetSocketAddress("localhost", clientPort);
+            connectionFactory = new NIOServerCnxn.Factory(address, 0);
+            connectionFactory.startup(zkServer);
+        }
+
+        /**
+         * Shuts down the connection factory, waits for it to finish, and then
+         * requests server shutdown every 100ms until the server no longer
+         * reports itself as running.
+         *
+         * @throws Exception if shutting down or waiting fails
+         */
+        public void shutdown() throws Exception {
+            connectionFactory.shutdown();
+            connectionFactory.join();
+            for (; zkServer.isRunning(); Thread.sleep(100)) {
+                zkServer.shutdown();
+            }
+        }
+    }
+
+    public static class TestZookeeperClient implements Watcher {
+
+        public static int x;
+
+        private final Logger log = Logger.getLogger(getClass());
+
+        private ZooKeeper zk;
+
+        private CountDownLatch connected = new CountDownLatch(1);
+
+
+        public TestZookeeperClient(int port, int timeout) throws Exception {
+            zk = new ZooKeeper("localhost:" + port, timeout, this);
+            connected.await();
+        }
+
+        public ZooKeeper getConnection() {
+            return zk;
+        }
+
+        public void shutdown() throws Exception {
+            zk.close();
+        }
+
+        public byte[] waitForNodeChange(String node) throws Exception {
+            Stat stat = new Stat();
+            return zk.getData(node, this, stat);
+        }
+
+        public void create(String node, String data) throws Exception {
+            log.debug(String.format("Creating node '%s' with data '%s' ", node, data));
+            create(node, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+        }
+
+        public void createPersistent(String node, String data) throws Exception {
+            log.debug(String.format("Creating node '%s' with data '%s' ", node, data));
+            create(node, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+
+        public void create(String znode, String data, List<ACL> access, CreateMode mode) throws Exception {
+            delay(200);
+            String created = zk.create(znode, data != null ? data.getBytes() : null, access, mode);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Created znode named '%s'", created));
+            }
+        }
+
+        public Stat setData(String node, String data, int version) throws Exception {
+            log.debug(String.format("TestClient Updating data of node %s to %s", node, data));
+            return zk.setData(node, data.getBytes(), version);
+        }
+
+        public byte[] getData(String znode) throws Exception {
+            return zk.getData(znode, false, new Stat());
+        }
+
+        public void process(WatchedEvent event) {
+
+            if (event.getState() == KeeperState.SyncConnected) {
+                log.info("TestClient connected");
+                connected.countDown();
+            } else {
+                if (event.getState() == KeeperState.Disconnected) {
+                    log.info("TestClient connected ?" + zk.getState());
+                }
+            }
+        }
+
+        public void delete(String node) throws Exception {
+            delay(200);
+            log.debug("Deleting node " + node);
+            zk.delete(node, -1);
+        }
+    }
+
+    // Wait methods are taken directly from the Zookeeper tests. A tests jar
+    // would be nice! Another good reason the keeper folks should move to maven.
+    /**
+     * Polls the server with the <code>stat</code> command every 250ms until it
+     * answers with a version banner or the timeout has passed.
+     *
+     * @param hp one or more comma separated <code>host:port</code> pairs; only the first is used
+     * @param timeout the maximum time to keep polling, in milliseconds
+     * @return true if the server answered in time, false on timeout or interruption
+     */
+    private static boolean waitForServerUp(String hp, long timeout) {
+        // if there are multiple hostports, just take the first one
+        String hostPort = hp.split(",")[0];
+        long start = System.currentTimeMillis();
+        while (true) {
+            try {
+                String result = send4LetterWord(hostPort, "stat");
+                if (result.startsWith("Zookeeper version:")) {
+                    return true;
+                }
+            } catch (IOException e) {
+                LOG.info("server " + hostPort + " not up " + e);
+            }
+
+            if (System.currentTimeMillis() > start + timeout) {
+                return false;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to the caller and stop waiting
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Opens a socket to the given address, writes the command and returns
+     * everything the peer sends back until it closes the stream.
+     *
+     * @param hp the target as <code>host:port</code>
+     * @param cmd the command text to send
+     * @return the response, with each line terminated by a newline
+     * @throws IOException if connecting, writing or reading fails
+     * @throws RuntimeException if the port part of <code>hp</code> is missing or not a number
+     */
+    private static String send4LetterWord(String hp, String cmd) throws IOException {
+        String[] split = hp.split(":");
+        String host = split[0];
+        int port;
+        try {
+            port = Integer.parseInt(split[1]);
+        } catch (RuntimeException e) {
+            // keep the original failure as the cause
+            throw new RuntimeException("Problem parsing " + hp + e.toString(), e);
+        }
+
+        Socket sock = new Socket(host, port);
+        BufferedReader reader = null;
+        try {
+            OutputStream outstream = sock.getOutputStream();
+            outstream.write(cmd.getBytes());
+            outstream.flush();
+
+            reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+            StringBuilder sb = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                sb.append(line).append('\n');
+            }
+            return sb.toString();
+        } finally {
+            // close the reader first, and always close the socket afterwards
+            try {
+                if (reader != null) {
+                    reader.close();
+                }
+            } finally {
+                sock.close();
+            }
+        }
+    }
+
+    /**
+     * Polls the server with the <code>stat</code> command every 250ms until
+     * the request fails with an I/O error or the timeout has passed.
+     *
+     * @param hp the server address as <code>host:port</code>
+     * @param timeout the maximum time to keep polling, in milliseconds
+     * @return true once a request failed, false on timeout or interruption
+     */
+    private static boolean waitForServerDown(String hp, long timeout) {
+        long start = System.currentTimeMillis();
+        while (true) {
+            try {
+                send4LetterWord(hp, "stat");
+            } catch (IOException e) {
+                // the request failing is the signal that the server is gone
+                return true;
+            }
+
+            if (System.currentTimeMillis() > start + timeout) {
+                return false;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to the caller and stop waiting
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Deletes the given file or directory together with everything below it.
+     *
+     * @param f the file or directory to remove
+     */
+    public static void deleteDir(File f) {
+        Deque<File> pending = new LinkedList<File>();
+        pending.addLast(f);
+        deleteDir(pending);
+    }
+
+    /**
+     * Deletes every file on the stack and, for directories, everything below
+     * them. Entries are deleted in the reverse of the order in which they were
+     * taken off the stack, so children are removed before their parent.
+     *
+     * @param deleteStack the files still to be processed; empty on return
+     */
+    private static void deleteDir(Deque<File> deleteStack) {
+        // iterate instead of recursing so the depth does not grow with the number of files
+        LinkedList<File> visited = new LinkedList<File>();
+        File f;
+        while ((f = deleteStack.pollLast()) != null) {
+            // listFiles returns null when the directory cannot be read
+            File[] children = f.isDirectory() ? f.listFiles() : null;
+            if (children != null) {
+                for (File child : children) {
+                    deleteStack.addLast(child);
+                }
+            }
+            visited.addFirst(f);
+        }
+        for (File file : visited) {
+            FileUtil.deleteFile(file);
+        }
+    }
+
+    public static void delay(int wait) throws InterruptedException {
+        Thread.sleep(wait);
+    }
+
+    /**
+     * Wraps the given child names in a list.
+     *
+     * @param children the child names, possibly none
+     * @return a list view of the given names
+     */
+    protected List<String> createChildListing(String... children) {
+        List<String> listing = Arrays.asList(children);
+        return listing;
+    }
+
+    /**
+     * Asserts that the version found in the statistics of each received
+     * message is greater than that of the message before it.
+     *
+     * @param mock the endpoint holding the received exchanges
+     */
+    protected void validateExchangesReceivedInOrderWithIncreasingVersion(MockEndpoint mock) {
+        List<Exchange> received = mock.getReceivedExchanges();
+        int previous = -1;
+        int position = 0;
+        while (position < received.size()) {
+            Message in = mock.getReceivedExchanges().get(position).getIn();
+            int current = ZooKeeperMessage.getStatistics(in).getVersion();
+            assertTrue("Version did not increase", previous < current);
+            previous = current;
+            position++;
+        }
+    }
+
+    /**
+     * Asserts that the ACL stored for a node equals the expected list.
+     *
+     * @param node the path of the node to inspect
+     * @param expected the ACL entries the node should carry
+     * @throws Exception if the ACL cannot be read
+     */
+    protected void verifyAccessControlList(String node, List<ACL> expected) throws Exception {
+        List<ACL> actual = getConnection().getACL(node, new Stat());
+        // previously the ACL was fetched but never compared
+        assertEquals("ACL of node " + node, expected, actual);
+    }
+
+    /**
+     * Asserts that a node holds the expected data, or no data at all when
+     * <code>expected</code> is null.
+     *
+     * @param node the path of the node to read
+     * @param expected the expected content, or null for an empty node
+     * @throws Exception if the node cannot be read
+     */
+    protected void verifyNodeContainsData(String node, byte[] expected) throws Exception {
+        if (expected != null) {
+            String wanted = new String(expected);
+            assertEquals(wanted, new String(client.getData(node)));
+            return;
+        }
+        assertNull(client.getData(node));
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperUtilsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperUtilsTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperUtilsTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperUtilsTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+
+
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.zookeeper.CreateMode;
+import org.junit.Test;
+
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateMode;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateModeFromString;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks how {@link ZooKeeperUtils} resolves a create mode from a plain
+ * string and from a message header, including the fallback for unknown names.
+ */
+public class ZooKeeperUtilsTest {
+
+    @Test
+    public void testCreatModeExtraction() {
+        final CreateMode fallback = CreateMode.EPHEMERAL;
+
+        assertEquals(CreateMode.EPHEMERAL, getCreateModeFromString("EPHEMERAL", fallback));
+        assertEquals(CreateMode.EPHEMERAL_SEQUENTIAL, getCreateModeFromString("EPHEMERAL_SEQUENTIAL", fallback));
+        assertEquals(CreateMode.PERSISTENT, getCreateModeFromString("PERSISTENT", fallback));
+        assertEquals(CreateMode.PERSISTENT_SEQUENTIAL, getCreateModeFromString("PERSISTENT_SEQUENTIAL", fallback));
+        // an unknown name yields the supplied default
+        assertEquals(CreateMode.EPHEMERAL, getCreateModeFromString("DOESNOTEXIST", fallback));
+    }
+
+    @Test
+    public void testCreatModeExtractionFromMessageHeader() {
+        final CreateMode fallback = CreateMode.EPHEMERAL;
+
+        assertEquals(CreateMode.EPHEMERAL, testModeInMessage("EPHEMERAL", fallback));
+        assertEquals(CreateMode.EPHEMERAL_SEQUENTIAL, testModeInMessage("EPHEMERAL_SEQUENTIAL", fallback));
+        assertEquals(CreateMode.PERSISTENT, testModeInMessage("PERSISTENT", fallback));
+        assertEquals(CreateMode.PERSISTENT_SEQUENTIAL, testModeInMessage("PERSISTENT_SEQUENTIAL", fallback));
+        // an unknown name yields the supplied default
+        assertEquals(CreateMode.EPHEMERAL, testModeInMessage("DOESNOTEXIST", fallback));
+    }
+
+    /**
+     * Puts the mode name into the create mode header of a fresh message and
+     * returns what {@code getCreateMode} resolves from it.
+     */
+    private CreateMode testModeInMessage(String mode, CreateMode defaultMode) {
+        Message message = new DefaultMessage();
+        message.setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, mode);
+        CreateMode resolved = getCreateMode(message, defaultMode);
+        return resolved;
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperConnectionManagerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperConnectionManagerTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperConnectionManagerTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperConnectionManagerTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.junit.Test;
+
+/**
+ * Checks that the client handed out by a {@link ZooKeeperConnectionManager}
+ * reports the connected state.
+ */
+public class ZookeeperConnectionManagerTest extends ZooKeeperTestSupport {
+
+    /**
+     * Builds a configuration, component and endpoint by hand, asks the manager for
+     * a connection and asserts that the client reports {@link States#CONNECTED}.
+     */
+    @Test
+    public void shouldWaitForConnection() {
+        ZooKeeperConfiguration config = new ZooKeeperConfiguration();
+        config.addZookeeperServer("localhost:39913");
+
+        ZooKeeperComponent component = new ZooKeeperComponent(config);
+        component.setConfiguration(config);
+        component.setCamelContext(context);
+
+        ZooKeeperEndpoint zep = new ZooKeeperEndpoint("zoo:someserver/this/is/a/path", component, config);
+
+        ZooKeeperConnectionManager zkcm = new ZooKeeperConnectionManager(zep);
+        ZooKeeper zk = zkcm.getConnection();
+        assertEquals(States.CONNECTED, zk.getState());
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+
+import org.junit.Test;
+
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_CREATE_MODE;
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_NODE;
+
+
+/**
+ * Exercises the ZooKeeper producer through Camel routes that target the
+ * server at localhost:39913.
+ */
+public class ZookeeperProducerTest extends ZooKeeperTestSupport {
+
+    private String zookeeperUri;
+
+    // NOTE(review): may hide a testPayload inherited from ZooKeeperTestSupport - confirm.
+    private String testPayload = "TestPayload";
+
+    /**
+     * Registers the routes under test: a round trip through /node, a producer to
+     * /doesnotexist without create=true, a producer to /notset paired with a consumer
+     * on /set, and a producer that targets /persistent with createMode=PERSISTENT.
+     */
+    @Override
+    protected RouteBuilder[] createRouteBuilders() throws Exception {
+        return new RouteBuilder[] {new RouteBuilder() {
+            public void configure() throws Exception {
+                zookeeperUri = "zoo://localhost:39913/node?create=true";
+                from("direct:roundtrip").to(zookeeperUri).to("mock:producer-out");
+                from(zookeeperUri).to("mock:consumed-from-node");
+            }
+        }, new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:no-create-fails-set").to("zoo://localhost:39913/doesnotexist");
+            }
+        }, new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:node-from-header").to("zoo://localhost:39913/notset?create=true");
+                from("zoo://localhost:39913/set?create=true").to("mock:consumed-from-set-node");
+            }
+        }, new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:create-mode").to("zoo://localhost:39913/persistent?create=true&createMode=PERSISTENT").to("mock:create-mode");
+            }
+        }};
+    }
+
+    /** Sends an InOut exchange and expects it at both the producer output and the /node consumer. */
+    @Test
+    public void testRoundtripOfDataToAndFromZnode() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+        MockEndpoint pipeline = getMockEndpoint("mock:producer-out");
+        mock.expectedMessageCount(1);
+        pipeline.expectedMessageCount(1);
+
+        Exchange e = createExchangeWithBody(testPayload);
+        e.setPattern(ExchangePattern.InOut);
+        template.send("direct:roundtrip", e);
+
+        mock.await(2, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+        pipeline.assertIsSatisfied();
+    }
+
+    /** Same round trip without setting the InOut pattern; only the /node consumer is checked. */
+    @Test
+    public void testAsyncRoundtripOfDataToAndFromZnode() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+        mock.expectedMessageCount(1);
+
+        Exchange e = createExchangeWithBody(testPayload);
+        template.send("direct:roundtrip", e);
+
+        mock.await(2, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+    }
+
+    /** Sends with the ZOOKEEPER_NODE header set to /set and expects the /set consumer to receive one message. */
+    @Test
+    public void setUsingNodeFromHeader() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:consumed-from-set-node");
+        mock.expectedMessageCount(1);
+
+        Exchange e = createExchangeWithBody(testPayload);
+        e.setPattern(ExchangePattern.InOut);
+        // NOTE(review): the Exchange itself is passed as the message body here - confirm this is intended.
+        template.sendBodyAndHeader("direct:node-from-header", e, ZOOKEEPER_NODE, "/set");
+
+        mock.await(5, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+    }
+
+    /** Sends one exchange per CreateMode value to a child of /modes-test and expects as many children. */
+    @Test
+    public void setUsingCreateModeFromHeader() throws Exception {
+
+        client.createPersistent("/modes-test", "parent for modes");
+        for (CreateMode mode : CreateMode.values()) {
+            Exchange exchange = createExchangeWithBody(testPayload);
+            exchange.getIn().setHeader(ZOOKEEPER_CREATE_MODE, mode);
+            exchange.getIn().setHeader(ZOOKEEPER_NODE, "/modes-test/" + mode);
+            exchange.setPattern(ExchangePattern.InOut);
+            template.send("direct:node-from-header", exchange);
+        }
+        GetChildrenOperation listing = new GetChildrenOperation(getConnection(), "/modes-test");
+        assertEquals(CreateMode.values().length, listing.get().getResult().size());
+    }
+
+    /** A message sent through the createMode=PERSISTENT route must carry statistics with no ephemeral owner. */
+    @Test
+    public void createWithOtherCreateMode() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:create-mode");
+        mock.expectedMessageCount(1);
+
+        Exchange e = createExchangeWithBody(testPayload);
+        e.setPattern(ExchangePattern.InOut);
+
+        template.send("direct:create-mode", e);
+        mock.await(5, TimeUnit.SECONDS);
+        // Fail with the mock's own message rather than an IndexOutOfBoundsException below.
+        mock.assertIsSatisfied();
+
+        Stat s = mock.getReceivedExchanges().get(0).getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_STATISTICS, Stat.class);
+        // JUnit expects (expected, actual); a non-ephemeral node has an ephemeral owner of 0.
+        assertEquals(0L, s.getEphemeralOwner());
+    }
+
+    /** Sends to a listChildren=true endpoint and expects the child listing as the out body. */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void setAndGetListing() throws Exception {
+
+        client.createPersistent("/set-listing", "parent for set and list test");
+
+        Exchange exchange = createExchangeWithBody(testPayload);
+        exchange.getIn().setHeader(ZOOKEEPER_NODE, "/set-listing/firstborn");
+        exchange.setPattern(ExchangePattern.InOut);
+        template.send("zoo://localhost:39913/set-listing?create=true&listChildren=true", exchange);
+        List<String> children = ExchangeHelper.getMandatoryOutBody(exchange, List.class);
+        assertEquals(1, children.size());
+        assertEquals("firstborn", children.get(0));
+    }
+
+    /** Checks that a consumed message exposes its node path and statistics through ZooKeeperMessage. */
+    @Test
+    public void testZookeeperMessage() throws Exception {
+
+        MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+        mock.expectedMessageCount(1);
+
+        Exchange exchange = createExchangeWithBody(testPayload);
+        template.send("direct:roundtrip", exchange);
+        // Bounded wait, like the other tests, so a missing message cannot hang the build.
+        mock.await(5, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+
+        Message received = mock.getReceivedExchanges().get(0).getIn();
+        assertEquals("/node", ZooKeeperMessage.getPath(received));
+        assertNotNull(ZooKeeperMessage.getStatistics(received));
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/AnyOfOperationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/AnyOfOperationTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/AnyOfOperationTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/AnyOfOperationTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+/**
+ * Tests an {@link AnyOfOperations} that combines an {@link ExistsOperation} with an
+ * {@link ExistenceChangedOperation} on the same node.
+ */
+public class AnyOfOperationTest extends ZooKeeperTestSupport {
+
+    /** Creates the node before building the operation and expects the node path as the result. */
+    @Test
+    public void testExistsOrWaitsWhenNodeExists() throws Exception {
+        final String path = "/cogito";
+        client.create(path, "ergo sum");
+        AnyOfOperations existsOrWait = getExistsOrWaitOperation(path);
+        assertEquals(path, existsOrWait.get().getResult());
+    }
+
+    /** Builds the operation first, creates the node a second later, then expects the node path. */
+    @Test
+    public void testExistsOrWaitsWhenNodeDoesNotExist() throws Exception {
+        final String path = "/chapter-one";
+        AnyOfOperations existsOrWait = getExistsOrWaitOperation(path);
+        Thread.sleep(1000);
+        client.create(path, "I am born");
+        assertEquals(path, existsOrWait.get().getResult());
+    }
+
+    /** Combines an ExistsOperation and an ExistenceChangedOperation for {@code node} on one shared connection. */
+    private AnyOfOperations getExistsOrWaitOperation(String node) {
+        ZooKeeper zk = getConnection();
+        return new AnyOfOperations(node, new ExistsOperation(zk, node), new ExistenceChangedOperation(zk, node));
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/ChildrenChangedOperationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/ChildrenChangedOperationTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/ChildrenChangedOperationTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/ChildrenChangedOperationTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+/**
+ * Tests {@link ChildrenChangedOperation} when it is passed to a {@code getChildren}
+ * call and the children of the node are then changed through the test client.
+ */
+public class ChildrenChangedOperationTest extends ZooKeeperTestSupport {
+
+    /** Adds a child after the getChildren call and expects the listing built for "child1". */
+    @Test
+    public void getsListingWhenNodeIsCreated() throws Exception {
+        final String parent = "/parent";
+        client.createPersistent(parent, null);
+
+        ZooKeeper zk = getConnection();
+        ChildrenChangedOperation changed = new ChildrenChangedOperation(zk, parent);
+        zk.getChildren(parent, changed, null);
+
+        client.createPersistent(parent + "/child1", null);
+        assertEquals(createChildListing("child1"), changed.get().getResult());
+    }
+
+    /** Deletes the only child after the getChildren call and expects the listing built with no names. */
+    @Test
+    public void getsNotifiedWhenNodeIsDeleted() throws Exception {
+        final String parent = "/parent2";
+        final String child = parent + "/child1";
+        client.createPersistent(parent, null);
+        client.createPersistent(child, null);
+
+        ZooKeeper zk = getConnection();
+        ChildrenChangedOperation changed = new ChildrenChangedOperation(zk, parent);
+        zk.getChildren(parent, changed, null);
+
+        client.delete(child);
+        assertEquals(createChildListing(), changed.get().getResult());
+    }
+
+    /** With the third constructor argument false, {@code get()} is expected to return null. */
+    @Test
+    public void getsNoListingWhenOnlyChangeIsRequired() throws Exception {
+        final String parent = "/parent3";
+        client.createPersistent(parent, null);
+
+        ZooKeeper zk = getConnection();
+        ChildrenChangedOperation changed = new ChildrenChangedOperation(zk, parent, false);
+        zk.getChildren(parent, changed, null);
+
+        client.createPersistent(parent + "/child3", null);
+        assertEquals(null, changed.get());
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/CreateOperationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/CreateOperationTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/CreateOperationTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/CreateOperationTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link CreateOperation}: plain creation, creation with data, an
+ * EPHEMERAL_SEQUENTIAL create mode and creation with an explicit permission list.
+ */
+public class CreateOperationTest extends ZooKeeperTestSupport {
+
+    private ZooKeeper connection;
+
+    /** Obtains the connection that every test in this class passes to its CreateOperation. */
+    @Before
+    public void setupConnection() {
+        connection = getConnection();
+    }
+
+    /** Creates /one without data, expects the path back and verifies the node against null data. */
+    @Test
+    public void createBasic() throws Exception {
+        OperationResult<String> outcome = new CreateOperation(connection, "/one").get();
+
+        assertEquals("/one", outcome.getResult());
+        verifyNodeContainsData("/one", null);
+    }
+
+    /** Creates /two with the test payload, expects the path back and verifies the stored bytes. */
+    @Test
+    public void createBasicWithData() throws Exception {
+        CreateOperation op = new CreateOperation(connection, "/two");
+        op.setData(testPayload.getBytes());
+
+        OperationResult<String> outcome = op.get();
+
+        assertEquals("/two", outcome.getResult());
+        verifyNodeContainsData("/two", testPayloadBytes);
+    }
+
+    /** Creates /three as EPHEMERAL_SEQUENTIAL and expects the path back with a sequence suffix. */
+    @Test
+    public void createSequencedNodeToTestCreateMode() throws Exception {
+        // NOTE(review): the expected suffix is hard-coded; presumably it depends on how many
+        // nodes were created under the root beforehand - confirm it is stable across test orderings.
+        final String sequenced = "/three0000000002";
+
+        CreateOperation op = new CreateOperation(connection, "/three");
+        op.setData(testPayload.getBytes());
+        op.setCreateMode(CreateMode.EPHEMERAL_SEQUENTIAL);
+
+        OperationResult<String> outcome = op.get();
+        assertEquals(sequenced, outcome.getResult());
+
+        verifyNodeContainsData(sequenced, testPayloadBytes);
+    }
+
+    /** Creates /four with a single CREATE permission for ANYONE_ID_UNSAFE and verifies the node's ACL against it. */
+    @Test
+    public void createNodeWithSpecificAccess() throws Exception {
+        List<ACL> acl = Collections.singletonList(new ACL(Perms.CREATE, Ids.ANYONE_ID_UNSAFE));
+
+        CreateOperation op = new CreateOperation(connection, "/four");
+        op.setData(testPayload.getBytes());
+        op.setPermissions(acl);
+
+        OperationResult<String> outcome = op.get();
+        assertEquals("/four", outcome.getResult());
+
+        verifyAccessControlList("/four", acl);
+    }
+}

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/DataChangedOperationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/DataChangedOperationTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/DataChangedOperationTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/DataChangedOperationTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+/**
+ * Tests {@link DataChangedOperation} when it is passed to a {@code getData} call and
+ * the node is then modified or deleted through the test client.
+ */
+public class DataChangedOperationTest extends ZooKeeperTestSupport {
+
+    /** Overwrites the node data after the getData call and expects the new bytes as the result. */
+    @Test
+    public void getsDataWhenNodeChanges() throws Exception {
+        final String node = "/datachanged";
+        final String updated = "Really trust us";
+        client.create(node, "this won't hurt a bit");
+        ZooKeeper zk = getConnection();
+
+        DataChangedOperation changed = new DataChangedOperation(zk, node, true);
+        zk.getData(node, changed, null);
+
+        client.setData(node, updated, -1);
+        assertArrayEquals(updated.getBytes(), changed.get().getResult());
+    }
+
+    /** Deletes the node after the getData call and expects a null result. */
+    @Test
+    public void getsNotifiedWhenNodeIsDeleted() throws Exception {
+        final String node = "/existedButWasDeleted";
+        client.create(node, "this won't hurt a bit");
+        ZooKeeper zk = getConnection();
+
+        DataChangedOperation changed = new DataChangedOperation(zk, node, true);
+        zk.getData(node, changed, null);
+
+        client.delete(node);
+        assertEquals(null, changed.get().getResult());
+    }
+}