You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2011/07/08 00:55:07 UTC

svn commit: r1144087 - in /zookeeper/trunk: ./ src/recipes/election/ src/recipes/election/src/ src/recipes/election/src/java/ src/recipes/election/src/java/org/ src/recipes/election/src/java/org/apache/ src/recipes/election/src/java/org/apache/zookeepe...

Author: phunt
Date: Thu Jul  7 22:55:07 2011
New Revision: 1144087

URL: http://svn.apache.org/viewvc?rev=1144087&view=rev
Log:
ZOOKEEPER-1095. Simple leader election recipe (Eric Sammer via henry and phunt)

Added:
    zookeeper/trunk/src/recipes/election/
    zookeeper/trunk/src/recipes/election/README.txt
    zookeeper/trunk/src/recipes/election/build.xml
    zookeeper/trunk/src/recipes/election/src/
    zookeeper/trunk/src/recipes/election/src/java/
    zookeeper/trunk/src/recipes/election/src/java/org/
    zookeeper/trunk/src/recipes/election/src/java/org/apache/
    zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/
    zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/
    zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/
    zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderElectionAware.java
    zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderElectionSupport.java
    zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderOffer.java
    zookeeper/trunk/src/recipes/election/test/
    zookeeper/trunk/src/recipes/election/test/org/
    zookeeper/trunk/src/recipes/election/test/org/apache/
    zookeeper/trunk/src/recipes/election/test/org/apache/zookeeper/
    zookeeper/trunk/src/recipes/election/test/org/apache/zookeeper/recipes/
    zookeeper/trunk/src/recipes/election/test/org/apache/zookeeper/recipes/leader/
    zookeeper/trunk/src/recipes/election/test/org/apache/zookeeper/recipes/leader/LeaderElectionSupportTest.java
Modified:
    zookeeper/trunk/CHANGES.txt

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1144087&r1=1144086&r2=1144087&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Jul  7 22:55:07 2011
@@ -329,6 +329,9 @@ IMPROVEMENTS:
 
   ZOOKEEPER-1073. address a documentation issue in ZOOKEEPER-1030. (phunt via mahadev)
 
+  ZOOKEEPER-1095. Simple leader election recipe
+  (Eric Sammer via henry and phunt)
+
 NEW FEATURES:
   ZOOKEEPER-729. Java client API to recursively delete a subtree.
   (Kay Kay via henry)

Added: zookeeper/trunk/src/recipes/election/README.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/recipes/election/README.txt?rev=1144087&view=auto
==============================================================================
--- zookeeper/trunk/src/recipes/election/README.txt (added)
+++ zookeeper/trunk/src/recipes/election/README.txt Thu Jul  7 22:55:07 2011
@@ -0,0 +1,27 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+1) This election interface recipe implements the leader election recipe
+mentioned in ../../../docs/recipes.[html,pdf].
+
+2) To compile the leader election java recipe you can just run ant jar from
+this directory.
+Please report any bugs on the ZooKeeper JIRA:
+
+http://issues.apache.org/jira/browse/ZOOKEEPER
+
+  

