You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ga...@apache.org on 2014/07/07 21:00:47 UTC
git commit: (TWILL-85) Make ZK-coordinated leader election available to twill applications
Repository: incubator-twill
Updated Branches:
refs/heads/master 36b9df175 -> 94b53664a
(TWILL-85) Make ZK-coordinated leader election available to twill applications
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/94b53664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/94b53664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/94b53664
Branch: refs/heads/master
Commit: 94b53664a2366194407cf5d6c7e45607deb078e1
Parents: 36b9df1
Author: Gary Helmling <ga...@apache.org>
Authored: Mon Jul 7 12:00:20 2014 -0700
Committer: Gary Helmling <ga...@apache.org>
Committed: Mon Jul 7 12:00:20 2014 -0700
----------------------------------------------------------------------
.../org/apache/twill/api/ElectionHandler.java | 21 +
.../java/org/apache/twill/api/TwillContext.java | 9 +
.../twill/internal/BasicTwillContext.java | 16 +
.../apache/twill/internal/ElectionRegistry.java | 61 +++
.../internal/container/TwillContainerMain.java | 17 +-
.../container/TwillContainerService.java | 1 +
twill-zookeeper/pom.xml | 5 +
.../internal/zookeeper/LeaderElection.java | 388 +++++++++++++++++++
.../internal/zookeeper/LeaderElectionTest.java | 274 +++++++++++++
9 files changed, 788 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-api/src/main/java/org/apache/twill/api/ElectionHandler.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/ElectionHandler.java b/twill-api/src/main/java/org/apache/twill/api/ElectionHandler.java
new file mode 100644
index 0000000..4b8b1db
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/ElectionHandler.java
@@ -0,0 +1,21 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.twill.api;
+
+ /**
+  * Handles events of election/un-election of leader.
+  *
+  * <p>The ZooKeeper-based election implementation invokes these callbacks from the election's single
+  * internal thread, so implementations should return promptly to avoid delaying further election events.</p>
+  */
+ public interface ElectionHandler {
+
+   /**
+    * This method will get invoked when a participant becomes a leader in a
+    * leader election process. It is guaranteed that this method won't get called
+    * consecutively (i.e. called twice or more in a row).
+    */
+   void leader();
+
+   /**
+    * This method will get invoked when a participant is a follower in a
+    * leader election process. This method might get called multiple times without
+    * the {@link #leader()} method being called.
+    */
+   void follower();
+ }
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillContext.java b/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
index f7a7ac1..10f4499 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
@@ -17,6 +17,7 @@
*/
package org.apache.twill.api;
+import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;
@@ -85,4 +86,12 @@ public interface TwillContext extends ServiceAnnouncer, DiscoveryServiceClient {
*/
@Override
ServiceDiscovered discover(String name);
+
+ /**
+ * Register to participate in a leader election by instances within the same {@link TwillApplication}.
+ * Instances that register with the same name compete in the same election.
+ *
+ * @param name Unique name for the election
+ * @param participantHandler Callback notified when this instance becomes the leader or a follower
+ *                           of the election
+ * @return A {@link org.apache.twill.common.Cancellable} object representing this candidate's participation.
+ *         Cancelling it withdraws this instance from the election.
+ */
+ Cancellable electLeader(String name, ElectionHandler participantHandler);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
index 4a503e0..6eb3072 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
@@ -17,6 +17,7 @@
*/
package org.apache.twill.internal;
+import org.apache.twill.api.ElectionHandler;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.TwillRunnableSpecification;
@@ -46,10 +47,12 @@ public final class BasicTwillContext implements TwillContext {
private final int allowedMemoryMB;
private final int virtualCores;
private volatile int instanceCount;
+ private final ElectionRegistry elections;
public BasicTwillContext(RunId runId, RunId appRunId, InetAddress host, String[] args, String[] appArgs,
TwillRunnableSpecification spec, int instanceId,
DiscoveryService discoveryService, DiscoveryServiceClient discoveryServiceClient,
+ ElectionRegistry electionRegistry,
int instanceCount, int allowedMemoryMB, int virtualCores) {
this.runId = runId;
this.appRunId = appRunId;
@@ -60,6 +63,7 @@ public final class BasicTwillContext implements TwillContext {
this.instanceId = instanceId;
this.discoveryService = discoveryService;
this.discoveryServiceClient = discoveryServiceClient;
+ this.elections = electionRegistry;
this.instanceCount = instanceCount;
this.allowedMemoryMB = allowedMemoryMB;
this.virtualCores = virtualCores;
@@ -138,4 +142,16 @@ public final class BasicTwillContext implements TwillContext {
public ServiceDiscovered discover(String name) {
return discoveryServiceClient.discover(name);
}
+
+   @Override
+   public Cancellable electLeader(String name, ElectionHandler participantHandler) {
+     // The registry keeps track of the election so that stop() can withdraw from it later.
+     Cancellable participation = elections.register(name, participantHandler);
+     return participation;
+   }
+
+   /**
+    * Stops and frees any currently allocated resources. At present this withdraws from every
+    * leader election joined through {@link #electLeader(String, ElectionHandler)}.
+    */
+   public void stop() {
+     ElectionRegistry registry = elections;
+     registry.shutdown();
+   }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java b/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java
new file mode 100644
index 0000000..e2fe43e
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java
@@ -0,0 +1,61 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.twill.internal;
+
+ import com.google.common.collect.HashMultimap;
+ import com.google.common.collect.Multimap;
+ import com.google.common.collect.Multimaps;
+ import org.apache.twill.api.ElectionHandler;
+ import org.apache.twill.common.Cancellable;
+ import org.apache.twill.internal.zookeeper.LeaderElection;
+ import org.apache.twill.zookeeper.ZKClient;
+
+ /**
+  * Tracks currently active leader elections within the Twill container.
+  */
+ public class ElectionRegistry {
+   private final ZKClient zkClient;
+   // Synchronized wrapper: its collection views may only be iterated while holding its monitor.
+   private final Multimap<String, LeaderElection> registry;
+
+   /**
+    * @param zkClient client used by every {@link LeaderElection} created through this registry.
+    */
+   public ElectionRegistry(ZKClient zkClient) {
+     this.zkClient = zkClient;
+     Multimap<String, LeaderElection> multimap = HashMultimap.create();
+     this.registry = Multimaps.synchronizedMultimap(multimap);
+   }
+
+   /**
+    * Creates a new {@link LeaderElection} for the given arguments, starts the service, and adds it to the registry.
+    * @param name Name for the election.
+    * @param handler Callback to handle leader and follower transitions.
+    * @return An object to cancel the election participation.
+    */
+   public Cancellable register(String name, ElectionHandler handler) {
+     LeaderElection election = new LeaderElection(zkClient, name, handler);
+     election.start();
+     registry.put(name, election);
+     return new CancellableElection(name, election);
+   }
+
+   /**
+    * Stops all active {@link LeaderElection} processes and empties the registry.
+    */
+   public void shutdown() {
+     Multimap<String, LeaderElection> active;
+     // Snapshot under the multimap's own lock: iterating a view of a synchronized multimap without
+     // holding it races with concurrent register()/cancel() calls.
+     synchronized (registry) {
+       active = HashMultimap.create(registry);
+       registry.clear();
+     }
+     // Stop outside the lock so that no service code runs while the registry is locked.
+     for (LeaderElection election : active.values()) {
+       election.stop();
+     }
+   }
+
+   /**
+    * Handle returned by {@link #register}; cancelling stops the election and removes it from the registry.
+    */
+   private class CancellableElection implements Cancellable {
+     private final String name;
+     private final LeaderElection election;
+
+     public CancellableElection(String name, LeaderElection election) {
+       this.name = name;
+       this.election = election;
+     }
+
+     @Override
+     public void cancel() {
+       election.stop();
+       registry.remove(name, election);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index c3aece6..1e2241e 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -36,6 +36,7 @@ import org.apache.twill.internal.Arguments;
import org.apache.twill.internal.BasicTwillContext;
import org.apache.twill.internal.Constants;
import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.ElectionRegistry;
import org.apache.twill.internal.EnvContainerInfo;
import org.apache.twill.internal.EnvKeys;
import org.apache.twill.internal.RunIds;
@@ -87,6 +88,10 @@ public final class TwillContainerMain extends ServiceMain {
ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
+ ZKClient electionZKClient = getAppRunZKClient(zkClientService, appRunId);
+ // leader elections are namespaced by the application
+ ElectionRegistry electionRegistry = new ElectionRegistry(electionZKClient);
+
TwillSpecification twillSpec = loadTwillSpec(twillSpecFile);
renameLocalFiles(twillSpec.getRunnables().get(runnableName));
@@ -97,13 +102,13 @@ public final class TwillContainerMain extends ServiceMain {
runId, appRunId, containerInfo.getHost(),
arguments.getRunnableArguments().get(runnableName).toArray(new String[0]),
arguments.getArguments().toArray(new String[0]),
- runnableSpec, instanceId, discoveryService, discoveryService, instanceCount,
- containerInfo.getMemoryMB(), containerInfo.getVirtualCores()
+ runnableSpec, instanceId, discoveryService, discoveryService, electionRegistry,
+ instanceCount, containerInfo.getMemoryMB(), containerInfo.getVirtualCores()
);
+ ZKClient containerZKClient = getContainerZKClient(zkClientService, appRunId, runnableName);
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
- Service service = new TwillContainerService(context, containerInfo,
- getContainerZKClient(zkClientService, appRunId, runnableName),
+ Service service = new TwillContainerService(context, containerInfo, containerZKClient,
runId, runnableSpec, getClassLoader(),
createAppLocation(conf));
new TwillContainerMain().doMain(zkClientService, service);
@@ -141,6 +146,10 @@ public final class TwillContainerMain extends ServiceMain {
}
}
+ /**
+  * Returns a {@link ZKClient} namespaced under {@code /<appRunId>}, i.e. shared by every runnable of the
+  * application run (unlike {@link #getContainerZKClient}, which is scoped to a single runnable).
+  */
+ private static ZKClient getAppRunZKClient(ZKClient zkClient, RunId appRunId) {
+   String appRunNamespace = String.format("/%s", appRunId);
+   return ZKClients.namespace(zkClient, appRunNamespace);
+ }
+
private static ZKClient getContainerZKClient(ZKClient zkClient, RunId appRunId, String runnableName) {
return ZKClients.namespace(zkClient, String.format("/%s/runnables/%s", appRunId, runnableName));
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index b212a29..a476bc1 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -158,6 +158,7 @@ public final class TwillContainerService extends AbstractTwillService {
protected void shutDown() throws Exception {
commandExecutor.shutdownNow();
- runnable.destroy();
- Loggings.forceFlush();
+ try {
+   runnable.destroy();
+ } finally {
+   try {
+     // Always withdraw from leader elections, even if destroy() throws, so this container does not
+     // keep participating in (or stay leader of) an election after its runnable is gone.
+     context.stop();
+   } finally {
+     Loggings.forceFlush();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/pom.xml b/twill-zookeeper/pom.xml
index 1b37cc5..5aba362 100644
--- a/twill-zookeeper/pom.xml
+++ b/twill-zookeeper/pom.xml
@@ -32,6 +32,11 @@
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>twill-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>twill-common</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
new file mode 100644
index 0000000..50dc6a8
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
@@ -0,0 +1,388 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.twill.internal.zookeeper;
+
+ import com.google.common.base.Charsets;
+ import com.google.common.base.Optional;
+ import com.google.common.util.concurrent.AbstractService;
+ import com.google.common.util.concurrent.FutureCallback;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
+ import org.apache.twill.api.ElectionHandler;
+ import org.apache.twill.common.Threads;
+ import org.apache.twill.zookeeper.NodeChildren;
+ import org.apache.twill.zookeeper.OperationFuture;
+ import org.apache.twill.zookeeper.ZKClient;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.WatchedEvent;
+ import org.apache.zookeeper.Watcher;
+ import org.apache.zookeeper.data.Stat;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.net.InetAddress;
+ import java.util.List;
+ import java.util.UUID;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+
+ /**
+  * Performs leader election as specified in
+  * <a href="http://zookeeper.apache.org/doc/trunk/recipes.html#sc_leaderElection">Zookeeper recipes</a>.
+  *
+  * It will enter the leader election process when {@link #start()} is called and leave the process when
+  * {@link #stop()} is invoked.
+  *
+  * <p>All election state is read and written on a single internal thread: ZooKeeper callbacks and watcher
+  * events are re-dispatched onto it, and the {@link ElectionHandler} methods are invoked on it.</p>
+  */
+ public final class LeaderElection extends AbstractService {
+
+   private static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
+
+   /**
+    * How long the idle election thread is kept alive before it is allowed to exit.
+    */
+   private static final long EXECUTOR_IDLE_SECONDS = 10L;
+
+   private enum State {
+     IN_PROGRESS,
+     LEADER,
+     FOLLOWER,
+     CANCELLED
+   }
+
+   private final String guid;
+
+   private final ZKClient zkClient;
+   private final String zkFolderPath;
+   private final ElectionHandler handler;
+   private final ExecutorService executor;
+
+   // Only accessed from tasks running on the executor.
+   private String zkNodePath;
+   private State state;
+
+   /**
+    * Creates a participant for the election rooted at {@code prefix}.
+    *
+    * @param zkClient client used for all ZooKeeper operations
+    * @param prefix ZooKeeper path of the election; a leading {@code /} is added if missing
+    * @param handler callback notified of leader/follower transitions
+    */
+   public LeaderElection(ZKClient zkClient, String prefix, ElectionHandler handler) {
+     this.guid = UUID.randomUUID().toString();
+     this.zkClient = zkClient;
+     this.zkFolderPath = prefix.startsWith("/") ? prefix : "/" + prefix;
+     this.handler = handler;
+     this.executor = createExecutor(zkFolderPath);
+   }
+
+   /**
+    * Creates the single-threaded executor on which all election work runs. The worker thread may time out
+    * when idle, so a stopped election does not pin a thread for the lifetime of the process. Callbacks that
+    * complete after {@link #stop()} (e.g. a pending node creation whose node must still be deleted) can still run.
+    */
+   private static ExecutorService createExecutor(String folderPath) {
+     ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
+       1, 1, EXECUTOR_IDLE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+       Threads.createDaemonThreadFactory("leader-election" + folderPath.replace('/', '-')));
+     threadPool.allowCoreThreadTimeOut(true);
+     return threadPool;
+   }
+
+   @Override
+   protected void doStart() {
+     LOG.info("Start leader election on {}{} with guid {}", zkClient.getConnectString(), zkFolderPath, guid);
+
+     executor.execute(new Runnable() {
+       @Override
+       public void run() {
+         register();
+         LeaderElection.this.zkClient.addConnectionWatcher(wrapWatcher(new ConnectionWatcher()));
+       }
+     });
+     notifyStarted();
+   }
+
+   @Override
+   protected void doStop() {
+     final SettableFuture<String> completion = SettableFuture.create();
+     Futures.addCallback(completion, new FutureCallback<String>() {
+       @Override
+       public void onSuccess(String result) {
+         notifyStopped();
+       }
+
+       @Override
+       public void onFailure(Throwable t) {
+         notifyFailed(t);
+       }
+     }, Threads.SAME_THREAD_EXECUTOR);
+
+     executor.execute(new Runnable() {
+       @Override
+       public void run() {
+         if (state == State.CANCELLED) {
+           // Already withdrawn; complete the stop instead of leaving it pending forever.
+           completion.set(null);
+           return;
+         }
+         // becomeFollower has to be called before deleting the node to make sure there are never two active leaders.
+         if (state == State.LEADER) {
+           becomeFollower();
+         }
+         state = State.CANCELLED;
+         doDeleteNode(completion);
+       }
+     });
+   }
+
+   private byte[] getNodeData() {
+     String hostname;
+     try {
+       hostname = InetAddress.getLocalHost().getCanonicalHostName();
+     } catch (Exception e) {
+       LOG.warn("Failed to get local hostname.", e);
+       hostname = "unknown";
+     }
+     return hostname.getBytes(Charsets.UTF_8);
+   }
+
+   private void register() {
+     state = State.IN_PROGRESS;
+     zkNodePath = null;
+
+     // Register for election
+     final String path = String.format("%s/%s-", zkFolderPath, guid);
+     LOG.debug("Registering for election {} with path {}", zkFolderPath, path);
+
+     OperationFuture<String> createFuture = zkClient.create(path, getNodeData(), CreateMode.EPHEMERAL_SEQUENTIAL, true);
+     Futures.addCallback(createFuture, new FutureCallback<String>() {
+
+       @Override
+       public void onSuccess(String result) {
+         LOG.debug("Created zk node {}", result);
+         zkNodePath = result;
+         if (state == State.CANCELLED) {
+           // If cancel was called after create(), but before callback trigger, delete the node created.
+           deleteNode();
+         } else {
+           runElection();
+         }
+       }
+
+       @Override
+       public void onFailure(Throwable t) {
+         LOG.error("Got exception during node creation for folder {}", path, t);
+         // The node may have been created on the server just before the server crashed,
+         // in which case the client receives a failure instead.
+         // Not checking for cancel here, as we don't know the zkNodePath.
+         // Needs to rely on runElection to handle cancel.
+         runElection();
+       }
+     }, executor);
+   }
+
+   private void runElection() {
+     LOG.debug("Running election for {}", zkNodePath);
+
+     OperationFuture<NodeChildren> childrenFuture = zkClient.getChildren(zkFolderPath);
+     Futures.addCallback(childrenFuture, new FutureCallback<NodeChildren>() {
+       @Override
+       public void onSuccess(NodeChildren result) {
+         Optional<String> nodeToWatch = findNodeToWatch(result.getChildren());
+
+         if (state == State.CANCELLED) {
+           deleteNode();
+           return;
+         }
+
+         if (nodeToWatch == null) {
+           // zkNodePath unknown, need to run register.
+           register();
+           return;
+         }
+
+         if (nodeToWatch.isPresent()) {
+           // Watch for deletion of largest node smaller than current node
+           watchNode(zkFolderPath + "/" + nodeToWatch.get(), new LowerNodeWatcher());
+         } else {
+           // This is leader
+           becomeLeader();
+         }
+       }
+
+       @Override
+       public void onFailure(Throwable t) {
+         LOG.warn("Got exception during children fetch for {}. Retry.", zkFolderPath, t);
+         // If cancel has been called before this callback and the zkNodePath is known, we can simply
+         // delete the node. Otherwise, runElection() is needed to determine the zkNodePath if it is cancelled.
+         if (state == State.CANCELLED && zkNodePath != null) {
+           deleteNode();
+         } else {
+           runElection();
+         }
+       }
+     }, executor);
+   }
+
+   private void becomeLeader() {
+     state = State.LEADER;
+     LOG.debug("Become leader for {}.", zkNodePath);
+     try {
+       handler.leader();
+     } catch (Throwable t) {
+       LOG.warn("Exception thrown when calling leader() method. Withdraw from the leader election process.", t);
+       stop();
+     }
+   }
+
+   private void becomeFollower() {
+     state = State.FOLLOWER;
+     LOG.debug("Become follower for {}", zkNodePath);
+     try {
+       handler.follower();
+     } catch (Throwable t) {
+       LOG.warn("Exception thrown when calling follower() method. Withdraw from the leader election process.", t);
+       stop();
+     }
+   }
+
+   /**
+    * Starts watching for the max. of smaller node.
+    */
+   private void watchNode(final String nodePath, Watcher watcher) {
+     OperationFuture<Stat> watchFuture = zkClient.exists(nodePath, watcher);
+     Futures.addCallback(watchFuture, new FutureCallback<Stat>() {
+       @Override
+       public void onSuccess(Stat result) {
+         if (state == State.CANCELLED) {
+           return;
+         }
+         if (result == null) {
+           // The lower node vanished between getChildren() and exists(). A watch left on a missing node
+           // only fires on creation, so waiting would stall; re-run the election instead.
+           LOG.debug("Lower node {} no longer exists. Re-run election.", nodePath);
+           runElection();
+           return;
+         }
+         becomeFollower();
+       }
+
+       @Override
+       public void onFailure(Throwable t) {
+         LOG.warn("Exception while setting watch on node {}. Retry.", nodePath, t);
+         runElection();
+       }
+     }, executor);
+   }
+
+   private ListenableFuture<String> deleteNode() {
+     SettableFuture<String> completion = SettableFuture.create();
+     doDeleteNode(completion);
+     return completion;
+   }
+
+   private void doDeleteNode(final SettableFuture<String> completion) {
+     if (zkNodePath == null) {
+       completion.set(null);
+       return;
+     }
+     try {
+       Futures.addCallback(zkClient.delete(zkNodePath), new FutureCallback<String>() {
+         @Override
+         public void onSuccess(String result) {
+           LOG.debug("Node deleted: {}", result);
+           completion.set(result);
+         }
+
+         @Override
+         public void onFailure(Throwable t) {
+           if (t instanceof KeeperException.NoNodeException) {
+             // The node is already gone (e.g. deleted by an earlier attempt or removed together with an
+             // expired session), so the goal of the delete has been achieved.
+             LOG.debug("Node already deleted: {}", zkNodePath);
+             completion.set(zkNodePath);
+             return;
+           }
+           LOG.warn("Fail to delete node: {}. Retry.", zkNodePath, t);
+           doDeleteNode(completion);
+         }
+       }, executor);
+     } catch (Throwable t) {
+       // If any exception happens when calling delete, treats it as completed with failure.
+       completion.setException(t);
+     }
+   }
+
+   private Watcher wrapWatcher(final Watcher watcher) {
+     return new Watcher() {
+       @Override
+       public void process(final WatchedEvent event) {
+         executor.execute(new Runnable() {
+           @Override
+           public void run() {
+             watcher.process(event);
+           }
+         });
+       }
+     };
+   }
+
+   /**
+    * Find the node to watch for and return it in the {@link com.google.common.base.Optional} value. If this client is
+    * the leader, return an {@link com.google.common.base.Optional#absent()}. This method also tries to set the
+    * zkNodePath if it is not set and return {@code null} if the zkNodePath cannot be determined.
+    */
+   private Optional<String> findNodeToWatch(List<String> nodes) {
+     // If this node path is not known, find it first.
+     if (zkNodePath == null) {
+       for (String node : nodes) {
+         if (node.startsWith(guid)) {
+           zkNodePath = zkFolderPath + "/" + node;
+           break;
+         }
+       }
+     }
+
+     if (zkNodePath == null) {
+       // Cannot find the node path, return null
+       return null;
+     }
+
+     // Find the maximum node that is smaller than the node created by this client.
+     int currentId = Integer.parseInt(zkNodePath.substring(zkNodePath.indexOf(guid) + guid.length() + 1));
+     String nodeToWatch = null;
+     int maxOfMins = Integer.MIN_VALUE;
+     for (String node : nodes) {
+       Integer nodeId = getSequenceId(node);
+       if (nodeId == null) {
+         // Not created by a LeaderElection participant; it must not influence who becomes leader.
+         LOG.warn("Ignoring unexpected node {} in election {}", node, zkFolderPath);
+         continue;
+       }
+       if (nodeId < currentId && nodeId > maxOfMins) {
+         maxOfMins = nodeId;
+         nodeToWatch = node;
+       }
+     }
+
+     return nodeToWatch == null ? Optional.<String>absent() : Optional.of(nodeToWatch);
+   }
+
+   /**
+    * Parses the sequence number of an election node named {@code <guid>-<sequence>}, where every participant's
+    * guid is a UUID string of the same length as this participant's. Returns {@code null} if the name does not
+    * have that form.
+    */
+   private Integer getSequenceId(String nodeName) {
+     int start = guid.length() + 1;
+     if (nodeName.length() <= start) {
+       return null;
+     }
+     try {
+       return Integer.parseInt(nodeName.substring(start));
+     } catch (NumberFormatException e) {
+       return null;
+     }
+   }
+
+   /**
+    * Watches lower node.
+    */
+   private class LowerNodeWatcher implements Watcher {
+     @Override
+     public void process(WatchedEvent event) {
+       if (state != State.CANCELLED && event.getType() == Event.EventType.NodeDeleted) {
+         LOG.debug("Lower node deleted {} for election {}.", event, zkNodePath);
+         runElection();
+       }
+     }
+   }
+
+   /**
+    * Watches zookeeper connection.
+    */
+   private class ConnectionWatcher implements Watcher {
+     private boolean expired;
+     private boolean disconnected;
+
+     @Override
+     public void process(WatchedEvent event) {
+       switch (event.getState()) {
+         case Disconnected:
+           disconnected = true;
+           LOG.info("Disconnected from ZK: {} for {}", zkClient.getConnectString(), zkFolderPath);
+           if (state == State.LEADER) {
+             // becomeFollower has to be called on disconnect so that two active leaders are never possible.
+             LOG.info("Stepping down as leader due to disconnect: {} for {}", zkClient.getConnectString(), zkFolderPath);
+             becomeFollower();
+           }
+           break;
+         case SyncConnected:
+           boolean runElection = disconnected && !expired && state != State.IN_PROGRESS;
+           boolean runRegister = disconnected && expired && state != State.IN_PROGRESS;
+           disconnected = false;
+           expired = false;
+           if (runElection) {
+             // If the state is cancelled (meaning a cancel happens between disconnect and connect),
+             // still runElection() so that it has chance to delete the node (as it's not expired, the node stays
+             // after reconnection).
+             if (state != State.CANCELLED) {
+               state = State.IN_PROGRESS;
+             }
+             LOG.info("Connected to ZK, running election: {} for {}", zkClient.getConnectString(), zkFolderPath);
+             runElection();
+           } else if (runRegister && state != State.CANCELLED) {
+             LOG.info("Connected to ZK, registering: {} for {}", zkClient.getConnectString(), zkFolderPath);
+             register();
+           }
+
+           break;
+         case Expired:
+           LOG.info("ZK session expired: {} for {}", zkClient.getConnectString(), zkFolderPath);
+           expired = true;
+           break;
+       }
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/94b53664/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
new file mode 100644
index 0000000..468ba59
--- /dev/null
+++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
@@ -0,0 +1,274 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.twill.internal.zookeeper;
+
+ import com.google.common.collect.Lists;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
+ import org.apache.twill.api.ElectionHandler;
+ import org.apache.twill.zookeeper.ZKClientService;
+ import org.junit.AfterClass;
+ import org.junit.Assert;
+ import org.junit.BeforeClass;
+ import org.junit.ClassRule;
+ import org.junit.Test;
+ import org.junit.rules.TemporaryFolder;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.List;
+ import java.util.concurrent.BrokenBarrierException;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.CyclicBarrier;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Semaphore;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+
+ /**
+  * Test for {@link LeaderElection}.
+  *
+  * <p>Every {@code tryAcquire} on the leader/follower semaphores is asserted: an unchecked
+  * {@code tryAcquire} simply times out and lets a broken election pass unnoticed.</p>
+  */
+ public class LeaderElectionTest {
+
+   private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionTest.class);
+
+   @ClassRule
+   public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+   private static InMemoryZKServer zkServer;
+
+   @Test(timeout = 5000)
+   public void testElection() throws ExecutionException, InterruptedException, BrokenBarrierException {
+     ExecutorService executor = Executors.newCachedThreadPool();
+
+     int participantCount = 5;
+     final CyclicBarrier barrier = new CyclicBarrier(participantCount + 1);
+     final Semaphore leaderSem = new Semaphore(0);
+     final Semaphore followerSem = new Semaphore(0);
+     final CountDownLatch[] stopLatch = new CountDownLatch[participantCount];
+     final List<ZKClientService> zkClients = Lists.newArrayList();
+
+     try {
+       final AtomicInteger currentLeader = new AtomicInteger(-1);
+       for (int i = 0; i < participantCount; i++) {
+         final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+         zkClient.startAndWait();
+         stopLatch[i] = new CountDownLatch(1);
+         zkClients.add(zkClient);
+
+         final int idx = i;
+         executor.submit(new Runnable() {
+           @Override
+           public void run() {
+             try {
+               barrier.await();
+
+               LeaderElection leaderElection = new LeaderElection(zkClient, "/test", new ElectionHandler() {
+                 @Override
+                 public void leader() {
+                   currentLeader.set(idx);
+                   leaderSem.release();
+                 }
+
+                 @Override
+                 public void follower() {
+                   followerSem.release();
+                 }
+               });
+               leaderElection.start();
+
+               stopLatch[idx].await(10, TimeUnit.SECONDS);
+               leaderElection.stopAndWait();
+
+             } catch (Exception e) {
+               LOG.error(e.getMessage(), e);
+             }
+           }
+         });
+       }
+
+       barrier.await();
+       Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+       Assert.assertTrue(followerSem.tryAcquire(participantCount - 1, 10, TimeUnit.SECONDS));
+
+       // Continuously stopping leader until there is one left.
+       for (int i = 0; i < participantCount - 1; i++) {
+         stopLatch[currentLeader.get()].countDown();
+         Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+         Assert.assertTrue(followerSem.tryAcquire(10, TimeUnit.SECONDS));
+       }
+
+       stopLatch[currentLeader.get()].countDown();
+
+     } finally {
+       executor.shutdown();
+       executor.awaitTermination(5L, TimeUnit.SECONDS);
+
+       for (ZKClientService zkClient : zkClients) {
+         zkClient.stopAndWait();
+       }
+     }
+   }
+
+   @Test(timeout = 10000)
+   public void testCancel() throws InterruptedException, IOException {
+     List<LeaderElection> leaderElections = Lists.newArrayList();
+     List<ZKClientService> zkClients = Lists.newArrayList();
+
+     // Creates two participants
+     final Semaphore leaderSem = new Semaphore(0);
+     final Semaphore followerSem = new Semaphore(0);
+     final AtomicInteger leaderIdx = new AtomicInteger();
+
+     try {
+       for (int i = 0; i < 2; i++) {
+         ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+         zkClient.startAndWait();
+
+         zkClients.add(zkClient);
+
+         final int finalI = i;
+         leaderElections.add(new LeaderElection(zkClient, "/testCancel", new ElectionHandler() {
+           @Override
+           public void leader() {
+             leaderIdx.set(finalI);
+             leaderSem.release();
+           }
+
+           @Override
+           public void follower() {
+             followerSem.release();
+           }
+         }));
+       }
+
+       for (LeaderElection leaderElection : leaderElections) {
+         leaderElection.start();
+       }
+
+       Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+       Assert.assertTrue(followerSem.tryAcquire(10, TimeUnit.SECONDS));
+
+       int leader = leaderIdx.get();
+       int follower = 1 - leader;
+
+       // Kill the follower session
+       KillZKSession.kill(zkClients.get(follower).getZooKeeperSupplier().get(),
+                          zkClients.get(follower).getConnectString(), 5000);
+
+       // Cancel the leader
+       leaderElections.get(leader).stopAndWait();
+
+       // Now follower should still be able to become leader.
+       Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+
+       leader = leaderIdx.get();
+       follower = 1 - leader;
+
+       // Create another participant (use the old leader zkClient)
+       leaderElections.set(follower, new LeaderElection(zkClients.get(follower), "/testCancel", new ElectionHandler() {
+         @Override
+         public void leader() {
+           leaderSem.release();
+         }
+
+         @Override
+         public void follower() {
+           followerSem.release();
+         }
+       }));
+       leaderElections.get(follower).start();
+
+       // Cancel the follower first.
+       leaderElections.get(follower).stopAndWait();
+
+       // Cancel the leader.
+       leaderElections.get(leader).stopAndWait();
+
+       // Since the follower has been cancelled before leader, there should be no leader.
+       Assert.assertFalse(leaderSem.tryAcquire(2, TimeUnit.SECONDS));
+     } finally {
+       for (ZKClientService zkClient : zkClients) {
+         zkClient.stopAndWait();
+       }
+     }
+   }
+
+   @Test(timeout = 10000)
+   public void testDisconnect() throws IOException, InterruptedException {
+     File zkDataDir = tmpFolder.newFolder();
+     InMemoryZKServer ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).build();
+     ownZKServer.startAndWait();
+     try {
+       ZKClientService zkClient = ZKClientService.Builder.of(ownZKServer.getConnectionStr()).build();
+       zkClient.startAndWait();
+
+       try {
+         final Semaphore leaderSem = new Semaphore(0);
+         final Semaphore followerSem = new Semaphore(0);
+
+         LeaderElection leaderElection = new LeaderElection(zkClient, "/testDisconnect", new ElectionHandler() {
+           @Override
+           public void leader() {
+             leaderSem.release();
+           }
+
+           @Override
+           public void follower() {
+             followerSem.release();
+           }
+         });
+         leaderElection.start();
+
+         Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+
+         int zkPort = ownZKServer.getLocalAddress().getPort();
+
+         // Disconnect by shutting the server and restart it on the same port
+         ownZKServer.stopAndWait();
+
+         // Right after disconnect, it should become follower
+         Assert.assertTrue(followerSem.tryAcquire(10, TimeUnit.SECONDS));
+
+         ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).setPort(zkPort).build();
+         ownZKServer.startAndWait();
+
+         // Right after reconnect, it should be leader again.
+         Assert.assertTrue(leaderSem.tryAcquire(10, TimeUnit.SECONDS));
+
+         // Now disconnect it again, but then cancel it before reconnect, it shouldn't become leader
+         ownZKServer.stopAndWait();
+
+         // Right after disconnect, it should become follower
+         Assert.assertTrue(followerSem.tryAcquire(10, TimeUnit.SECONDS));
+
+         ListenableFuture<?> cancelFuture = leaderElection.stop();
+
+         ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).setPort(zkPort).build();
+         ownZKServer.startAndWait();
+
+         Futures.getUnchecked(cancelFuture);
+
+         // After reconnect, it should not be leader
+         Assert.assertFalse(leaderSem.tryAcquire(2, TimeUnit.SECONDS));
+       } finally {
+         zkClient.stopAndWait();
+       }
+     } finally {
+       ownZKServer.stopAndWait();
+     }
+   }
+
+   @BeforeClass
+   public static void init() throws IOException {
+     zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+     zkServer.startAndWait();
+   }
+
+   @AfterClass
+   public static void finish() {
+     zkServer.stopAndWait();
+   }
+ }