You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ga...@apache.org on 2014/07/07 21:00:47 UTC

git commit: (TWILL-85) Make ZK-coordinated leader election available to twill applications

Repository: incubator-twill
Updated Branches:
  refs/heads/master 36b9df175 -> 94b53664a


(TWILL-85) Make ZK-coordinated leader election available to twill applications


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/94b53664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/94b53664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/94b53664

Branch: refs/heads/master
Commit: 94b53664a2366194407cf5d6c7e45607deb078e1
Parents: 36b9df1
Author: Gary Helmling <ga...@apache.org>
Authored: Mon Jul 7 12:00:20 2014 -0700
Committer: Gary Helmling <ga...@apache.org>
Committed: Mon Jul 7 12:00:20 2014 -0700

----------------------------------------------------------------------
 .../org/apache/twill/api/ElectionHandler.java   |  21 +
 .../java/org/apache/twill/api/TwillContext.java |   9 +
 .../twill/internal/BasicTwillContext.java       |  16 +
 .../apache/twill/internal/ElectionRegistry.java |  61 +++
 .../internal/container/TwillContainerMain.java  |  17 +-
 .../container/TwillContainerService.java        |   1 +
 twill-zookeeper/pom.xml                         |   5 +
 .../internal/zookeeper/LeaderElection.java      | 388 +++++++++++++++++++
 .../internal/zookeeper/LeaderElectionTest.java  | 274 +++++++++++++
 9 files changed, 788 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-api/src/main/java/org/apache/twill/api/ElectionHandler.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/ElectionHandler.java b/twill-api/src/main/java/org/apache/twill/api/ElectionHandler.java