Added: zookeeper/trunk/src/recipes/election/build.xml
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/recipes/election/build.xml?rev=1144087&view=auto
==============================================================================
--- zookeeper/trunk/src/recipes/election/build.xml (added)
+++ zookeeper/trunk/src/recipes/election/build.xml Thu Jul  7 22:55:07 2011
@@ -0,0 +1,128 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project name="election" default="jar">
+  <import file="../build-recipes.xml"/>
+    <property name="test.main.classes" value="${zk.root}/build/test/classes"/>
+    <property name="test.build.dir" value="${build.test}" />
+    <property name="test.src.dir" value="test"/>
+    <property name="test.log.dir" value="${test.build.dir}/logs" />
+    <property name="test.data.dir" value="${test.build.dir}/data" />
+    <property name="test.data.upgrade.dir" value="${test.data.dir}/upgrade" />
+    <property name="test.tmp.dir" value="${test.build.dir}/tmp" />
+    <property name="test.output" value="no" />
+    <property name="test.timeout" value="900000" />
+    <property name="test.junit.output.format" value="plain" />
+    <property name="test.junit.fork.mode" value="perTest" />
+    <property name="test.junit.printsummary" value="yes" />
+    <property name="test.junit.haltonfailure" value="no" />
+    <property name="test.junit.maxmem" value="512m" />
+
+  <target name="setjarname">
+    <property name="jarname"
+              value="${build.dir}/zookeeper-${version}-recipes-${name}.jar"/>
+  </target>
+
+  <!-- Override jar target to specify main class -->
+  <target name="jar" depends="checkMainCompiled, setjarname, compile">
+    <echo message="recipes: ${name}"/>
+
+    <jar jarfile="${jarname}">
+      <fileset file="${zk.root}/LICENSE.txt" />
+      <fileset dir="${build.classes}"/>
+      <fileset dir="${build.test}"/>
+    </jar>
+  </target>
+
+	<target name="test" depends="compile-test,test-init,test-category,junit.run" />
+
+	<target name="compile-test" depends="compile">
+  		<property name="target.jdk" value="${ant.java.version}" />	
+		<property name="src.test.local" location="${basedir}/test" />
+		<mkdir dir="${build.test}"/>
+		<javac srcdir="${src.test.local}" 
+			destdir="${build.test}" 
+			target="${target.jdk}" 
+			debug="on" >
+			<classpath refid="classpath" />
+                        <classpath>
+                        <pathelement path="${test.main.classes}"/>
+                        </classpath>
+		</javac>
+	</target>
+	
+    <target name="test-init" depends="jar,compile-test">
+        <delete dir="${test.log.dir}" />
+        <delete dir="${test.tmp.dir}" />
+        <delete dir="${test.data.dir}" />
+        <mkdir dir="${test.log.dir}" />
+        <mkdir dir="${test.tmp.dir}" />
+        <mkdir dir="${test.data.dir}" />
+    </target>
+
+	<target name="test-category">
+         <property name="test.category" value=""/>
+    </target>
+
+	<target name="junit.run">
+		<echo message="${test.src.dir}" />
+        <junit showoutput="${test.output}"
+               printsummary="${test.junit.printsummary}"
+               haltonfailure="${test.junit.haltonfailure}"
+               fork="yes"
+               forkmode="${test.junit.fork.mode}"
+               maxmemory="${test.junit.maxmem}"
+               dir="${basedir}" timeout="${test.timeout}"
+               errorProperty="tests.failed" failureProperty="tests.failed">
+          <sysproperty key="build.test.dir" value="${test.tmp.dir}" />
+          <sysproperty key="test.data.dir" value="${test.data.dir}" />
+          <sysproperty key="log4j.configuration"
+                    value="file:${basedir}/conf/log4j.properties" />
+          <classpath refid="classpath"/>
+          <classpath>
+             <pathelement path="${build.test}" />
+             <pathelement path="${test.main.classes}"/>
+          </classpath>
+          <formatter type="${test.junit.output.format}" />
+          <batchtest todir="${test.log.dir}" unless="testcase">
+              <fileset dir="${test.src.dir}"
+                     includes="**/*${test.category}Test.java"/>
+          </batchtest>
+          <batchtest todir="${test.log.dir}" if="testcase">
+              <fileset dir="${test.src.dir}" includes="**/${testcase}.java"/>
+          </batchtest>
+       </junit>
+            <fail if="tests.failed">Tests failed!</fail>
+    </target>
+
+  <target name="package" depends="jar, zookeeperbuildrecipes.package"
+          unless="skip.recipes">
+
+    <copy file="${basedir}/build.xml" todir="${dist.dir}/recipes/${name}"/>
+
+    <mkdir dir="${dist.dir}/recipes/${name}/test"/>
+    <copy todir="${dist.dir}/recipes/${name}/test">
+      <fileset dir="${basedir}/test"/>
+    </copy>
+    <mkdir dir="${dist.dir}/recipes/${name}/src"/>
+    <copy todir="${dist.dir}/recipes/${name}/src">
+      <fileset dir="${basedir}/src"/>
+    </copy>
+  </target>
+
+</project>
+

