You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/09/03 10:58:31 UTC

svn commit: r1380174 - in /camel/trunk/components/camel-zookeeper/src: main/java/org/apache/camel/component/zookeeper/policy/ test/java/org/apache/camel/component/zookeeper/ test/java/org/apache/camel/component/zookeeper/policy/

Author: ningjiang
Date: Mon Sep  3 08:58:31 2012
New Revision: 1380174

URL: http://svn.apache.org/viewvc?rev=1380174&view=rev
Log:
CAMEL-5546 Applied the patch with thanks to Andrew

Added:
    camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java
    camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java
    camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java
Modified:
    camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
    camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
    camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java

Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java?rev=1380174&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java Mon Sep  3 08:58:31 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+
+/**
+ * Callback for parties that want to be told when the outcome of an election
+ * may have changed.
+ */
+public interface ElectionWatcher {
+    /**
+     * This method is called when there is a potential change to the master.
+     * Implementations should call "isMaster" on their ZooKeeperElection
+     * instance to re-validate their status.
+     */
+    void electionResultChanged();
+}

Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java?rev=1380174&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java Mon Sep  3 08:58:31 2012
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.zookeeper.SequenceComparator;
+import org.apache.camel.component.zookeeper.ZooKeeperEndpoint;
+import org.apache.camel.component.zookeeper.ZooKeeperMessage;
+import org.apache.camel.impl.JavaUuidGenerator;
+import org.apache.camel.spi.UuidGenerator;
+
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>ZooKeeperElection</code> uses the leader election capabilities of a
+ * ZooKeeper cluster to control which nodes are enabled. It is typically used in
+ * fail-over scenarios controlling identical instances of an application across
+ * a cluster of Camel based servers. <p> The election is configured with a 'top
+ * n' number of servers that should be marked as master, for a simple
+ * master/slave scenario this would be 1. Each instance will execute the
+ * election algorithm to obtain its position in the hierarchy of servers, if it
+ * is within the 'top n' servers then the node is enabled and isMaster() will
+ * return 'true'. If not it waits for a change in the leader hierarchy and then
+ * reruns this scenario to see if it is now in the top n. <p> All instances of
+ * the election must also be configured with the same path on the ZooKeeper
+ * cluster where the election will be carried out. It is good practice for this
+ * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
+ * that these nodes should exist before using the election. <p> See <a
+ * href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
+ * for more on how Leader election</a> is achieved with ZooKeeper.
+ */
+public class ZooKeeperElection {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperElection.class);
+    private final ProducerTemplate producerTemplate;
+    private final CamelContext camelContext;
+    private final String uri;
+    private final String candidateName;
+    private final Lock lock = new ReentrantLock();
+    private final CountDownLatch electionComplete = new CountDownLatch(1);
+    private AtomicBoolean masterNode = new AtomicBoolean();
+    private volatile boolean isCandidateCreated;
+    private int enabledCount = 1;
+    private UuidGenerator uuidGenerator = new JavaUuidGenerator();
+    private final List<ElectionWatcher> watchers = new ArrayList<ElectionWatcher>();
+
+    /**
+     * Creates an election that uses a producer template obtained from the
+     * given context.
+     *
+     * @param camelContext context used to resolve the endpoint and to add the monitoring route
+     * @param uri          uri of the ZooKeeper endpoint under which candidate nodes are created
+     * @param enabledCount how many candidates at the top of the list count as master
+     */
+    public ZooKeeperElection(CamelContext camelContext, String uri, int enabledCount) {
+        this(camelContext.createProducerTemplate(), camelContext, uri, enabledCount);
+    }
+
+    /**
+     * Creates an election with an explicitly supplied producer template. The
+     * candidate name for this instance is generated here; no ZooKeeper node is
+     * created until {@link #isMaster()} is called.
+     *
+     * @param producerTemplate template used to send the node-creation exchange
+     * @param camelContext     context used to resolve the endpoint and to add the monitoring route
+     * @param uri              uri of the ZooKeeper endpoint under which candidate nodes are created
+     * @param enabledCount     how many candidates at the top of the list count as master
+     */
+    public ZooKeeperElection(ProducerTemplate producerTemplate, CamelContext camelContext, String uri, int enabledCount) {
+        this.camelContext = camelContext;
+        this.producerTemplate = producerTemplate;
+        this.uri = uri;
+        this.enabledCount = enabledCount;
+        this.candidateName = createCandidateName();
+    }
+
+    /**
+     * Reports whether this candidate is currently enabled. While the candidate
+     * node has not been created yet, the call first creates it and then blocks
+     * until an election result is available.
+     *
+     * @return the current value of the master flag
+     */
+    public boolean isMaster() {
+        if (isCandidateCreated) {
+            return masterNode.get();
+        }
+        testAndCreateCandidateNode();
+        awaitElectionResults();
+        return masterNode.get();
+    }
+
+    /**
+     * Builds the name of this candidate as <tt>hostname-uuid</tt>. The UUID
+     * alone would be enough; the hostname is only there for human readability.
+     * If the hostname cannot be resolved, "unknown-host" is used instead.
+     */
+    private String createCandidateName() {
+        String hostname;
+        try {
+            hostname = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException ex) {
+            LOG.warn("Failed to get the local hostname.", ex);
+            hostname = "unknown-host";
+        }
+        return hostname + "-" + uuidGenerator.generateUuid();
+    }
+
+    /**
+     * Creates the candidate node unless it has already been created. The flag
+     * is re-checked while holding the lock so that concurrent callers do not
+     * create the node twice. Any exception is rethrown wrapped in a
+     * RuntimeException by {@link #handleException(Exception)}.
+     */
+    private void testAndCreateCandidateNode() {
+        // acquire before entering the try block so that the finally clause can
+        // never try to release a lock this thread does not hold
+        lock.lock();
+        try {
+            if (!isCandidateCreated) {
+                createCandidateNode(camelContext);
+                isCandidateCreated = true;
+            }
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Blocks until the election latch has been counted down. The wait is
+     * deliberately not abandoned on interruption, but the interrupt status of
+     * the calling thread is restored afterwards so the request is not lost.
+     */
+    private void awaitElectionResults() {
+        boolean interrupted = false;
+        try {
+            while (electionComplete.getCount() > 0) {
+                try {
+                    LOG.debug("Awaiting election results...");
+                    electionComplete.await();
+                } catch (InterruptedException e1) {
+                    // await() cleared the flag when it threw; remember the
+                    // interruption and keep waiting for the result
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Creates this candidate's ephemeral sequential node below the election
+     * path and, if that succeeded, adds the route that monitors the candidate
+     * list.
+     * <p>
+     * Failures are only logged. NOTE(review): the caller marks the candidate
+     * as created regardless, and the latch awaited by isMaster() is only
+     * counted down by the monitoring route, so a failure here looks like it
+     * leaves isMaster() waiting - confirm whether that is intended.
+     *
+     * @param camelContext context used to resolve the endpoint and to add the route
+     * @return the endpoint resolved from the election uri
+     */
+    private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) {
+        LOG.info("Initializing ZookeeperElection with uri '{}'", uri);
+        ZooKeeperEndpoint zep = camelContext.getEndpoint(uri, ZooKeeperEndpoint.class);
+        zep.getConfiguration().setCreate(true);
+        String fullpath = createFullPathToCandidate(zep);
+        Exchange e = zep.createExchange();
+        e.setPattern(ExchangePattern.InOut);
+        e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_NODE, fullpath);
+        e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, CreateMode.EPHEMERAL_SEQUENTIAL);
+        producerTemplate.send(zep, e);
+
+        if (e.isFailed()) {
+            LOG.error("Error setting up election node " + fullpath, e.getException());
+            return zep;
+        }
+
+        LOG.info("Candidate node '{}' has been created", fullpath);
+        try {
+            // zep has already been dereferenced above, so no null check is needed here
+            camelContext.addRoutes(new ElectoralMonitorRoute(zep));
+        } catch (Exception ex) {
+            LOG.error("Error configuring ZookeeperElection", ex);
+        }
+        return zep;
+    }
+
+    /**
+     * Appends this candidate's name to the path configured on the endpoint,
+     * inserting a "/" separator when the path does not already end with one.
+     */
+    private String createFullPathToCandidate(ZooKeeperEndpoint zep) {
+        String parent = zep.getConfiguration().getPath();
+        String separator = parent.endsWith("/") ? "" : "/";
+        return parent + separator + candidateName;
+    }
+
+    /**
+     * Rethrows the given exception wrapped in a RuntimeException that carries
+     * the same message and keeps the original as its cause.
+     */
+    private void handleException(Exception e) {
+        throw new RuntimeException(e.getMessage(), e);
+    }
+
+    /**
+     * Calls electionResultChanged() on every registered watcher. An exception
+     * thrown by one watcher is logged and does not prevent the remaining
+     * watchers from being notified.
+     * <p>
+     * NOTE(review): the watcher list is a plain ArrayList and this method is
+     * invoked from the monitoring route's processor; if watchers can be added
+     * or removed from another thread at the same time the iteration may fail -
+     * confirm whether synchronization is needed.
+     */
+    private void notifyElectionWatchers() {
+        for (ElectionWatcher watcher : watchers) {
+            try {
+                watcher.electionResultChanged();
+            } catch (Exception e) {
+                LOG.warn("Election watcher " + watcher + " of type " + watcher.getClass() + " threw an exception.", e);
+            }
+        }
+    }
+
+    /**
+     * Registers a watcher to be notified each time the monitoring route
+     * processes a candidate list.
+     *
+     * @param e the watcher to register
+     * @return the result of adding the watcher to the underlying list
+     */
+    public boolean addElectionWatcher(ElectionWatcher e) {
+        return watchers.add(e);
+    }
+
+    /**
+     * Unregisters a previously added watcher.
+     *
+     * @param o the watcher to remove
+     * @return <tt>true</tt> if the watcher was registered and has been removed
+     */
+    public boolean removeElectionWatcher(ElectionWatcher o) {
+        return watchers.remove(o);
+    }
+
+    /**
+     * Route that consumes the list of candidate nodes from the election
+     * endpoint and, for every list received, recomputes whether this candidate
+     * is a master, releases callers waiting for the first result and notifies
+     * the election watchers.
+     */
+    private class ElectoralMonitorRoute extends RouteBuilder {
+
+        private SequenceComparator comparator = new SequenceComparator();
+        private ZooKeeperEndpoint zep;
+
+        /**
+         * Reconfigures the given endpoint for consuming: list the children,
+         * send an empty message on delete and keep repeating.
+         */
+        public ElectoralMonitorRoute(ZooKeeperEndpoint zep) {
+            this.zep = zep;
+            zep.getConfiguration().setListChildren(true);
+            zep.getConfiguration().setSendEmptyMessageOnDelete(true);
+            zep.getConfiguration().setRepeat(true);
+        }
+
+        @Override
+        public void configure() throws Exception {
+
+            /**
+             * TODO: this is cheap and cheerful but suboptimal; it suffers from
+             * the 'herd effect' that on any change to the candidates list every
+             * policy instance will ask for the entire candidate list again.
+             * This is fine for small numbers of nodes (for scenarios like
+             * Master-Slave it is perfect) but could get noisy if large numbers
+             * of nodes were involved. <p> Better would be to find the position
+             * of this node in the list, watch the node in the position ahead
+             * of this one and only request the candidate list when its
+             * status changes. This will require enhancing the consumer to allow
+             * custom operation lists.
+             */
+            // NOTE(review): the route id uses only the first 8 characters of the
+            // candidate name, which starts with the hostname; two elections on the
+            // same host and context would presumably get the same id - confirm.
+            from(zep).id("election-route-" + candidateName.substring(0, 8)).sort(body(), comparator).process(new Processor() {
+                @Override
+                public void process(Exchange e) throws Exception {
+                    @SuppressWarnings("unchecked")
+                    List<String> candidates = e.getIn().getMandatoryBody(List.class);
+
+                    // presumably candidateName is never an exact element because the
+                    // created node names carry a sequence suffix, so binarySearch
+                    // returns -(insertion point) - 1 and abs() yields the 1-based
+                    // position of this candidate - TODO confirm.
+                    // NOTE(review): binarySearch relies on natural String order while
+                    // the list was sorted with SequenceComparator - verify they agree.
+                    int location = Math.abs(Collections.binarySearch(candidates, candidateName));
+                    /**
+                     * check if the item at this location starts with this node's
+                     * candidate name
+                     */
+                    if (isOurCandidateAtLocationInCandidatesList(candidates, location)) {
+
+                        masterNode.set(location <= enabledCount);
+                        LOG.debug(
+                                "This node is number '{}' on the candidate list, election is configured for the top '{}'. this node will be {}",
+                                new Object[]{location, enabledCount, masterNode.get() ? "enabled" : "disabled"}
+                        );
+                    }
+                    // release any caller blocked in isMaster(); this happens even when
+                    // this candidate was not found in the list
+                    electionComplete.countDown();
+
+                    // watchers are told about every list received, not only about
+                    // actual changes of the master flag
+                    notifyElectionWatchers();
+                }
+
+                // location is treated as a 1-based position into the candidate list
+                private boolean isOurCandidateAtLocationInCandidatesList(List<String> candidates, int location) {
+                    return location <= candidates.size() && candidates.get(location - 1).startsWith(candidateName);
+                }
+            });
+        }
+    }
+}

Modified: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java?rev=1380174&r1=1380173&r2=1380174&view=diff
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java (original)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java Mon Sep  3 08:58:31 2012
@@ -16,32 +16,15 @@
  */
 package org.apache.camel.component.zookeeper.policy;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
 import org.apache.camel.Route;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.zookeeper.SequenceComparator;
-import org.apache.camel.component.zookeeper.ZooKeeperEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperMessage;
-import org.apache.camel.impl.JavaUuidGenerator;
 import org.apache.camel.impl.RoutePolicySupport;
-import org.apache.camel.spi.UuidGenerator;
-
-import org.apache.zookeeper.CreateMode;
 
 /**
  * <code>ZooKeeperRoutePolicy</code> uses the leader election capabilities of a
@@ -65,81 +48,57 @@ import org.apache.zookeeper.CreateMode;
  * See <a href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
  *     for more on how Leader election</a> is archived with ZooKeeper.
  */
-public class ZooKeeperRoutePolicy extends RoutePolicySupport {
+public class ZooKeeperRoutePolicy extends RoutePolicySupport implements ElectionWatcher {
 
     private final String uri;
     private final int enabledCount;
-    private String candidateName;
     private final Lock lock = new ReentrantLock();
-    private final CountDownLatch electionComplete = new CountDownLatch(1);
     private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<Route>();
     private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
-    private ProducerTemplate template;
     private volatile boolean shouldStopConsumer = true;
-    private final UuidGenerator uuidGenerator = new JavaUuidGenerator();
-    private volatile boolean isCandidateCreated;
+
+    private final Lock electionLock = new ReentrantLock();
+    private ZooKeeperElection election;
 
     public ZooKeeperRoutePolicy(String uri, int enabledCount) {
         this.uri = uri;
         this.enabledCount = enabledCount;
-        createCandidateName();
     }
 
-    private void createCandidateName() {
-        /** UUID would be enough, also using hostname for human readability */
-        StringBuilder b = new StringBuilder(fetchHostname());
-        b.append("-").append(uuidGenerator.generateUuid());
-        this.candidateName = b.toString();
-    }
-
-    private String fetchHostname() {
-        try {
-            return InetAddress.getLocalHost().getCanonicalHostName();
-        } catch (UnknownHostException ex) {
-            log.warn("Unable to determine the local hostname, using a default.", ex);
-            return "default";
-        }
+    public ZooKeeperRoutePolicy(ZooKeeperElection election) {
+        this.election = election;
+        this.uri = null;
+        this.enabledCount = -1;
     }
 
     @Override
     public void onExchangeBegin(Route route, Exchange exchange) {
-        testAndCreateCandidateNode(route);
+        ensureElectionIsCreated(route);
 
-        awaitElectionResults();
-        if (!shouldProcessExchanges.get()) {
+        if (election.isMaster()) {
             if (shouldStopConsumer) {
-                stopConsumer(route);
+                startConsumer(route);
             }
-
-            IllegalStateException e = new IllegalStateException("Zookeeper based route policy prohibits processing exchanges, stopping route and failing the exchange");
-            exchange.setException(e);
-
         } else {
             if (shouldStopConsumer) {
-                startConsumer(route);
+                stopConsumer(route);
             }
-        }
-    }
 
-    private void testAndCreateCandidateNode(Route route) {
-        try {
-            lock.lock();
-            if (!isCandidateCreated) {
-                createCandidateNode(route.getRouteContext().getCamelContext());
-                isCandidateCreated = true;
-            }
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
+            IllegalStateException e = new IllegalStateException("Zookeeper based route policy prohibits processing exchanges, stopping route and failing the exchange");
+            exchange.setException(e);
         }
     }
 
-    private void awaitElectionResults() {
-        while (electionComplete.getCount() > 0) {
+    private void ensureElectionIsCreated(Route route) {
+        if (election == null) {
+            electionLock.lock();
             try {
-                electionComplete.await();
-            } catch (InterruptedException e1) {
+                if (election == null) { // re-test
+                    election = new ZooKeeperElection(route.getRouteContext().getCamelContext(), uri, enabledCount);
+                    election.addElectionWatcher(this);
+                }
+            } finally {
+                electionLock.unlock();
             }
         }
     }
@@ -173,6 +132,13 @@ public class ZooKeeperRoutePolicy extend
         }
     }
 
+    @Override
+    public void electionResultChanged() {
+        if (election.isMaster()) {
+            startAllStoppedConsumers();
+        }
+    }
+
     private void startAllStoppedConsumers() {
         try {
             lock.lock();
@@ -200,99 +166,4 @@ public class ZooKeeperRoutePolicy extend
     public void setShouldStopConsumer(boolean shouldStopConsumer) {
         this.shouldStopConsumer = shouldStopConsumer;
     }
-
-    private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) {
-        this.template = camelContext.createProducerTemplate();
-        log.info("Initializing ZookeeperRoutePolicy with uri {}", uri);
-
-        ZooKeeperEndpoint zep = camelContext.getEndpoint(uri, ZooKeeperEndpoint.class);
-        zep.getConfiguration().setCreate(true);
-        String fullpath = createFullPathToCandidate(zep);
-        Exchange e = zep.createExchange();
-        e.setPattern(ExchangePattern.InOut);
-        e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_NODE, fullpath);
-        e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, CreateMode.EPHEMERAL_SEQUENTIAL);
-        template.send(zep, e);
-
-        if (e.isFailed()) {
-            log.warn("Error setting up election node " + fullpath, e.getException());
-        } else {
-            log.info("Candidate node {} has been created", fullpath);
-            try {
-                camelContext.addRoutes(new ElectoralMonitorRoute(zep));
-            } catch (Exception ex) {
-                log.warn("Error configuring ZookeeperRoutePolicy. This exception is ignored.", ex);
-            }
-        }
-        return zep;
-
-    }
-
-    private String createFullPathToCandidate(ZooKeeperEndpoint zep) {
-        String fullpath = zep.getConfiguration().getPath();
-        if (!fullpath.endsWith("/")) {
-            fullpath += "/";
-        }
-        fullpath += candidateName;
-        return fullpath;
-    }
-
-    private class ElectoralMonitorRoute extends RouteBuilder {
-
-        private SequenceComparator comparator = new SequenceComparator();
-        
-        private ZooKeeperEndpoint zep;
-
-        public ElectoralMonitorRoute(ZooKeeperEndpoint zep) {
-            this.zep = zep;
-            zep.getConfiguration().setListChildren(true);
-            zep.getConfiguration().setRepeat(true);
-        }
-
-        @Override
-        public void configure() throws Exception {
-
-            /**
-             * TODO: this is cheap cheerful but suboptimal; it suffers from the
-             * 'herd effect' that on any change to the candidates list every
-             * policy instance will ask for the entire candidate list again.
-             * This is fine for small numbers of nodes (for scenarios
-             * like Master-Slave it is perfect) but could get noisy if
-             * large numbers of nodes were involved.
-             * <p>
-             * Better would be to find the position of this node in the list and
-             * watch the node in the position ahead node ahead of this and only
-             * request the candidate list when its status changes. This will
-             * require enhancing the consumer to allow custom operation lists.
-             */
-            from(zep).sort(body(), comparator).process(new Processor() {
-
-                public void process(Exchange e) throws Exception {
-                    @SuppressWarnings("unchecked")
-                    List<String> candidates = e.getIn().getMandatoryBody(List.class);
-
-                    int location = Math.abs(Collections.binarySearch(candidates, candidateName));
-                    /**
-                     * check if the item at this location starts with this nodes
-                     * candidate name
-                     */
-                    if (isOurCandidateAtLocationInCandidatesList(candidates, location)) {
-
-                        shouldProcessExchanges.set(location <= enabledCount);
-                        if (log.isDebugEnabled()) {
-                            log.debug("This node is number {} on the candidate list, route is configured for the top {}. Exchange processing will be {}",
-                                    new Object[]{location, enabledCount, shouldProcessExchanges.get() ? "enabled" : "disabled"});
-                        }
-                        startAllStoppedConsumers();
-                    }
-                    electionComplete.countDown();
-                }
-
-                private boolean isOurCandidateAtLocationInCandidatesList(List<String> candidates, int location) {
-                    return location <= candidates.size() && candidates.get(location - 1).startsWith(candidateName);
-                }
-            });
-        }
-    }
-
 }

Modified: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java?rev=1380174&r1=1380173&r2=1380174&view=diff
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java (original)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java Mon Sep  3 08:58:31 2012
@@ -206,6 +206,15 @@ public class ZooKeeperTestSupport extend
             }
         }
 
+        /**
+         * Deletes the immediate children of the given node and then the node
+         * itself. Only one level is handled; children of the children are not
+         * listed here.
+         *
+         * @param node path of the node to remove together with its children
+         */
+        public void deleteAll(String node) throws Exception {
+            delay(200);
+            log.debug("Deleting {} and it's immediate children", node);
+            // children are listed without registering a watch
+            for (String child : zk.getChildren(node, false)) {
+                delete(node + "/" + child);
+            }
+            delete(node);
+        }
+
         public void delete(String node) throws Exception {
             delay(200);
             log.debug("Deleting node " + node);

Modified: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java?rev=1380174&r1=1380173&r2=1380174&view=diff
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java (original)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java Mon Sep  3 08:58:31 2012
@@ -27,8 +27,11 @@ import org.apache.camel.component.zookee
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.commons.logging.LogFactory;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FailoverRoutePolicyTest extends ZooKeeperTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverRoutePolicyTest.class);
 
     protected CamelContext createCamelContext() throws Exception {
         disableJMX();
@@ -76,6 +79,7 @@ public class FailoverRoutePolicyTest ext
                 template.sendBody("vm:" + routename, ExchangePattern.InOut, message);
             } catch (Exception e) {
                 if (expected > 0) {
+                    LOG.error(e.getMessage(), e);
                     fail("Expected messages...");
                 }
             }

Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java?rev=1380174&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java Mon Sep  3 08:58:31 2012
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for ZooKeeperElection: single master, master/slave, fail-over after
+ * the master's context is stopped, several masters at once and notification
+ * of election watchers.
+ */
+public class ZookeeperElectionTest extends ZooKeeperTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperElectionTest.class);
+
+    private static final String NODE_BASE_KEY = "/someapp";
+    private static final String NODE_PARTICULAR_KEY = "/someapp/somepolicy";
+    private static final String ELECTION_URI = "zookeeper:localhost:39913/someapp/somepolicy";
+
+    @Before
+    public void before() throws Exception {
+        // set up the parent used to control the election
+        client.createPersistent(NODE_BASE_KEY, "App node to contain policy election nodes...");
+        client.createPersistent(NODE_PARTICULAR_KEY, "Policy node used by route policy to control routes...");
+    }
+
+    @After
+    public void after() throws Exception {
+        // remove the election node with any candidate nodes left below it, then its parent
+        client.deleteAll(NODE_PARTICULAR_KEY);
+        client.delete(NODE_BASE_KEY);
+    }
+
+    /** A lone candidate must be elected master. */
+    @Test
+    public void masterCanBeElected() throws Exception {
+        ZooKeeperElection candidate = new ZooKeeperElection(template, context, ELECTION_URI, 1);
+        assertTrue("The only election candidate was not elected as master.", candidate.isMaster());
+    }
+
+    /** With one master allowed, the first candidate wins and the second does not. */
+    // NOTE(review): the contexts created in this and the following tests are not
+    // stopped at the end of the test - confirm whether that is acceptable.
+    @Test
+    public void masterAndSlave() throws Exception {
+        final DefaultCamelContext candidateOneContext = createNewContext();
+        final DefaultCamelContext candidateTwoContext = createNewContext();
+
+        ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 1);
+        assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
+        ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 1);
+        assertFalse("The second candidate should not have been elected.", electionCandidate2.isMaster());
+    }
+
+    /** After the master's context is stopped the second candidate must take over. */
+    @Test
+    public void testMasterGoesAway() throws Exception {
+        final DefaultCamelContext candidateOneContext = createNewContext();
+        final DefaultCamelContext candidateTwoContext = createNewContext();
+
+        ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 1);
+        assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
+        ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 1);
+        assertFalse("The second candidate should not have been elected.", electionCandidate2.isMaster());
+
+        LOG.debug("About to shutdown the first candidate.");
+
+        candidateOneContext.stop(); // the first candidate was killed.
+
+        delay(3000); // more than the timeout on the ZooKeeper server.
+        assertTrue("The second candidate should have been elected.", electionCandidate2.isMaster());
+    }
+
+    /** With two masters allowed, both candidates must be elected. */
+    @Test
+    public void testDualMaster() throws Exception {
+        final DefaultCamelContext candidateOneContext = createNewContext();
+        final DefaultCamelContext candidateTwoContext = createNewContext();
+
+        ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 2);
+        assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
+        ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 2);
+        assertTrue("The second candidate should also be a master.", electionCandidate2.isMaster());
+    }
+
+    /** A watcher on the first candidate must be notified when a second candidate joins. */
+    // NOTE(review): the assertion runs right after the second candidate's
+    // isMaster() returns; presumably the first candidate's notification has
+    // arrived by then - confirm this is not timing dependent.
+    @Test
+    public void testWatchersAreNotified() throws Exception {
+        final DefaultCamelContext candidateOneContext = createNewContext();
+        final DefaultCamelContext candidateTwoContext = createNewContext();
+
+        final AtomicBoolean notified = new AtomicBoolean(false);
+        ElectionWatcher watcher = new ElectionWatcher() {
+            @Override public void electionResultChanged() { notified.set(true); }
+        };
+
+        ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 2);
+        assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
+        electionCandidate1.addElectionWatcher(watcher);
+        ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 2);
+        electionCandidate2.isMaster();
+        assertTrue("The first candidate should have had it's watcher notified", notified.get());
+    }
+
+    /** Creates and starts a fresh context that stands in for a separate node. */
+    private DefaultCamelContext createNewContext() throws Exception {
+        DefaultCamelContext controlledContext = new DefaultCamelContext();
+        controlledContext.start();
+        return controlledContext;
+    }
+
+    /** Creates an election on the shared election uri that uses the given context's own producer template. */
+    private ZooKeeperElection createElectionCandidate(final DefaultCamelContext context, int masterCount) {
+        return new ZooKeeperElection(context.createProducerTemplate(), context, ELECTION_URI, masterCount);
+    }
+}