new file mode 100644
index 0000000..4b8b1db
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/ElectionHandler.java
@@ -0,0 +1,21 @@
+package org.apache.twill.api;
+
+/**
+ * Callback for a leader election: notified when this participant becomes the leader or a follower.
+ */
+public interface ElectionHandler {
+
+  /**
+   * This method will get invoked when a participant becomes a leader in a
+   * leader election process. It is guaranteed that this method won't get called
+   * consecutively (i.e. called twice or more in a row).
+   */
+  void leader();
+
+  /**
+   * This method will get invoked when a participant is a follower in a
+   * leader election process. This method might get called multiple times without
+   * the {@link #leader()} method being called in between.
+   */
+  void follower();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillContext.java b/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
index f7a7ac1..10f4499 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.api;
 
+import org.apache.twill.common.Cancellable;
 import org.apache.twill.discovery.DiscoveryServiceClient;
 import org.apache.twill.discovery.ServiceDiscovered;
 
@@ -85,4 +86,12 @@ public interface TwillContext extends ServiceAnnouncer, DiscoveryServiceClient {
    */
   @Override
   ServiceDiscovered discover(String name);
+
+  /**
+   * Register to participate in a leader election by instances within the same {@link TwillApplication}.
+   * @param name Unique name for the election
+   * @param participantHandler Callback notified of leader and follower transitions
+   * @return A {@link org.apache.twill.common.Cancellable} object representing this candidate's participation.
+   */
+  Cancellable electLeader(String name, ElectionHandler participantHandler);
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
index 4a503e0..6eb3072 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.internal;
 
+import org.apache.twill.api.ElectionHandler;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillContext;
 import org.apache.twill.api.TwillRunnableSpecification;
@@ -46,10 +47,12 @@ public final class BasicTwillContext implements TwillContext {
   private final int allowedMemoryMB;
   private final int virtualCores;
   private volatile int instanceCount;
+  private final ElectionRegistry elections;
 
   public BasicTwillContext(RunId runId, RunId appRunId, InetAddress host, String[] args, String[] appArgs,
                            TwillRunnableSpecification spec, int instanceId,
                            DiscoveryService discoveryService, DiscoveryServiceClient discoveryServiceClient,
+                           ElectionRegistry electionRegistry,
                            int instanceCount, int allowedMemoryMB, int virtualCores) {
     this.runId = runId;
     this.appRunId = appRunId;
@@ -60,6 +63,7 @@ public final class BasicTwillContext implements TwillContext {
     this.instanceId = instanceId;
     this.discoveryService = discoveryService;
     this.discoveryServiceClient = discoveryServiceClient;
+    this.elections = electionRegistry;
     this.instanceCount = instanceCount;
     this.allowedMemoryMB = allowedMemoryMB;
     this.virtualCores = virtualCores;
@@ -138,4 +142,16 @@ public final class BasicTwillContext implements TwillContext {
   public ServiceDiscovered discover(String name) {
     return discoveryServiceClient.discover(name);
   }
+
+  @Override
+  public Cancellable electLeader(String name, ElectionHandler participantHandler) {
+    return elections.register(name, participantHandler);
+  }
+
+  /**
+   * Stops and frees any currently allocated resources.
+   */
+  public void stop() {
+    elections.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java b/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java
new file mode 100644
index 0000000..e2fe43e
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java
@@ -0,0 +1,61 @@
+package org.apache.twill.internal;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import org.apache.twill.api.ElectionHandler;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.internal.zookeeper.LeaderElection;
+import org.apache.twill.zookeeper.ZKClient;
+
+/**
+ * Tracks currently active leader elections within the Twill container.
+ */
+public class ElectionRegistry {
+  private final ZKClient zkClient;
+  private final Multimap<String, LeaderElection> registry;
+
+  public ElectionRegistry(ZKClient zkClient) {
+    this.zkClient = zkClient;
+    Multimap<String, LeaderElection> multimap = HashMultimap.create();
+    this.registry = Multimaps.synchronizedMultimap(multimap);
+  }
+
+  /**
+   * Creates a new {@link LeaderElection} for the given arguments, starts the service, and adds it to the registry.
+   * @param name Name for the election.
+   * @param handler Callback to handle leader and follower transitions.
+   * @return An object to cancel the election participation.
+   */
+  public Cancellable register(String name, ElectionHandler handler) {
+    LeaderElection election = new LeaderElection(zkClient, name, handler);
+    election.start();
+    registry.put(name, election);
+    return new CancellableElection(name, election);
+  }
+
+  /**
+   * Stops all active {@link LeaderElection} processes, using a snapshot so concurrent cancels cannot break iteration.
+   */
+  public void shutdown() {
+    for (LeaderElection election : registry.values().toArray(new LeaderElection[0])) {
+      election.stop();
+    }
+  }
+
+  private class CancellableElection implements Cancellable {
+    private final String name;
+    private final LeaderElection election;
+
+    public CancellableElection(String name, LeaderElection election) {
+      this.name = name;
+      this.election = election;
+    }
+
+    @Override
+    public void cancel() {
+      election.stop();
+      registry.remove(name, election);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index c3aece6..1e2241e 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -36,6 +36,7 @@ import org.apache.twill.internal.Arguments;
 import org.apache.twill.internal.BasicTwillContext;
 import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.ElectionRegistry;
 import org.apache.twill.internal.EnvContainerInfo;
 import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.RunIds;
@@ -87,6 +88,10 @@ public final class TwillContainerMain extends ServiceMain {
 
     ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
 
+    ZKClient electionZKClient = getAppRunZKClient(zkClientService, appRunId);
+    // leader elections are namespaced by the application
+    ElectionRegistry electionRegistry = new ElectionRegistry(electionZKClient);
+
     TwillSpecification twillSpec = loadTwillSpec(twillSpecFile);
     renameLocalFiles(twillSpec.getRunnables().get(runnableName));
     
@@ -97,13 +102,13 @@ public final class TwillContainerMain extends ServiceMain {
       runId, appRunId, containerInfo.getHost(),
       arguments.getRunnableArguments().get(runnableName).toArray(new String[0]),
       arguments.getArguments().toArray(new String[0]),
-      runnableSpec, instanceId, discoveryService, discoveryService, instanceCount,
-      containerInfo.getMemoryMB(), containerInfo.getVirtualCores()
+      runnableSpec, instanceId, discoveryService, discoveryService, electionRegistry,
+      instanceCount, containerInfo.getMemoryMB(), containerInfo.getVirtualCores()
     );
 
+    ZKClient containerZKClient = getContainerZKClient(zkClientService, appRunId, runnableName);
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
-    Service service = new TwillContainerService(context, containerInfo,
-                                                getContainerZKClient(zkClientService, appRunId, runnableName),
+    Service service = new TwillContainerService(context, containerInfo, containerZKClient,
                                                 runId, runnableSpec, getClassLoader(),
                                                 createAppLocation(conf));
     new TwillContainerMain().doMain(zkClientService, service);
@@ -141,6 +146,10 @@ public final class TwillContainerMain extends ServiceMain {
     }
   }
 
+  private static ZKClient getAppRunZKClient(ZKClient zkClient, RunId appRunId) {
+    return ZKClients.namespace(zkClient, String.format("/%s", appRunId));
+  }
+
   private static ZKClient getContainerZKClient(ZKClient zkClient, RunId appRunId, String runnableName) {
     return ZKClients.namespace(zkClient, String.format("/%s/runnables/%s", appRunId, runnableName));
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index b212a29..a476bc1 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -158,6 +158,7 @@ public final class TwillContainerService extends AbstractTwillService {
     protected void shutDown() throws Exception {
       commandExecutor.shutdownNow();
       runnable.destroy();
+      context.stop();
       Loggings.forceFlush();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/pom.xml b/twill-zookeeper/pom.xml
index 1b37cc5..5aba362 100644
--- a/twill-zookeeper/pom.xml
+++ b/twill-zookeeper/pom.xml
@@ -32,6 +32,11 @@
     <dependencies>
         <dependency>
             <groupId>${project.groupId}</groupId>
+            <artifactId>twill-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
             <artifactId>twill-common</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
new file mode 100644
index 0000000..50dc6a8
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
@@ -0,0 +1,388 @@
+package org.apache.twill.internal.zookeeper;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.twill.api.ElectionHandler;
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Performs leader election as specified in
+ * <a href="http://zookeeper.apache.org/doc/trunk/recipes.html#sc_leaderElection">Zookeeper recipes</a>.
+ *
+ * It will enter the leader election process when {@link #start()} is called and leave the process when
+ * {@link #stop()} is invoked.
+ */
+public final class LeaderElection extends AbstractService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
+
+  private enum State {
+    IN_PROGRESS,
+    LEADER,
+    FOLLOWER,
+    CANCELLED
+  }
+
+  private final String guid;
+
+  private final ZKClient zkClient;
+  private final String zkFolderPath;
+  private final ElectionHandler handler;
+
+  private ExecutorService executor;
+  private String zkNodePath;
+  private State state;
+
+  public LeaderElection(ZKClient zkClient, String prefix, ElectionHandler handler) {
+    this.guid = UUID.randomUUID().toString();
+    this.zkClient = zkClient;
+    this.zkFolderPath = prefix.startsWith("/") ? prefix : "/" + prefix;
+    this.handler = handler;
+  }
+
+  @Override
+  protected void doStart() {
+    LOG.info("Start leader election on {}{} with guid {}", zkClient.getConnectString(), zkFolderPath, guid);
+
+    executor = Executors.newSingleThreadExecutor(
+      Threads.createDaemonThreadFactory("leader-election" + zkFolderPath.replace('/', '-')));
+
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        register();
+        LeaderElection.this.zkClient.addConnectionWatcher(wrapWatcher(new ConnectionWatcher()));
+      }
+    });
+    notifyStarted();
+  }
+
+  @Override
+  protected void doStop() {
+    final SettableFuture<String> completion = SettableFuture.create();
+    Futures.addCallback(completion, new FutureCallback<String>() {
+      @Override
+      public void onSuccess(String result) {
+        notifyStopped();
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        notifyFailed(t);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        if (state != State.CANCELLED) {
+          // becomeFollower has to be called before deleting the node so that there are never two active leaders.
+          if (state == State.LEADER) {
+            becomeFollower();
+          }
+          state = State.CANCELLED;
+          doDeleteNode(completion);
+        }
+      }
+    });
+  }
+
+  private byte[] getNodeData() {
+    String hostname;
+    try {
+      hostname = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (Exception e) {
+      LOG.warn("Failed to get local hostname.", e);
+      hostname = "unknown";
+    }
+    return hostname.getBytes(Charsets.UTF_8);
+  }
+
+  private void register() {
+    state = State.IN_PROGRESS;
+    zkNodePath = null;
+
+    // Register for election
+    final String path = String.format("%s/%s-", zkFolderPath, guid);
+    LOG.debug("Registering for election {} with path {}", zkFolderPath, path);
+
+    OperationFuture<String> createFuture = zkClient.create(path, getNodeData(), CreateMode.EPHEMERAL_SEQUENTIAL, true);
+    Futures.addCallback(createFuture, new FutureCallback<String>() {
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.debug("Created zk node {}", result);
+        zkNodePath = result;
+        if (state == State.CANCELLED) {
+          // If cancel was called after create() but before this callback fired, delete the node just created.
+          deleteNode();
+        } else {
+          runElection();
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.error("Got exception during node creation for folder {}", path, t);
+        // The node may have been created on the server before the server crashed,
+        // in which case the client sees a failure even though the node exists.
+        // Not checking for cancel here, as we don't know the zkNodePath.
+        // Rely on runElection to look up the node path and to handle cancel.
+        runElection();
+      }
+    }, executor);
+  }
+
+  private void runElection() {
+    LOG.debug("Running election for {}", zkNodePath);
+
+    OperationFuture<NodeChildren> childrenFuture = zkClient.getChildren(zkFolderPath);
+    Futures.addCallback(childrenFuture, new FutureCallback<NodeChildren>() {
+      @Override
+      public void onSuccess(NodeChildren result) {
+        Optional<String> nodeToWatch = findNodeToWatch(result.getChildren());
+
+        if (state == State.CANCELLED) {
+          deleteNode();
+          return;
+        }
+
+        if (nodeToWatch == null) {
+          // zkNodePath unknown, need to run register.
+          register();
+          return;
+        }
+
+        if (nodeToWatch.isPresent()) {
+          // Watch for deletion of largest node smaller than current node
+          watchNode(zkFolderPath + "/" + nodeToWatch.get(), new LowerNodeWatcher());
+        } else {
+          // This is leader
+          becomeLeader();
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.warn("Got exception during children fetch for {}. Retry.", zkFolderPath, t);
+        // If cancel has been called before this callback and the zkNodePath is known, we can simply
+        // delete the node. Otherwise, runElection() is needed to determine the zkNodePath if it is cancelled.
+        if (state == State.CANCELLED && zkNodePath != null) {
+          deleteNode();
+        } else {
+          runElection();
+        }
+      }
+    }, executor);
+  }
+
+  private void becomeLeader() {
+    state = State.LEADER;
+    LOG.debug("Become leader for {}.", zkNodePath);
+    try {
+      handler.leader();
+    } catch (Throwable t) {
+      LOG.warn("Exception thrown when calling leader() method. Withdraw from the leader election process.", t);
+      stop();
+    }
+  }
+
+  private void becomeFollower() {
+    state = State.FOLLOWER;
+    LOG.debug("Become follower for {}", zkNodePath);
+    try {
+      handler.follower();
+    } catch (Throwable t) {
+      LOG.warn("Exception thrown when calling follower() method. Withdraw from the leader election process.", t);
+      stop();
+    }
+  }
+
+  /**
+   * Sets the given watcher on the given node; becomes follower once the watch is set, re-runs the election on failure.
+   */
+  private void watchNode(final String nodePath, Watcher watcher) {
+    OperationFuture<Stat> watchFuture = zkClient.exists(nodePath, watcher);
+    Futures.addCallback(watchFuture, new FutureCallback<Stat>() {
+      @Override
+      public void onSuccess(Stat result) {
+        if (state != State.CANCELLED) {
+          becomeFollower();
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.warn("Exception while setting watch on node {}. Retry.", nodePath, t);
+        runElection();
+      }
+    }, executor);
+  }
+
+  private ListenableFuture<String> deleteNode() {
+    SettableFuture<String> completion = SettableFuture.create();
+    doDeleteNode(completion);
+    return completion;
+  }
+
+  private void doDeleteNode(final SettableFuture<String> completion) {
+    if (zkNodePath == null) {  // No node path known for this participant: nothing to delete.
+      completion.set(null);
+      return;
+    }
+    try {
+      Futures.addCallback(zkClient.delete(zkNodePath), new FutureCallback<String>() {
+        @Override
+        public void onSuccess(String result) {
+          LOG.debug("Node deleted: {}", result);
+          completion.set(result);
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+          LOG.warn("Fail to delete node: {}", zkNodePath, t);  // Keep the cause; retries would otherwise hide it.
+          if (!(t instanceof KeeperException.NoNodeException)) {
+            LOG.debug("Retry delete node: {}", zkNodePath);
+            doDeleteNode(completion);
+          } else {
+            completion.setException(t);
+          }
+        }
+      }, executor);
+    } catch (Throwable t) {
+      // If the delete call itself throws, complete the future with that failure instead of retrying.
+      completion.setException(t);
+    }
+  }
+
+  private Watcher wrapWatcher(final Watcher watcher) {
+    return new Watcher() {
+      @Override
+      public void process(final WatchedEvent event) {
+        executor.execute(new Runnable() {
+          @Override
+          public void run() {
+            watcher.process(event);
+          }
+        });
+      }
+    };
+  }
+
+  /**
+   * Find the node to watch for and return it in the {@link com.google.common.base.Optional} value. If this client is
+   * the leader, return an {@link com.google.common.base.Optional#absent()}. This method also tries to set the
+   * zkNodePath if it is not set and return {@code null} if the zkNodePath cannot be determined.
+   */
+  private Optional<String> findNodeToWatch(List<String> nodes) {
+    // If this node's path is not known, find it first.
+    if (zkNodePath == null) {
+      for (String node : nodes) {
+        if (node.startsWith(guid)) {
+          zkNodePath = zkFolderPath + "/" + node;
+          break;
+        }
+      }
+    }
+
+    if (zkNodePath == null) {
+      // Cannot find the node path, return null
+      return null;
+    }
+
+    // Find the largest node that is smaller than the node created by this client.
+    int currentId = Integer.parseInt(zkNodePath.substring(zkNodePath.indexOf(guid) + guid.length() + 1));
+    String nodeToWatch = null;
+    int maxOfMins = Integer.MIN_VALUE;
+    for (String node : nodes) {
+      int nodeId = Integer.parseInt(node.substring(guid.length() + 1));
+      if (nodeId < currentId && nodeId > maxOfMins) {
+        maxOfMins = nodeId;
+        nodeToWatch = node;
+      }
+    }
+
+    return nodeToWatch == null ? Optional.<String>absent() : Optional.of(nodeToWatch);
+  }
+
+  /**
+   * Watches lower node.
+   */
+  private class LowerNodeWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent event) {
+      if (state != State.CANCELLED && event.getType() == Event.EventType.NodeDeleted) {
+        LOG.debug("Lower node deleted {} for election {}.", event, zkNodePath);
+        runElection();
+      }
+    }
+  }
+
+  /**
+   * Watches the ZooKeeper connection state.
+   */
+  private class ConnectionWatcher implements Watcher {
+    private boolean expired;
+    private boolean disconnected;
+
+    @Override
+    public void process(WatchedEvent event) {
+      switch (event.getState()) {
+        case Disconnected:
+          disconnected = true;
+          LOG.info("Disconnected from ZK: {} for {}", zkClient.getConnectString(), zkFolderPath);
+          if (state == State.LEADER) {
+            // becomeFollower has to be called on disconnect so that two active leaders are never possible.
+            LOG.info("Stepping down as leader due to disconnect: {} for {}", zkClient.getConnectString(), zkFolderPath);
+            becomeFollower();
+          }
+          break;
+        case SyncConnected:
+          boolean runElection = disconnected && !expired && state != State.IN_PROGRESS;
+          boolean runRegister = disconnected && expired && state != State.IN_PROGRESS;
+          disconnected = false;
+          expired = false;
+          if (runElection) {
+            // If the state is cancelled (meaning a cancel happened between disconnect and connect),
+            // still call runElection() so that it has a chance to delete the node (the session has not
+            // expired, so the node is still there after reconnection).
+            if (state != State.CANCELLED) {
+              state = State.IN_PROGRESS;
+            }
+            LOG.info("Connected to ZK, running election: {} for {}", zkClient.getConnectString(), zkFolderPath);
+            runElection();
+          } else if (runRegister && state != State.CANCELLED) {
+            LOG.info("Connected to ZK, registering: {} for {}", zkClient.getConnectString(), zkFolderPath);
+            register();
+          }
+
+          break;
+        case Expired:
+          LOG.info("ZK session expired: {} for {}", zkClient.getConnectString(), zkFolderPath);
+          expired = true;
+          break;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
new file mode 100644
index 0000000..468ba59
--- /dev/null
+++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
@@ -0,0 +1,274 @@
+package org.apache.twill.internal.zookeeper;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.twill.api.ElectionHandler;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test for {@link LeaderElection}.
+ */
+public class LeaderElectionTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionTest.class);
+  // Class-level temporary folder; init() and testDisconnect() create their ZK data directories in it.
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+  // ZK server started in init() and stopped in finish(); testElection and testCancel connect to it.
+  private static InMemoryZKServer zkServer;
+
+  /**
+   * Starts five participants on the same election path, then stops the current leader one at a time and
+   * checks that a successor is elected after every stop until a single participant is left.
+   */
+  @Test(timeout = 120000)
+  public void testElection() throws ExecutionException, InterruptedException, BrokenBarrierException {
+    ExecutorService executor = Executors.newCachedThreadPool();
+    int participantCount = 5;
+    final CyclicBarrier barrier = new CyclicBarrier(participantCount + 1);
+    final Semaphore leaderSem = new Semaphore(0);
+    final Semaphore followerSem = new Semaphore(0);
+    final CountDownLatch[] stopLatch = new CountDownLatch[participantCount];
+    final List<ZKClientService> zkClients = Lists.newArrayList();
+
+    try {
+      final AtomicInteger currentLeader = new AtomicInteger(-1);
+      for (int i = 0; i < participantCount; i++) {
+        final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+        zkClient.startAndWait();
+        stopLatch[i] = new CountDownLatch(1);
+        zkClients.add(zkClient);
+        final int idx = i;
+        executor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              // Line up with the other participants and the test thread before joining the election.
+              barrier.await();
+              LeaderElection leaderElection = new LeaderElection(zkClient, "/test", new ElectionHandler() {
+                @Override
+                public void leader() {
+                  currentLeader.set(idx);
+                  leaderSem.release();
+                }
+
+                @Override
+                public void follower() {
+                  followerSem.release();
+                }
+              });
+              leaderElection.start();
+              // Stay in the election until the test thread releases this participant (or 10 seconds pass).
+              stopLatch[idx].await(10, TimeUnit.SECONDS);
+              leaderElection.stopAndWait();
+            } catch (Exception e) {
+              LOG.error(e.getMessage(), e);
+            }
+          }
+        });
+      }
+
+      barrier.await();
+      // The test timeout is longer than all waits combined, so a missing callback fails the matching assertion.
+      Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+      Assert.assertTrue(followerSem.tryAcquire(participantCount - 1, 10, TimeUnit.SECONDS));
+
+      // Continuously stopping leader until there is one left.
+      for (int i = 0; i < participantCount - 1; i++) {
+        stopLatch[currentLeader.get()].countDown();
+        Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+        Assert.assertTrue(followerSem.tryAcquire(10, TimeUnit.SECONDS));
+      }
+      // Release the last remaining participant.
+      stopLatch[currentLeader.get()].countDown();
+    } finally {
+      executor.shutdown();
+      executor.awaitTermination(5L, TimeUnit.SECONDS);
+      for (ZKClientService zkClient : zkClients) {
+        zkClient.stopAndWait();
+      }
+    }
+  }
+
+  /**
+   * Verifies that a follower whose ZK session was killed still takes over once the leader is cancelled, and that
+   * no leader is reported after a newly added follower and then the leader are cancelled.
+   */
+  @Test(timeout = 60000)
+  public void testCancel() throws InterruptedException, IOException {
+    List<LeaderElection> leaderElections = Lists.newArrayList();
+    List<ZKClientService> zkClients = Lists.newArrayList();
+
+    // Creates two participants
+    final Semaphore leaderSem = new Semaphore(0);
+    final Semaphore followerSem = new Semaphore(0);
+    final AtomicInteger leaderIdx = new AtomicInteger();
+
+    try {
+      for (int i = 0; i < 2; i++) {
+        ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+        zkClient.startAndWait();
+        zkClients.add(zkClient);
+        final int finalI = i;
+        leaderElections.add(new LeaderElection(zkClient, "/testCancel", new ElectionHandler() {
+          @Override
+          public void leader() {
+            leaderIdx.set(finalI);
+            leaderSem.release();
+          }
+
+          @Override
+          public void follower() {
+            followerSem.release();
+          }
+        }));
+      }
+
+      for (LeaderElection leaderElection : leaderElections) {
+        leaderElection.start();
+      }
+      // The test timeout is longer than all waits combined, so a missed callback fails its assertion.
+      Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+      Assert.assertTrue(followerSem.tryAcquire(10, TimeUnit.SECONDS));
+      int leader = leaderIdx.get();
+      int follower = 1 - leader;
+
+      // Kill the follower session
+      KillZKSession.kill(zkClients.get(follower).getZooKeeperSupplier().get(),
+                         zkClients.get(follower).getConnectString(), 5000);
+
+      // Cancel the leader
+      leaderElections.get(leader).stopAndWait();
+
+      // Now follower should still be able to become leader.
+      Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+      leader = leaderIdx.get();
+      follower = 1 - leader;
+
+      // Create another participant (use the old leader zkClient)
+      leaderElections.set(follower, new LeaderElection(zkClients.get(follower), "/testCancel", new ElectionHandler() {
+        @Override
+        public void leader() {
+          leaderSem.release();
+        }
+
+        @Override
+        public void follower() {
+          followerSem.release();
+        }
+      }));
+      leaderElections.get(follower).start();
+
+      // Cancel the follower first.
+      leaderElections.get(follower).stopAndWait();
+
+      // Cancel the leader.
+      leaderElections.get(leader).stopAndWait();
+
+      // Since the follower has been cancelled before leader, there should be no leader.
+      Assert.assertFalse(leaderSem.tryAcquire(2, TimeUnit.SECONDS));
+    } finally {
+      for (ZKClientService zkClient : zkClients) {
+        zkClient.stopAndWait();
+      }
+    }
+  }
+
+  /**
+   * Verifies that a leader steps down when the ZK server goes away, leads again after reconnecting, and does not
+   * lead again when it is cancelled while disconnected.
+   */
+  @Test(timeout = 60000)
+  public void testDisconnect() throws IOException, InterruptedException {
+    File zkDataDir = tmpFolder.newFolder();
+    InMemoryZKServer ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).build();
+    ownZKServer.startAndWait();
+    try {
+      ZKClientService zkClient = ZKClientService.Builder.of(ownZKServer.getConnectionStr()).build();
+      zkClient.startAndWait();
+      try {
+        final Semaphore leaderSem = new Semaphore(0);
+        final Semaphore followerSem = new Semaphore(0);
+        LeaderElection leaderElection = new LeaderElection(zkClient, "/testDisconnect", new ElectionHandler() {
+          @Override
+          public void leader() {
+            leaderSem.release();
+          }
+
+          @Override
+          public void follower() {
+            followerSem.release();
+          }
+        });
+        leaderElection.start();
+        // The test timeout is longer than all waits combined, so a missed callback fails its assertion.
+        Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+
+        int zkPort = ownZKServer.getLocalAddress().getPort();
+
+        // Disconnect by shutting the server and restart it on the same port
+        ownZKServer.stopAndWait();
+
+        // Right after disconnect, it should become follower
+        Assert.assertTrue(followerSem.tryAcquire(10, TimeUnit.SECONDS));
+
+        ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).setPort(zkPort).build();
+        ownZKServer.startAndWait();
+
+        // Right after reconnect, it should be leader again.
+        Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+
+        // Now disconnect it again, but then cancel it before reconnect, it shouldn't become leader
+        ownZKServer.stopAndWait();
+
+        // Right after disconnect, it should become follower
+        Assert.assertTrue(followerSem.tryAcquire(10, TimeUnit.SECONDS));
+
+        ListenableFuture<?> cancelFuture = leaderElection.stop();
+        ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).setPort(zkPort).build();
+        ownZKServer.startAndWait();
+        Futures.getUnchecked(cancelFuture);
+
+        // After reconnect, it should not be leader
+        Assert.assertFalse(leaderSem.tryAcquire(2, TimeUnit.SECONDS));
+      } finally {
+        zkClient.stopAndWait();
+      }
+    } finally {
+      ownZKServer.stopAndWait();
+    }
+  }
+
+  @BeforeClass
+  public static void init() throws IOException {
+    zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+    zkServer.startAndWait();
+  }
+
+  /**
+   * Stops the ZK server started in init(). Does nothing when init() failed before the server was assigned,
+   * so that a setup failure is not followed by a NullPointerException from the teardown.
+   */
+  @AfterClass
+  public static void finish() {
+    if (zkServer != null) {
+      zkServer.stopAndWait();
+    }
+  }