Added: zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderElectionAware.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderElectionAware.java?rev=1144087&view=auto
==============================================================================
--- zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderElectionAware.java (added)
+++ zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderElectionAware.java Thu Jul  7 22:55:07 2011
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.recipes.leader;
+
+import org.apache.zookeeper.recipes.leader.LeaderElectionSupport.EventType;
+
+/**
+ * An interface to be implemented by clients that want to receive election
+ * events.
+ */
+public interface LeaderElectionAware {
+
+  /**
+   * Called during each state transition. Currently, low-level events are provided
+   * at the beginning and end of each state. For instance, START may be followed
+   * by OFFER_START, OFFER_COMPLETE, DETERMINE_START, DETERMINE_COMPLETE, and so
+   * on.
+   * 
+   * @param eventType
+   */
+  public void onElectionEvent(EventType eventType);
+
+}

Added: zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderElectionSupport.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderElectionSupport.java?rev=1144087&view=auto
==============================================================================
--- zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderElectionSupport.java (added)
+++ zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderElectionSupport.java Thu Jul  7 22:55:07 2011
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.recipes.leader;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * A leader election support library implementing the ZooKeeper election recipe.
+ * </p>
+ * <p>
+ * This support library is meant to simplify the construction of an exclusive
+ * leader system on top of Apache ZooKeeper. Any application that can become the
+ * leader (usually a process that provides a service, exclusively) would
+ * configure an instance of this class with their hostname, at least one
+ * listener (an implementation of {@link LeaderElectionAware}), and either an
+ * instance of {@link ZooKeeper} or the proper connection information. Once
+ * configured, invoking {@link #start()} will cause the client to connect to
+ * ZooKeeper and create a leader offer. The library then determines if it has
+ * been elected the leader using the algorithm described below. The client
+ * application can follow all state transitions via the listener callback.
+ * </p>
+ * <p>
+ * Leader election algorithm
+ * </p>
+ * <p>
+ * The library starts in a START state. Through each state transition, a state
+ * start and a state complete event are sent to all listeners. When
+ * {@link #start()} is called, a leader offer is created in ZooKeeper. A leader
+ * offer is an ephemeral sequential node that indicates a process that can act
+ * as a leader for this service. A read of all leader offers is then performed.
+ * The offer with the lowest sequence number is said to be the leader. The
+ * process elected leader will transition to the leader state. All other
+ * processes will transition to a ready state. Internally, the library creates a
+ * ZooKeeper watch on the leader offer with the sequence ID of N - 1 (where N is
+ * the process's sequence ID). If that offer disappears due to a process
+ * failure, the watching process will run through the election determination
+ * process again to see if it should become the leader. Note that sequence IDs
+ * may not be contiguous due to failed processes. A process may revoke its offer
+ * to be the leader at any time by calling {@link #stop()}.
+ * </p>
+ * <p>
+ * Guarantees (not) Made and Caveats
+ * </p>
+ * <p>
+ * <ul>
+ * <li>It is possible for a (poorly implemented) process to create a leader
+ * offer, get the lowest sequence ID, but have something terrible occur where it
+ * maintains its connection to ZK (and thus its ephemeral leader offer node) but
+ * doesn't actually provide the service in question. It is up to the user to
+ * ensure any failure to become the leader - and whatever that means in the
+ * context of the user's application - results in a revocation of its leader
+ * offer (i.e. that {@link #stop()} is called).</li>
+ * <li>It is possible for ZK timeouts and retries to play a role in service
+ * liveness. In other words, if process A has the lowest sequence ID but
+ * requires a few attempts to read the other leader offers' sequence IDs,
+ * election can seem slow. Users should apply timeouts during the determination
+ * process if they need to hit a specific SLA.</li>
+ * <li>The library makes a "best effort" to detect catastrophic failures of the
+ * process. It is possible that an unforeseen event results in (for instance) an
+ * unchecked exception that propagates past normal error handling code. This
+ * normally doesn't matter as the same exception would almost certainly destroy
+ * the entire process and thus the connection to ZK and the leader offer
+ * resulting in another round of leader determination.</li>
+ * </ul>
+ * </p>
+ */
+public class LeaderElectionSupport implements Watcher {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(LeaderElectionSupport.class);
+
+  private ZooKeeper zooKeeper;
+
+  private State state;
+  private Set<LeaderElectionAware> listeners;
+
+  private String rootNodeName;
+  private LeaderOffer leaderOffer;
+  private String hostName;
+
+  public LeaderElectionSupport() {
+    state = State.STOP;
+    listeners = Collections.synchronizedSet(new HashSet<LeaderElectionAware>());
+  }
+
+  /**
+   * <p>
+   * Start the election process. This method will create a leader offer,
+   * determine its status, and either become the leader or become ready. If no
+   * instance of {@link ZooKeeper} or no hostname has been configured by the
+   * user, an {@link IllegalStateException} is thrown instead.
+   * </p>
+   * <p>
+   * Any (anticipated) failures result in a failed event being sent to all
+   * listeners.
+   * </p>
+   */
+  public synchronized void start() {
+    state = State.START;
+    dispatchEvent(EventType.START);
+
+    logger.info("Starting leader election support");
+
+    if (zooKeeper == null) {
+      throw new IllegalStateException(
+          "No instance of zookeeper provided. Hint: use setZooKeeper()");
+    }
+
+    if (hostName == null) {
+      throw new IllegalStateException(
+          "No hostname provided. Hint: use setHostName()");
+    }
+
+    try {
+      makeOffer();
+      determineElectionStatus();
+    } catch (KeeperException e) {
+      becomeFailed(e);
+      return;
+    } catch (InterruptedException e) {
+      becomeFailed(e);
+      return;
+    }
+  }
+
+  /**
+   * Stops all election services and revokes any outstanding leader offer. The
+   * {@link ZooKeeper} instance itself is left open.
+   */
+  public synchronized void stop() {
+    state = State.STOP;
+    dispatchEvent(EventType.STOP_START);
+
+    logger.info("Stopping leader election support");
+
+    if (leaderOffer != null) {
+      try {
+        zooKeeper.delete(leaderOffer.getNodePath(), -1);
+        logger.info("Removed leader offer {}", leaderOffer.getNodePath());
+      } catch (InterruptedException e) {
+        becomeFailed(e);
+      } catch (KeeperException e) {
+        becomeFailed(e);
+      }
+    }
+
+    dispatchEvent(EventType.STOP_COMPLETE);
+  }
+
+  private void makeOffer() throws KeeperException, InterruptedException {
+    state = State.OFFER;
+    dispatchEvent(EventType.OFFER_START);
+
+    leaderOffer = new LeaderOffer();
+
+    leaderOffer.setHostName(hostName);
+    leaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",
+        hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+        CreateMode.EPHEMERAL_SEQUENTIAL));
+
+    logger.debug("Created leader offer {}", leaderOffer);
+
+    dispatchEvent(EventType.OFFER_COMPLETE);
+  }
+
+  private void determineElectionStatus() throws KeeperException,
+      InterruptedException {
+
+    state = State.DETERMINE;
+    dispatchEvent(EventType.DETERMINE_START);
+
+    String[] components = leaderOffer.getNodePath().split("/");
+
+    leaderOffer.setId(Integer.valueOf(components[components.length - 1]
+        .substring("n_".length())));
+
+    List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(
+        rootNodeName, false));
+
+    /*
+     * For each leader offer, find out where we fit in. If we're first, we
+     * become the leader. If we're not elected the leader, attempt to stat the
+     * offer just less than us. If they exist, watch for their failure, but if
+     * they don't, run the election determination again.
+     */
+    for (int i = 0; i < leaderOffers.size(); i++) {
+      LeaderOffer leaderOffer = leaderOffers.get(i);
+
+      if (leaderOffer.getId().equals(this.leaderOffer.getId())) {
+        logger.debug("There are {} leader offers. I am {} in line.",
+            leaderOffers.size(), i);
+
+        dispatchEvent(EventType.DETERMINE_COMPLETE);
+
+        if (i == 0) {
+          becomeLeader();
+        } else {
+          becomeReady(leaderOffers.get(i - 1));
+        }
+
+        /* Once we've figured out where we are, we're done. */
+        break;
+      }
+    }
+  }
+
+  private void becomeReady(LeaderOffer neighborLeaderOffer)
+      throws KeeperException, InterruptedException {
+    dispatchEvent(EventType.READY_START);
+
+    logger.info("{} not elected leader. Watching node:{}",
+        leaderOffer.getNodePath(), neighborLeaderOffer.getNodePath());
+
+    /*
+     * Make sure to pass an explicit Watcher because we could be sharing this
+     * zooKeeper instance with someone else.
+     */
+    Stat stat = zooKeeper.exists(neighborLeaderOffer.getNodePath(), this);
+
+    if (stat != null) {
+      logger.debug(
+          "We're behind {} in line and they're alive. Keeping an eye on them.",
+          neighborLeaderOffer.getNodePath());
+      state = State.READY;
+      dispatchEvent(EventType.READY_COMPLETE);
+    } else {
+      /*
+       * If the stat fails, the node has gone missing between the call to
+       * getChildren() and exists(). We need to try and become the leader.
+       */
+      logger
+          .info(
+              "We were behind {} but it looks like they died. Back to determination.",
+              neighborLeaderOffer.getNodePath());
+      determineElectionStatus();
+    }
+
+  }
+
+  private void becomeLeader() {
+    state = State.ELECTED;
+    dispatchEvent(EventType.ELECTED_START);
+
+    logger.info("Becoming leader with node:{}", leaderOffer.getNodePath());
+
+    dispatchEvent(EventType.ELECTED_COMPLETE);
+  }
+
+  private void becomeFailed(Exception e) {
+    logger.error("Failed in state {} - Exception:{}", state, e);
+
+    state = State.FAILED;
+    dispatchEvent(EventType.FAILED);
+  }
+
+  /**
+   * Fetch the (user supplied) hostname of the current leader. Note that by the
+   * time this method returns, state could have changed so do not depend on this
+   * to be strongly consistent. This method has to read all leader offers from
+   * ZooKeeper to determine who the leader is (i.e. there is no caching) so
+   * consider the performance implications of frequent invocation. If there are
+   * no leader offers this method returns null.
+   * 
+   * @return hostname of the current leader
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public String getLeaderHostName() throws KeeperException,
+      InterruptedException {
+
+    List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(
+        rootNodeName, false));
+
+    if (leaderOffers.size() > 0) {
+      return leaderOffers.get(0).getHostName();
+    }
+
+    return null;
+  }
+
+  private List<LeaderOffer> toLeaderOffers(List<String> strings)
+      throws KeeperException, InterruptedException {
+
+    List<LeaderOffer> leaderOffers = new ArrayList<LeaderOffer>(strings.size());
+
+    /*
+     * Turn each child of rootNodeName into a leader offer. This is a tuple of
+     * the sequence number and the node name.
+     */
+    for (String offer : strings) {
+      String hostName = new String(zooKeeper.getData(
+          rootNodeName + "/" + offer, false, null));
+
+      leaderOffers.add(new LeaderOffer(Integer.valueOf(offer.substring("n_"
+          .length())), rootNodeName + "/" + offer, hostName));
+    }
+
+    /*
+     * We sort leader offers by sequence number (which may not be zero-based or
+     * contiguous) and keep their paths handy for setting watches.
+     */
+    Collections.sort(leaderOffers, new LeaderOffer.IdComparator());
+
+    return leaderOffers;
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
+      if (!event.getPath().equals(leaderOffer.getNodePath())
+          && state != State.STOP) {
+        logger.debug(
+            "Node {} deleted. Need to run through the election process.",
+            event.getPath());
+        try {
+          determineElectionStatus();
+        } catch (KeeperException e) {
+          becomeFailed(e);
+        } catch (InterruptedException e) {
+          becomeFailed(e);
+        }
+      }
+    }
+  }
+
+  private void dispatchEvent(EventType eventType) {
+    logger.debug("Dispatching event:{}", eventType);
+
+    synchronized (listeners) {
+      if (listeners.size() > 0) {
+        for (LeaderElectionAware observer : listeners) {
+          observer.onElectionEvent(eventType);
+        }
+      }
+    }
+  }
+
+  /**
+   * Adds {@code listener} to the list of listeners who will receive events.
+   * 
+   * @param listener
+   */
+  public void addListener(LeaderElectionAware listener) {
+    listeners.add(listener);
+  }
+
+  /**
+   * Remove {@code listener} from the list of listeners who receive events.
+   * 
+   * @param listener
+   */
+  public void removeListener(LeaderElectionAware listener) {
+    listeners.remove(listener);
+  }
+
+  @Override
+  public String toString() {
+    return "{ state:" + state + " leaderOffer:" + leaderOffer + " zooKeeper:"
+        + zooKeeper + " hostName:" + hostName + " listeners:" + listeners
+        + " }";
+  }
+
+  /**
+   * <p>
+   * Gets the ZooKeeper root node to use for this service.
+   * </p>
+   * <p>
+   * For instance, a root node of {@code /mycompany/myservice} would be the
+   * parent of all leader offers for this service. Obviously all processes that
+   * wish to contend for leader status need to use the same root node. Note: We
+   * assume this node already exists.
+   * </p>
+   * 
+   * @return a znode path
+   */
+  public String getRootNodeName() {
+    return rootNodeName;
+  }
+
+  /**
+   * <p>
+   * Sets the ZooKeeper root node to use for this service.
+   * </p>
+   * <p>
+   * For instance, a root node of {@code /mycompany/myservice} would be the
+   * parent of all leader offers for this service. Obviously all processes that
+   * wish to contend for leader status need to use the same root node. Note: We
+   * assume this node already exists.
+   * </p>
+   */
+  public void setRootNodeName(String rootNodeName) {
+    this.rootNodeName = rootNodeName;
+  }
+
+  /**
+   * The {@link ZooKeeper} instance to use for all operations. It must be set
+   * before {@link #start()} is called.
+   */
+  public ZooKeeper getZooKeeper() {
+    return zooKeeper;
+  }
+
+  public void setZooKeeper(ZooKeeper zooKeeper) {
+    this.zooKeeper = zooKeeper;
+  }
+
+  /**
+   * The hostname of this process. Mostly used as a convenience for logging and
+   * to respond to {@link #getLeaderHostName()} requests.
+   */
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  /**
+   * The type of event.
+   */
+  public static enum EventType {
+    START, OFFER_START, OFFER_COMPLETE, DETERMINE_START, DETERMINE_COMPLETE, ELECTED_START, ELECTED_COMPLETE, READY_START, READY_COMPLETE, FAILED, STOP_START, STOP_COMPLETE,
+  }
+
+  /**
+   * The internal state of the election support service.
+   */
+  public static enum State {
+    START, OFFER, DETERMINE, ELECTED, READY, FAILED, STOP
+  }
+}

Added: zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderOffer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderOffer.java?rev=1144087&view=auto
==============================================================================
--- zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderOffer.java (added)
+++ zookeeper/trunk/src/recipes/election/src/java/org/apache/zookeeper/recipes/leader/LeaderOffer.java Thu Jul  7 22:55:07 2011
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.recipes.leader;
+
+import java.util.Comparator;
+
+/**
+ * A leader offer is a numeric id / path pair. The id is the sequential node id
+ * assigned by ZooKeeper whereas the path is the absolute path to the ZNode.
+ */
+public class LeaderOffer {
+
+  private Integer id;
+  private String nodePath;
+  private String hostName;
+
+  public LeaderOffer() {
+    // Default constructor
+  }
+
+  public LeaderOffer(Integer id, String nodePath, String hostName) {
+    this.id = id;
+    this.nodePath = nodePath;
+    this.hostName = hostName;
+  }
+
+  @Override
+  public String toString() {
+    return "{ id:" + id + " nodePath:" + nodePath + " hostName:" + hostName
+        + " }";
+  }
+
+  public Integer getId() {
+    return id;
+  }
+
+  public void setId(Integer id) {
+    this.id = id;
+  }
+
+  public String getNodePath() {
+    return nodePath;
+  }
+
+  public void setNodePath(String nodePath) {
+    this.nodePath = nodePath;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  /**
+   * Compare two instances of {@link LeaderOffer} using only the {@code id}
+   * member.
+   */
+  public static class IdComparator implements Comparator<LeaderOffer> {
+
+    @Override
+    public int compare(LeaderOffer o1, LeaderOffer o2) {
+      return o1.getId().compareTo(o2.getId());
+    }
+
+  }
+
+}

Added: zookeeper/trunk/src/recipes/election/test/org/apache/zookeeper/recipes/leader/LeaderElectionSupportTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/recipes/election/test/org/apache/zookeeper/recipes/leader/LeaderElectionSupportTest.java?rev=1144087&view=auto
==============================================================================
--- zookeeper/trunk/src/recipes/election/test/org/apache/zookeeper/recipes/leader/LeaderElectionSupportTest.java (added)
+++ zookeeper/trunk/src/recipes/election/test/org/apache/zookeeper/recipes/leader/LeaderElectionSupportTest.java Thu Jul  7 22:55:07 2011
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.recipes.leader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LeaderElectionSupportTest extends ClientBase {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(LeaderElectionSupportTest.class);
+  private static final String testRootNode = "/" + System.currentTimeMillis()
+      + "_";
+
+  private ZooKeeper zooKeeper;
+
+  /** Runs the base set-up, then creates the persistent test root node. */
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+
+    zooKeeper = createClient();
+
+    zooKeeper.create(rootNodeName(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+  }
+
+  /** Deletes the test root node, then always runs the base tear-down. */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (zooKeeper != null) {
+        zooKeeper.delete(rootNodeName(), -1);
+      }
+    } finally {
+      super.tearDown();
+    }
+  }
+
+  /** A single participant is started, left running for 3s and stopped. */
+  @Test
+  public void testNode() throws IOException, InterruptedException,
+      KeeperException {
+
+    LeaderElectionSupport electionSupport = createLeaderElectionSupport();
+
+    electionSupport.start();
+    Thread.sleep(3000);
+    electionSupport.stop();
+  }
+
+  @Test
+  public void testNodes3() throws IOException, InterruptedException,
+      KeeperException {
+    runConcurrentElection(3, 10);
+  }
+
+  @Test
+  public void testNodes9() throws IOException, InterruptedException,
+      KeeperException {
+    runConcurrentElection(9, 10);
+  }
+
+  @Test
+  public void testNodes20() throws IOException, InterruptedException,
+      KeeperException {
+    runConcurrentElection(20, 10);
+  }
+
+  @Test
+  public void testNodes100() throws IOException, InterruptedException,
+      KeeperException {
+    runConcurrentElection(100, 20);
+  }
+
+  /**
+   * Ten participants that stay started for increasing durations (1200ms
+   * steps, capped at 10000ms), so they are stopped at different times.
+   */
+  @Test
+  public void testOfferShuffle() throws InterruptedException {
+    int testIterations = 10;
+    final CountDownLatch latch = new CountDownLatch(testIterations);
+    final AtomicInteger failureCounter = new AtomicInteger();
+
+    for (int i = 1; i <= testIterations; i++) {
+      runElectionSupportThread(latch, failureCounter,
+          Math.min(i * 1200, 10000));
+    }
+
+    awaitParticipants(latch, failureCounter, 60);
+  }
+
+  /** The sole participant should report the configured host as leader. */
+  @Test
+  public void testGetLeaderHostName() throws KeeperException,
+      InterruptedException {
+
+    LeaderElectionSupport electionSupport = createLeaderElectionSupport();
+
+    electionSupport.start();
+
+    // Sketchy: We assume there will be a leader (probably us) in 3 seconds.
+    Thread.sleep(3000);
+
+    String leaderHostName = electionSupport.getLeaderHostName();
+
+    Assert.assertNotNull(leaderHostName);
+    Assert.assertEquals("foohost", leaderHostName);
+
+    electionSupport.stop();
+  }
+
+  /**
+   * Path of the root node for the current test. It embeds the id of the
+   * calling thread, so it must be evaluated on the test thread and not inside
+   * a participant thread.
+   */
+  private String rootNodeName() {
+    return testRootNode + Thread.currentThread().getId();
+  }
+
+  /**
+   * Builds an unstarted participant bound to the shared client, the test's
+   * root node and the host name "foohost".
+   */
+  private LeaderElectionSupport createLeaderElectionSupport() {
+    LeaderElectionSupport electionSupport = new LeaderElectionSupport();
+
+    electionSupport.setZooKeeper(zooKeeper);
+    electionSupport.setRootNodeName(rootNodeName());
+    electionSupport.setHostName("foohost");
+
+    return electionSupport;
+  }
+
+  /**
+   * Starts {@code testIterations} participants with the default 3s stay and
+   * verifies that none of them failed.
+   *
+   * @param testIterations number of participant threads to start
+   * @param timeoutSeconds how long to wait for all of them to finish
+   */
+  private void runConcurrentElection(int testIterations, long timeoutSeconds)
+      throws InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(testIterations);
+    final AtomicInteger failureCounter = new AtomicInteger();
+
+    for (int i = 0; i < testIterations; i++) {
+      runElectionSupportThread(latch, failureCounter);
+    }
+
+    awaitParticipants(latch, failureCounter, timeoutSeconds);
+  }
+
+  /**
+   * Waits for every participant to count the latch down, then asserts that no
+   * failure was recorded. A timeout is only logged, not treated as a failure.
+   */
+  private void awaitParticipants(CountDownLatch latch,
+      AtomicInteger failureCounter, long timeoutSeconds)
+      throws InterruptedException {
+    if (!latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
+      logger
+          .info(
+              "Waited for all threads to start, but timed out. We had {} failures.",
+              failureCounter);
+    }
+
+    // Checked only after the wait: asserting right after Thread.start() would
+    // run before any participant had a chance to record a failure.
+    Assert.assertEquals(0, failureCounter.get());
+  }
+
+  /** Starts a participant thread that keeps its participant started for 3s. */
+  private Thread runElectionSupportThread(final CountDownLatch latch,
+      final AtomicInteger failureCounter) {
+    return runElectionSupportThread(latch, failureCounter, 3000);
+  }
+
+  /**
+   * Starts a thread that starts a participant, sleeps and then stops it.
+   *
+   * @param latch counted down when the thread finishes, failed or not
+   * @param failureCounter incremented if the participant throws
+   * @param sleepDuration milliseconds to sleep between start and stop
+   * @return the already started thread
+   */
+  private Thread runElectionSupportThread(final CountDownLatch latch,
+      final AtomicInteger failureCounter, final long sleepDuration) {
+    // Built on the caller's thread: the root node name uses its thread id.
+    final LeaderElectionSupport electionSupport = createLeaderElectionSupport();
+
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          electionSupport.start();
+          Thread.sleep(sleepDuration);
+          electionSupport.stop();
+        } catch (Exception e) {
+          // Pass the exception itself so the stack trace is logged too.
+          logger.warn("Failed to run leader election due to: "
+              + e.getMessage(), e);
+          failureCounter.incrementAndGet();
+        } finally {
+          // Always release the waiter; a failed participant must not force the
+          // test to sit out the full timeout.
+          latch.countDown();
+        }
+      }
+    };
+
+    t.start();
+
+    return t;
+  }
+}