You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by dm...@apache.org on 2016/12/01 17:02:32 UTC

[3/4] aurora git commit: Revert removal of twitter/commons/zk based leadership code

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
new file mode 100644
index 0000000..ce243fb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.InetSocketAddressHelper;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages a connection to a ZooKeeper cluster.
+ */
+public class ZooKeeperClient {
+
+  /**
+   * Indicates an error connecting to a ZooKeeper cluster; the underlying failure is the cause.
+   */
+  public static class ZooKeeperConnectionException extends Exception {
+    ZooKeeperConnectionException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  private static final class SessionState {  // static: never touches the enclosing client
+    private final long sessionId;            // id of the session captured after connecting
+    private final byte[] sessionPasswd;      // password passed back to ZooKeeper on reconnect
+
+    private SessionState(long sessionId, byte[] sessionPasswd) {
+      this.sessionId = sessionId;
+      this.sessionPasswd = sessionPasswd;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class);
+
+  private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS);
+
+  private final int sessionTimeoutMs;
+  private final Optional<Credentials> credentials;
+  private final String zooKeeperServers;
+  // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
+  // made from within long synchronized blocks.
+  private volatile ZooKeeper zooKeeper;
+  private SessionState sessionState;
+
+  private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
+  private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>();
+
+  private static Iterable<InetSocketAddress> combine(InetSocketAddress address,
+      InetSocketAddress... addresses) {
+    return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build();
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get()}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param zooKeeperServer the first, required ZK server
+   * @param zooKeeperServers any additional servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer,
+      InetSocketAddress... zooKeeperServers) {
+    this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers));
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout,
+      Iterable<InetSocketAddress> zooKeeperServers) {
+    this(sessionTimeout, Optional.absent(), Optional.absent(), zooKeeperServers);
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get()}.  All successful connections will be authenticated with the given
+   * {@code credentials}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param zooKeeperServer the first, required ZK server
+   * @param zooKeeperServers any additional servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+      InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) {
+    this(sessionTimeout,
+        Optional.of(credentials),
+        Optional.absent(),
+        combine(zooKeeperServer, zooKeeperServers));
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.  All successful connections will be authenticated with the given
+   * {@code credentials}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+      Iterable<InetSocketAddress> zooKeeperServers) {
+        this(sessionTimeout,
+            Optional.of(credentials),
+            Optional.absent(),
+            zooKeeperServers);
+      }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.  All successful connections will be authenticated with the given
+   * {@code credentials}, if present.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the optional credentials to authenticate with
+   * @param chrootPath an optional chroot path
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials,
+      Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) {
+    this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS);
+    this.credentials = Preconditions.checkNotNull(credentials);
+
+    if (chrootPath.isPresent()) {
+      PathUtils.validatePath(chrootPath.get());
+    }
+
+    Preconditions.checkNotNull(zooKeeperServers);
+    Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers),
+        "Must present at least 1 ZK server");
+
+    Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") {
+      @Override
+      public void run() {
+        while (true) {
+          try {
+            WatchedEvent event = eventQueue.take();
+            for (Watcher watcher : watchers) {
+              watcher.process(event);
+            }
+          } catch (InterruptedException e) { /* ignore */ }
+        }
+      }
+    };
+    watcherProcessor.setDaemon(true);
+    watcherProcessor.start();
+
+    Iterable<String> servers =
+        Iterables.transform(ImmutableSet.copyOf(zooKeeperServers),
+            InetSocketAddressHelper::toString);
+    this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or(""));
+  }
+
+  /**
+   * Returns the current active ZK connection or establishes a new one if none has yet been
+   * established or a previous connection was disconnected or had its session time out.  This method
+   * will attempt to re-use sessions when possible.  Equivalent to:
+   * <pre>get(Amount.of(0L, ...))</pre>.
+   *
+   * @return a connected ZooKeeper client
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+   * @throws InterruptedException if interrupted while waiting for a connection to be established
+   */
+  public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException {
+    try {
+      return get(WAIT_FOREVER);
+    } catch (TimeoutException e) {
+      InterruptedException interruptedException =
+          new InterruptedException("Got an unexpected TimeoutException for 0 wait");
+      interruptedException.initCause(e);
+      throw interruptedException;
+    }
+  }
+
+  /**
+   * Returns the current active ZK connection or establishes a new one if none has yet been
+   * established or a previous connection was disconnected or had its session time out.  This
+   * method will attempt to re-use sessions when possible.
+   *
+   * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK
+   *     cluster to be established; 0 to wait forever
+   * @return a connected ZooKeeper client
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+   * @throws InterruptedException if interrupted while waiting for a connection to be established
+   * @throws TimeoutException if a connection could not be established within the given
+   *     {@code connectionTimeout}
+   */
+  public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout)
+      throws ZooKeeperConnectionException, InterruptedException, TimeoutException {
+
+    if (zooKeeper == null) {
+      final CountDownLatch connected = new CountDownLatch(1);
+      Watcher watcher = event -> {
+        switch (event.getType()) {
+          // Guard the None type since this watch may be used as the default watch on calls by
+          // the client outside our control.
+          case None:
+            switch (event.getState()) {
+              case Expired:
+                LOG.info("Zookeeper session expired. Event: " + event);
+                close();
+                break;
+              case SyncConnected:
+                connected.countDown();
+                break;
+            }
+        }
+
+        eventQueue.offer(event);
+      };
+
+      try {
+        zooKeeper = (sessionState != null)
+          ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId,
+            sessionState.sessionPasswd)
+          : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher);
+      } catch (IOException e) {
+        throw new ZooKeeperConnectionException(
+            "Problem connecting to servers: " + zooKeeperServers, e);
+      }
+
+      try {
+        if (connectionTimeout.getValue() > 0) {
+          if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) {
+            close();
+            throw new TimeoutException("Timed out waiting for a ZK connection after "
+                                       + connectionTimeout);
+          }
+        } else {
+          connected.await();
+        }
+      } catch (InterruptedException ex) {  // timed or untimed wait: never keep a half-open handle
+        LOG.info("Interrupted while waiting to connect to zooKeeper");
+        close();
+        throw ex;
+      }
+      if (credentials.isPresent()) {
+        Credentials zkCredentials = credentials.get();
+        zooKeeper.addAuthInfo(zkCredentials.scheme(), zkCredentials.authToken());
+      }
+
+      sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
+    }
+    return zooKeeper;
+  }
+
+  /**
+   * Clients that need to re-establish state after session expiration can register an
+   * {@code onExpired} command to execute.
+   *
+   * @param onExpired the {@code Command} to register
+   * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
+   *     removal.
+   */
+  public Watcher registerExpirationHandler(final Command onExpired) {
+    Watcher watcher = event -> {
+      if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
+        onExpired.execute();
+      }
+    };
+    register(watcher);
+    return watcher;
+  }
+
+  /**
+   * Clients that need to register a top-level {@code Watcher} should do so using this method.  The
+   * registered {@code watcher} will remain registered across re-connects and session expiration
+   * events.
+   *
+   * @param watcher the {@code Watcher} to register
+   */
+  public void register(Watcher watcher) {
+    watchers.add(watcher);
+  }
+
+  /**
+   * Clients can attempt to unregister a top-level {@code Watcher} that has previously been
+   * registered.
+   *
+   * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
+   * @return whether the given {@code Watcher} was found and removed from the active set
+   */
+  public boolean unregister(Watcher watcher) {
+    return watchers.remove(watcher);
+  }
+
+  /**
+   * Checks to see if the client might reasonably re-try an operation given the exception thrown
+   * while attempting it.  If the ZooKeeper session should be expired to enable the re-try to
+   * succeed this method will expire it as a side-effect.
+   *
+   * @param e the exception to test
+   * @return true if a retry can be attempted
+   */
+  public boolean shouldRetry(KeeperException e) {
+    if (e instanceof SessionExpiredException) {
+      close();
+    }
+    return ZooKeeperUtils.isRetryable(e);
+  }
+
+  /**
+   * Closes the current connection, if any, expiring the current ZooKeeper session.  Any
+   * subsequent calls to this method will no-op until the next successful {@link #get}.
+   */
+  public synchronized void close() {
+    if (zooKeeper != null) {
+      try {
+        zooKeeper.close();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted trying to close zooKeeper");
+      } finally {
+        zooKeeper = null;
+        sessionState = null;
+      }
+    }
+  }
+
+  @VisibleForTesting
+  synchronized boolean isClosed() {
+    return zooKeeper == null;
+  }
+
+  @VisibleForTesting
+  ZooKeeper getZooKeeperClientForTests() {
+    return zooKeeper;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
new file mode 100644
index 0000000..2ada264
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for dealing with ZooKeeper.
+ */
+public final class ZooKeeperUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
+
+  /**
+   * An appropriate default session timeout for Twitter ZooKeeper clusters.
+   */
+  public static final Amount<Integer,Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS);
+
+  /**
+   * The magic version number that allows any mutation to always succeed regardless of actual
+   * version number.
+   */
+  public static final int ANY_VERSION = -1;
+
+  /**
+   * An ACL that gives all permissions to any user, authenticated or not.
+   */
+  public static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
+      ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);
+
+  /**
+   * An ACL that gives all permissions to node creators and read permissions only to everyone else.
+   */
+  public static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
+      ImmutableList.<ACL>builder()
+          .addAll(Ids.CREATOR_ALL_ACL)
+          .addAll(Ids.READ_ACL_UNSAFE)
+          .build();
+
+  /**
+   * Returns true if the given exception indicates an error that can be resolved by retrying the
+   * operation without modification.
+   *
+   * @param e the exception to check
+   * @return true if the causing operation is strictly retryable
+   */
+  public static boolean isRetryable(KeeperException e) {
+    Preconditions.checkNotNull(e);
+
+    switch (e.code()) {
+      case CONNECTIONLOSS:
+      case SESSIONEXPIRED:
+      case SESSIONMOVED:
+      case OPERATIONTIMEOUT:
+        return true;
+
+      case RUNTIMEINCONSISTENCY:
+      case DATAINCONSISTENCY:
+      case MARSHALLINGERROR:
+      case BADARGUMENTS:
+      case NONODE:
+      case NOAUTH:
+      case BADVERSION:
+      case NOCHILDRENFOREPHEMERALS:
+      case NODEEXISTS:
+      case NOTEMPTY:
+      case INVALIDCALLBACK:
+      case INVALIDACL:
+      case AUTHFAILED:
+      case UNIMPLEMENTED:
+
+      // These two should not be encountered - they are used internally by ZK to specify ranges
+      case SYSTEMERROR:
+      case APIERROR:
+
+      case OK: // This is actually an invalid ZK exception code
+
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}.  If the
+   * path already exists, nothing is done; however if any portion of the path is missing, it will be
+   * created with the given {@code acl} as a persistent zookeeper node.  The given {@code path} must
+   * be a valid zookeeper absolute path.
+   *
+   * @param zkClient the client to use to access the ZK cluster
+   * @param acl the acl to use if creating path nodes
+   * @param path the path to ensure exists
+   * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster
+   * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster
+   * @throws KeeperException if there was a problem in ZK
+   */
+  public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path)
+      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+    Preconditions.checkNotNull(zkClient);
+    Preconditions.checkNotNull(path);
+    Preconditions.checkArgument(path.startsWith("/"));
+
+    ensurePathInternal(zkClient, acl, path);
+  }
+
+  private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path)
+      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+    if (zkClient.get().exists(path, false) == null) {
+      // The current path does not exist; so back up a level and ensure the parent path exists
+      // unless we're already a root-level path.
+      int lastPathIndex = path.lastIndexOf('/');
+      if (lastPathIndex > 0) {
+        ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex));
+      }
+
+      // We've ensured our parent path (if any) exists so we can proceed to create our path.
+      try {
+        zkClient.get().create(path, null, acl, CreateMode.PERSISTENT);
+      } catch (KeeperException.NodeExistsException e) {
+        // This ensures we don't die if a race condition was met between checking existence and
+        // trying to create the node.
+        LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?");
+      }
+    }
+  }
+
+  /**
+   * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and
+   * never ends with a slash (except for root path).
+   *
+   * @param path the path to be normalized
+   * @return normalized path string
+   */
+  public static String normalizePath(String path) {
+    String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1");
+    PathUtils.validatePath(normalizedPath);
+    return normalizedPath;
+  }
+
+  private ZooKeeperUtils() {
+    // utility
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
new file mode 100644
index 0000000..ba09279
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+
+/**
+ * A base-class for tests that interact with ZooKeeper via the commons ZooKeeperClient.
+ */
+public abstract class BaseZooKeeperClientTest extends BaseZooKeeperTest {
+
+  private final Amount<Integer, Time> defaultSessionTimeout;
+
+  /**
+   * Creates a test case where the test server uses its
+   * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit
+   * session timeout.
+   */
+  public BaseZooKeeperClientTest() {
+    this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT);
+  }
+
+  /**
+   * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for
+   * clients created without an explicit session timeout.
+   */
+  public BaseZooKeeperClientTest(Amount<Integer, Time> defaultSessionTimeout) {
+    this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout);
+  }
+
+
+  /**
+   * Starts zookeeper back up on the last used port.
+   */
+  protected final void restartNetwork() throws IOException, InterruptedException {
+    getServer().restartNetwork();
+  }
+
+  /**
+   * Shuts down the in-process zookeeper network server.
+   */
+  protected final void shutdownNetwork() {
+    getServer().shutdownNetwork();
+  }
+
+  /**
+   * Expires the active session for the given client.  The client should be one returned from
+   * {@link #createZkClient}.
+   *
+   * @param zkClient the client to expire
+   * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to
+   *    the local zk server while trying to expire the session
+   * @throws InterruptedException if interrupted while requesting expiration
+   */
+  protected final void expireSession(ZooKeeperClient zkClient)
+      throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException {
+    getServer().expireClientSession(zkClient.get().getSessionId());
+  }
+
+  /**
+   * Returns the current port to connect to the in-process zookeeper instance.
+   */
+  protected final int getPort() {
+    return getServer().getPort();
+  }
+
+  /**
+   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+   * with the default session timeout.
+   */
+  protected final ZooKeeperClient createZkClient() {
+    return createZkClient(defaultSessionTimeout, Optional.absent(), Optional.absent());
+  }
+
+  /**
+   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
+   * the default session timeout.
+   */
+  protected final ZooKeeperClient createZkClient(Credentials credentials) {
+    return createZkClient(defaultSessionTimeout, Optional.of(credentials), Optional.absent());
+  }
+
+  /**
+   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
+   * the default session timeout.  The client is authenticated in the digest authentication scheme
+   * with the given {@code username} and {@code password}.
+   */
+  protected final ZooKeeperClient createZkClient(String username, String password) {
+    return createZkClient(Credentials.digestCredentials(username, password));
+  }
+
+  /**
+   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+   * with a custom {@code sessionTimeout}.
+   */
+  protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) {
+    return createZkClient(sessionTimeout, Optional.absent(), Optional.absent());
+  }
+
+  /**
+   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+   * with the default session timeout and the custom chroot path.
+   */
+  protected final ZooKeeperClient createZkClient(String chrootPath) {
+    return createZkClient(defaultSessionTimeout, Optional.absent(),
+        Optional.of(chrootPath));
+  }
+
+  private ZooKeeperClient createZkClient(
+      Amount<Integer, Time> sessionTimeout,
+      Optional<Credentials> credentials,
+      Optional<String> chrootPath) {
+
+    ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, chrootPath,
+        ImmutableList.of(InetSocketAddress.createUnresolved("127.0.0.1", getPort())));
+    addTearDown(client::close);
+    return client;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
new file mode 100644
index 0000000..0e68987
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * A base-class for in-process zookeeper tests.
+ */
+public abstract class BaseZooKeeperTest extends TearDownTestCase {
+
+  private ZooKeeperTestServer zkTestServer;
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Before
+  public final void setUp() throws Exception {
+    zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder());
+    addTearDown(zkTestServer::stop);
+    zkTestServer.startNetwork();
+  }
+
+  /**
+   * Returns the running in-process ZooKeeper server.
+   *
+   * @return The in-process ZooKeeper server.
+   */
+  protected final ZooKeeperTestServer getServer() {
+    return zkTestServer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
new file mode 100644
index 0000000..50acaeb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ * A helper class for starting in-process ZooKeeper server and clients.
+ *
+ * <p>This is ONLY meant to be used for testing.
+ *
+ * <p>The server is created lazily by {@link #startNetwork()}; methods that need a started server
+ * fail with {@link IllegalStateException} when called beforehand.
+ */
+public class ZooKeeperTestServer {
+
+  static final Amount<Integer, Time> DEFAULT_SESSION_TIMEOUT = Amount.of(100, Time.MILLISECONDS);
+
+  private static final String NOT_STARTED_MESSAGE = "startNetwork must be called first";
+
+  private final File dataDir;
+  private final File snapDir;
+
+  private ZooKeeperServer zooKeeperServer;
+  private ServerCnxnFactory connectionFactory;
+  // 0 until the first successful start; afterwards the port the server was last bound to.
+  private int port;
+
+  /**
+   * Creates a test server that keeps its transaction log and snapshots in the given directories.
+   *
+   * @param dataDir Directory handed to {@link FileTxnSnapLog} as the data directory.
+   * @param snapDir Directory handed to {@link FileTxnSnapLog} as the snapshot directory.
+   * @throws NullPointerException if either directory is {@code null}.
+   */
+  public ZooKeeperTestServer(File dataDir, File snapDir) {
+    this.dataDir = Preconditions.checkNotNull(dataDir);
+    this.snapDir = Preconditions.checkNotNull(snapDir);
+  }
+
+  /**
+   * Starts zookeeper up.
+   *
+   * <p>On the first call the stored port is still 0, so the server binds to an ephemeral port;
+   * the port actually bound is then remembered and reused by later starts.
+   *
+   * @throws IOException if the server or its connection factory cannot be set up.
+   * @throws InterruptedException if startup is interrupted.
+   */
+  public void startNetwork() throws IOException, InterruptedException {
+    zooKeeperServer =
+        new ZooKeeperServer(
+            new FileTxnSnapLog(dataDir, snapDir),
+            new BasicDataTreeBuilder()) {
+
+          // TODO(John Sirois): Introduce a builder to configure the in-process server if and when
+          // some folks need JMX for in-process tests.
+          @Override protected void registerJMX() {
+            // noop
+          }
+        };
+
+    connectionFactory = new NIOServerCnxnFactory();
+    connectionFactory.configure(
+        new InetSocketAddress(port),
+        60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */);
+    connectionFactory.startup(zooKeeperServer);
+    port = zooKeeperServer.getClientPort();
+  }
+
+  /**
+   * Stops the zookeeper server.
+   */
+  public void stop() {
+    shutdownNetwork();
+  }
+
+  /**
+   * Starts zookeeper back up on the last used port.
+   *
+   * @throws IllegalStateException if the server was never started or is still running.
+   */
+  final void restartNetwork() throws IOException, InterruptedException {
+    checkEphemeralPortAssigned();
+    Preconditions.checkState(connectionFactory == null);
+    startNetwork();
+  }
+
+  /**
+   * Shuts down the in-process zookeeper network server.
+   *
+   * <p>Does nothing when the network is not currently up.
+   */
+  final void shutdownNetwork() {
+    if (connectionFactory != null) {
+      connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
+      connectionFactory = null;
+    }
+  }
+
+  /**
+   * Expires the client session with the given {@code sessionId}.
+   *
+   * @param sessionId The id of the client session to expire.
+   * @throws IllegalStateException if {@link #startNetwork()} has not been called yet.
+   */
+  public final void expireClientSession(long sessionId) {
+    // Without this guard a call before startNetwork() fails with a bare NullPointerException.
+    Preconditions.checkState(zooKeeperServer != null, NOT_STARTED_MESSAGE);
+    zooKeeperServer.closeSession(sessionId);
+  }
+
+  /**
+   * Returns the current port to connect to the in-process zookeeper instance.
+   *
+   * @throws IllegalStateException if no port has been assigned yet.
+   */
+  public final int getPort() {
+    checkEphemeralPortAssigned();
+    return port;
+  }
+
+  private void checkEphemeralPortAssigned() {
+    Preconditions.checkState(port > 0, NOT_STARTED_MESSAGE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
new file mode 100644
index 0000000..9c0cebe
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests leader election through {@link CandidateImpl} using ZooKeeper clients obtained from
+ * {@link BaseZooKeeperClientTest}.
+ */
+public class CandidateImplTest extends BaseZooKeeperClientTest {
+  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+  private static final String SERVICE = "/twitter/services/puffin_linkhose/leader";
+  private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES);
+
+  // Elected candidates in election order: onElected adds at the head, tests take from the tail.
+  private LinkedBlockingDeque<CandidateImpl> candidateBuffer;
+
+  @Before
+  public void mySetUp() throws IOException {
+    candidateBuffer = new LinkedBlockingDeque<>();
+  }
+
+  private Group createGroup(ZooKeeperClient zkClient) throws IOException {
+    return new Group(zkClient, ACL, SERVICE);
+  }
+
+  /**
+   * A {@link Candidate.Leader} that records its election in {@code candidateBuffer} and lets the
+   * test abdicate and wait for defeat.
+   */
+  private class Reign implements Candidate.Leader {
+    // volatile: nothing visible in this test guarantees that onElected runs on the same thread
+    // that later calls abdicate().
+    private volatile ExceptionalCommand<Group.JoinException> abdicate;
+    private final CandidateImpl candidate;
+    private final String id;
+    private final CountDownLatch defeated = new CountDownLatch(1);
+
+    Reign(String id, CandidateImpl candidate) {
+      this.id = id;
+      this.candidate = candidate;
+    }
+
+    @Override
+    public void onElected(ExceptionalCommand<Group.JoinException> abdicate) {
+      // Store the command before publishing the candidate: a thread that takes the candidate
+      // from the buffer and immediately calls abdicate() must never see a null command.
+      this.abdicate = abdicate;
+      candidateBuffer.offerFirst(candidate);
+    }
+
+    @Override
+    public void onDefeated() {
+      defeated.countDown();
+    }
+
+    /**
+     * Runs the abdicate command handed over on election.
+     *
+     * @throws IllegalStateException if this reign has not been elected yet.
+     */
+    public void abdicate() throws Group.JoinException {
+      ExceptionalCommand<Group.JoinException> command = abdicate;
+      Preconditions.checkState(command != null);
+      command.execute();
+    }
+
+    /** Blocks until {@link #onDefeated()} has been called. */
+    public void expectDefeated() throws InterruptedException {
+      defeated.await();
+    }
+
+    @Override
+    public String toString() {
+      return id;
+    }
+  }
+
+  /**
+   * Three candidates compete: the first stays leader across a network bounce, abdication hands
+   * leadership to exactly one of the others, and expiring that leader's session elects the last.
+   */
+  @Test
+  public void testOfferLeadership() throws Exception {
+    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) {
+      @Override public String toString() {
+        return "Leader1";
+      }
+    };
+    ZooKeeperClient zkClient2 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) {
+      @Override public String toString() {
+        return "Leader2";
+      }
+    };
+    ZooKeeperClient zkClient3 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) {
+      @Override public String toString() {
+        return "Leader3";
+      }
+    };
+
+    Reign candidate1Reign = new Reign("1", candidate1);
+    Reign candidate2Reign = new Reign("2", candidate2);
+    Reign candidate3Reign = new Reign("3", candidate3);
+
+    Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign);
+    Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign);
+    Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign);
+
+    assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader",
+        candidate1Leader.get());
+
+    shutdownNetwork();
+    restartNetwork();
+
+    assertTrue("A re-connect without a session expiration should leave the leader elected",
+        candidate1Leader.get());
+
+    candidate1Reign.abdicate();
+    assertSame(candidate1, candidateBuffer.takeLast());
+    assertFalse(candidate1Leader.get());
+    // Active abdication should trigger defeat.
+    candidate1Reign.expectDefeated();
+
+    CandidateImpl secondCandidate = candidateBuffer.takeLast();
+    assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " "
+               + candidateBuffer,
+        candidate2Leader.get() ^ candidate3Leader.get());
+
+    if (secondCandidate == candidate2) {
+      expireSession(zkClient2);
+      assertSame(candidate3, candidateBuffer.takeLast());
+      assertTrue(candidate3Leader.get());
+      // Passive expiration should trigger defeat.
+      candidate2Reign.expectDefeated();
+    } else {
+      expireSession(zkClient3);
+      assertSame(candidate2, candidateBuffer.takeLast());
+      assertTrue(candidate2Leader.get());
+      // Passive expiration should trigger defeat.
+      candidate3Reign.expectDefeated();
+    }
+  }
+
+  /**
+   * After the only candidate abdicates, no leader data should be reported.
+   */
+  @Test
+  public void testEmptyMembership() throws Exception {
+    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1));
+    Reign candidate1Reign = new Reign("1", candidate1);
+
+    candidate1.offerLeadership(candidate1Reign);
+    assertSame(candidate1, candidateBuffer.takeLast());
+    candidate1Reign.abdicate();
+    assertFalse(candidate1.getLeaderData().isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
new file mode 100644
index 0000000..97a42d1
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.NodeScheme;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link Group} membership joins and watches, using ZooKeeper clients created through
+ * {@link BaseZooKeeperClientTest}.
+ */
+public class GroupTest extends BaseZooKeeperClientTest {
+
+  private ZooKeeperClient zkClient;
+  // joinGroup and watchGroup are separate Group instances over the same client and path.
+  private Group joinGroup;
+  private Group watchGroup;
+  // Command returned by watchGroup.watch(listener) in mySetUp.
+  private Command stopWatching;
+  // EasyMock mock; individual tests record expectations on it before calling replay.
+  private Command onLoseMembership;
+
+  private RecordingListener listener;
+
+  public GroupTest() {
+    // NOTE(review): presumably the default session timeout for clients - confirm against
+    // BaseZooKeeperClientTest.
+    super(Amount.of(1, Time.DAYS));
+  }
+
+  @Before
+  public void mySetUp() throws Exception {
+    onLoseMembership = createMock(Command.class);
+
+    // NOTE(review): the two strings look like client credentials - confirm against
+    // BaseZooKeeperClientTest.createZkClient.
+    zkClient = createZkClient("group", "test");
+    joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
+    watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
+
+    listener = new RecordingListener();
+    stopWatching = watchGroup.watch(listener);
+  }
+
+  /**
+   * A {@link GroupChangeListener} that queues every membership change it is notified of so tests
+   * can consume them one at a time.
+   */
+  private static class RecordingListener implements GroupChangeListener {
+    private final LinkedBlockingQueue<Iterable<String>> membershipChanges =
+        new LinkedBlockingQueue<Iterable<String>>();
+
+    @Override
+    public void onGroupChange(Iterable<String> memberIds) {
+      membershipChanges.add(memberIds);
+    }
+
+    /** Removes and returns the oldest recorded change, blocking until one is available. */
+    public Iterable<String> take() throws InterruptedException {
+      return membershipChanges.take();
+    }
+
+    /** Asserts that no recorded change is still waiting to be consumed. */
+    public void assertEmpty() {
+      assertEquals(ImmutableList.<Iterable<String>>of(), ImmutableList.copyOf(membershipChanges));
+    }
+
+    @Override
+    public String toString() {
+      return membershipChanges.toString();
+    }
+  }
+
+  /**
+   * A {@link NodeScheme} that always uses the single fixed node name {@link #NODE_NAME} and
+   * reports itself as non-sequential.
+   */
+  private static class CustomScheme implements NodeScheme {
+    static final String NODE_NAME = "custom_name";
+
+    @Override
+    public boolean isMember(String nodeName) {
+      return NODE_NAME.equals(nodeName);
+    }
+
+    @Override
+    public String createName(byte[] membershipData) {
+      return NODE_NAME;
+    }
+
+    @Override
+    public boolean isSequential() {
+      return false;
+    }
+  }
+
+  /** Expiring the client session must run the command passed to {@code join}. */
+  @Test
+  public void testSessionExpirationTriggersOnLoseMembership() throws Exception {
+    final CountDownLatch lostMembership = new CountDownLatch(1);
+    // Local command shadows the mock field of the same name; the mock is not used in this test.
+    Command onLoseMembership = lostMembership::countDown;
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join(onLoseMembership);
+    assertMembershipObserved(membership.getMemberId());
+    expireSession(zkClient);
+
+    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
+  }
+
+  /** Cancelling the membership must run the command passed to {@code join}. */
+  @Test
+  public void testNodeDeleteTriggersOnLoseMembership() throws Exception {
+    final CountDownLatch lostMembership = new CountDownLatch(1);
+    // Local command shadows the mock field of the same name; the mock is not used in this test.
+    Command onLoseMembership = lostMembership::countDown;
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join(onLoseMembership);
+    assertMembershipObserved(membership.getMemberId());
+    membership.cancel();
+
+    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
+  }
+
+  /** A network bounce without session expiry must keep the original member id visible. */
+  @Test
+  public void testJoinsAndWatchesSurviveDisconnect() throws Exception {
+    // No expectations are recorded on the mock before replay.
+    replay(onLoseMembership);
+
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join();
+    String originalMemberId = membership.getMemberId();
+    assertMembershipObserved(originalMemberId);
+
+    shutdownNetwork();
+    restartNetwork();
+
+    // The member should still be present under existing ephemeral node since session did not
+    // expire.
+    // The same listener is registered a second time here, so later changes are recorded twice.
+    watchGroup.watch(listener);
+    assertMembershipObserved(originalMemberId);
+
+    membership.cancel();
+
+    assertEmptyMembershipObserved();
+    assertEmptyMembershipObserved(); // and again for 2nd listener
+
+    listener.assertEmpty();
+
+    verify(onLoseMembership);
+    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
+  }
+
+  /** After session expiry the group must be re-joined under a new member id. */
+  @Test
+  public void testJoinsAndWatchesSurviveExpiredSession() throws Exception {
+    // Record phase: expect exactly one execute() call on the mock.
+    onLoseMembership.execute();
+    replay(onLoseMembership);
+
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join(onLoseMembership);
+    String originalMemberId = membership.getMemberId();
+    assertMembershipObserved(originalMemberId);
+
+    expireSession(zkClient);
+
+    // We should have lost our group membership and then re-gained it with a new ephemeral node.
+    // We may or may-not see the intermediate state change but we must see the final state
+    Iterable<String> members = listener.take();
+    if (Iterables.isEmpty(members)) {
+      members = listener.take();
+    }
+    assertEquals(1, Iterables.size(members));
+    assertNotEquals(originalMemberId, Iterables.getOnlyElement(members));
+    assertNotEquals(originalMemberId, membership.getMemberId());
+
+    listener.assertEmpty();
+
+    verify(onLoseMembership);
+    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
+  }
+
+  /** A group built with {@link CustomScheme} must use the scheme's node name as member id. */
+  @Test
+  public void testJoinCustomNamingScheme() throws Exception {
+    Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group",
+        new CustomScheme());
+
+    // Replace the listener from mySetUp so the assert helpers below read this group's changes.
+    listener = new RecordingListener();
+    group.watch(listener);
+    assertEmptyMembershipObserved();
+
+    Membership membership = group.join();
+    String memberId = membership.getMemberId();
+
+    assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId);
+    assertMembershipObserved(memberId);
+
+    expireSession(zkClient);
+  }
+
+  /** Member data must only change when {@code updateMemberData} is called. */
+  @Test
+  public void testUpdateMembershipData() throws Exception {
+    Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock();
+
+    // The mocked supplier is expected to be asked twice: first for "start", then for "update".
+    byte[] initial = "start".getBytes();
+    expect(dataSupplier.get()).andReturn(initial);
+
+    byte[] second = "update".getBytes();
+    expect(dataSupplier.get()).andReturn(second);
+
+    replay(dataSupplier);
+
+    Membership membership = joinGroup.join(dataSupplier, onLoseMembership);
+    assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get()
+        .getData(membership.getMemberPath(), false, null));
+
+    // Reading again without an update must still return the initial data.
+    assertArrayEquals("Updating supplier should not change membership data",
+        initial, zkClient.get().getData(membership.getMemberPath(), false, null));
+
+    membership.updateMemberData();
+    assertArrayEquals("Updating membership should change data",
+        second, zkClient.get().getData(membership.getMemberPath(), false, null));
+
+    verify(dataSupplier);
+  }
+
+  /**
+   * Other clients must be able to watch a group created with
+   * {@code EVERYONE_READ_CREATOR_ALL} but must fail to join it.
+   */
+  @Test
+  public void testAcls() throws Exception {
+    Group securedMembership =
+        new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL,
+            "/secured/group/membership");
+
+    String memberId = securedMembership.join().getMemberId();
+
+    // Client created without the two string arguments (presumably without credentials - confirm).
+    Group unauthenticatedObserver =
+        new Group(createZkClient(),
+            Ids.READ_ACL_UNSAFE,
+            "/secured/group/membership");
+    RecordingListener unauthenticatedListener = new RecordingListener();
+    unauthenticatedObserver.watch(unauthenticatedListener);
+
+    assertMembershipObserved(unauthenticatedListener, memberId);
+
+    try {
+      unauthenticatedObserver.join();
+      fail("Expected join exception for unauthenticated observer");
+    } catch (JoinException e) {
+      // expected
+    }
+
+    // Client created with different string arguments than the group's creator.
+    Group unauthorizedObserver =
+        new Group(createZkClient("joe", "schmoe"),
+            Ids.READ_ACL_UNSAFE,
+            "/secured/group/membership");
+    RecordingListener unauthorizedListener = new RecordingListener();
+    unauthorizedObserver.watch(unauthorizedListener);
+
+    assertMembershipObserved(unauthorizedListener, memberId);
+
+    try {
+      unauthorizedObserver.join();
+      fail("Expected join exception for unauthorized observer");
+    } catch (JoinException e) {
+      // expected
+    }
+  }
+
+  /** After the stop-watching command runs, further changes must not reach the listener. */
+  @Test
+  public void testStopWatching() throws Exception {
+    replay(onLoseMembership);
+
+    assertEmptyMembershipObserved();
+
+    Membership member1 = joinGroup.join();
+    String memberId1 = member1.getMemberId();
+    assertMembershipObserved(memberId1);
+
+    Membership member2 = joinGroup.join();
+    String memberId2 = member2.getMemberId();
+    assertMembershipObserved(memberId1, memberId2);
+
+    stopWatching.execute();
+
+    // Membership changes made after stopWatching; none of them should be recorded.
+    member1.cancel();
+    Membership member3 = joinGroup.join();
+    member2.cancel();
+    member3.cancel();
+
+    listener.assertEmpty();
+  }
+
+  /** Asserts that the next change recorded by the default listener has no members. */
+  private void assertEmptyMembershipObserved() throws InterruptedException {
+    assertMembershipObserved();
+  }
+
+  /** Asserts that the next change recorded by the default listener has exactly these members. */
+  private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException {
+    assertMembershipObserved(listener, expectedMemberIds);
+  }
+
+  /**
+   * Takes the next change from {@code listener} (blocking until there is one) and asserts that
+   * its member ids equal {@code expectedMemberIds} as a set, ignoring order.
+   */
+  private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds)
+      throws InterruptedException {
+
+    assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
new file mode 100644
index 0000000..2166123
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests JSON serialization and deserialization of {@link ServiceInstance} through
+ * {@link JsonCodec}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Gson.class)
+public class JsonCodecTest {
+
+  private static final Codec<ServiceInstance> STANDARD_JSON_CODEC = new JsonCodec();
+
+  /** Round-trips instances with and without a shard and with varying additional endpoints. */
+  @Test
+  public void testJsonCodecRoundtrip() throws Exception {
+    Codec<ServiceInstance> codec = STANDARD_JSON_CODEC;
+
+    // Instance with a shard set.
+    ServiceInstance sharded = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http", new Endpoint("foo", 8080)),
+        Status.ALIVE)
+        .setShard(0);
+    byte[] shardedBytes = ServerSets.serializeServiceInstance(sharded, codec);
+    assertTrue(
+        ServerSets.deserializeServiceInstance(shardedBytes, codec).getServiceEndpoint().isSetPort());
+    assertTrue(ServerSets.deserializeServiceInstance(shardedBytes, codec).isSetShard());
+
+    // Instance with one additional endpoint and no shard.
+    ServiceInstance unsharded = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+        Status.ALIVE);
+    byte[] unshardedBytes = ServerSets.serializeServiceInstance(unsharded, codec);
+    assertTrue(
+        ServerSets.deserializeServiceInstance(unshardedBytes, codec)
+            .getServiceEndpoint().isSetPort());
+    assertFalse(ServerSets.deserializeServiceInstance(unshardedBytes, codec).isSetShard());
+
+    // Instance with no additional endpoints and no shard.
+    ServiceInstance bare = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.<String, Endpoint>of(),
+        Status.ALIVE);
+    byte[] bareBytes = ServerSets.serializeServiceInstance(bare, codec);
+    assertTrue(
+        ServerSets.deserializeServiceInstance(bareBytes, codec).getServiceEndpoint().isSetPort());
+    assertFalse(ServerSets.deserializeServiceInstance(bareBytes, codec).isSetShard());
+  }
+
+  /** Pins the exact JSON text produced for a fully populated instance. */
+  @Test
+  public void testJsonCompatibility() throws IOException {
+    String expectedJson =
+        "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
+            + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
+            + "\"status\":\"ALIVE\","
+            + "\"shard\":42}";
+
+    ServiceInstance instance = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http", new Endpoint("foo", 8080)),
+        Status.ALIVE).setShard(42);
+
+    ByteArrayOutputStream sink = new ByteArrayOutputStream();
+    STANDARD_JSON_CODEC.serialize(instance, sink);
+    assertEquals(expectedJson, sink.toString());
+  }
+
+  /** A {@link JsonIOException} thrown by Gson must surface as an {@link IOException}. */
+  @Test
+  public void testInvalidSerialize() {
+    // Gson is final so we need to call on PowerMock here.
+    Gson failingGson = PowerMock.createMock(Gson.class);
+    failingGson.toJson(EasyMock.isA(Object.class), EasyMock.isA(Appendable.class));
+    EasyMock.expectLastCall().andThrow(new JsonIOException("error"));
+    PowerMock.replay(failingGson);
+
+    ServiceInstance instance =
+        new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
+
+    try {
+      new JsonCodec(failingGson).serialize(instance, new ByteArrayOutputStream());
+      fail();
+    } catch (IOException expected) {
+      // This is the outcome under test.
+    }
+
+    PowerMock.verify(failingGson);
+  }
+
+  /** JSON carrying only the service endpoint and status must deserialize. */
+  @Test
+  public void testDeserializeMinimal() throws IOException {
+    ServiceInstance expected =
+        new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
+
+    String minimal = "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},\"status\":\"ALIVE\"}";
+    ByteArrayInputStream source = new ByteArrayInputStream(minimal.getBytes(Charsets.UTF_8));
+    ServiceInstance actual = STANDARD_JSON_CODEC.deserialize(source);
+
+    assertEquals(expected, actual);
+  }
+
+  /** Non-JSON input, non-object JSON and objects missing required fields must all be rejected. */
+  @Test
+  public void testInvalidDeserialize() {
+    // Not JSON.
+    byte[] notJson = new byte[] {0xC, 0xA, 0xF, 0xE};
+    assertInvalidDeserialize(notJson);
+
+    // No JSON object.
+    assertInvalidDeserialize("");
+    assertInvalidDeserialize("[]");
+
+    // Missing required fields.
+    assertInvalidDeserialize("{}");
+    assertInvalidDeserialize("{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}}");
+    assertInvalidDeserialize("{\"status\":\"ALIVE\"}");
+  }
+
+  /** String overload: encodes {@code data} as UTF-8 and delegates to the byte[] overload. */
+  private void assertInvalidDeserialize(String data) {
+    byte[] encoded = data.getBytes(Charsets.UTF_8);
+    assertInvalidDeserialize(encoded);
+  }
+
+  /** Fails the test unless deserializing {@code data} throws an {@link IOException}. */
+  private void assertInvalidDeserialize(byte[] data) {
+    try {
+      STANDARD_JSON_CODEC.deserialize(new ByteArrayInputStream(data));
+      fail();
+    } catch (IOException expected) {
+      // This is the outcome under test.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
new file mode 100644
index 0000000..f0c0cb4
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ *
+ * TODO(William Farner): Change this to remove thrift dependency.
+ */
+public class ServerSetImplTest extends BaseZooKeeperClientTest {
+  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+  private static final String SERVICE = "/twitter/services/puffin_hosebird";
+
+  private LinkedBlockingQueue<ImmutableSet<ServiceInstance>> serverSetBuffer;
+  private DynamicHostSet.HostChangeMonitor<ServiceInstance> serverSetMonitor;
+
+  @Before
+  public void mySetUp() throws IOException {
+    serverSetBuffer = new LinkedBlockingQueue<>();
+    serverSetMonitor = serverSetBuffer::offer;
+  }
+
+  private ServerSetImpl createServerSet() throws IOException {
+    return new ServerSetImpl(createZkClient(), ACL, SERVICE);
+  }
+
+  @Test
+  public void testLifecycle() throws Exception {
+    ServerSetImpl client = createServerSet();
+    client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    ServerSetImpl server = createServerSet();
+    ServerSet.EndpointStatus status = server.join(
+        InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080));
+
+    ServiceInstance serviceInstance = new ServiceInstance(
+        new Endpoint("foo", 1234),
+        ImmutableMap.of("http-admin", new Endpoint("foo", 8080)),
+        Status.ALIVE);
+
+    assertChangeFired(serviceInstance);
+
+    status.leave();
+    assertChangeFiredEmpty();
+    assertTrue(serverSetBuffer.isEmpty());
+  }
+
+  @Test
+  public void testMembershipChanges() throws Exception {
+    ServerSetImpl client = createServerSet();
+    client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    ServerSetImpl server = createServerSet();
+
+    ServerSet.EndpointStatus foo = join(server, "foo");
+    assertChangeFired("foo");
+
+    expireSession(client.getZkClient());
+
+    ServerSet.EndpointStatus bar = join(server, "bar");
+
+    // We should've auto re-monitored membership, but not been notified of "foo" since this was not
+    // a change, just "foo", "bar" since this was an addition.
+    assertChangeFired("foo", "bar");
+
+    foo.leave();
+    assertChangeFired("bar");
+
+    ServerSet.EndpointStatus baz = join(server, "baz");
+    assertChangeFired("bar", "baz");
+
+    baz.leave();
+    assertChangeFired("bar");
+
+    bar.leave();
+    assertChangeFiredEmpty();
+
+    assertTrue(serverSetBuffer.isEmpty());
+  }
+
+  @Test
+  public void testStopMonitoring() throws Exception {
+    ServerSetImpl client = createServerSet();
+    Command stopMonitoring = client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    ServerSetImpl server = createServerSet();
+
+    ServerSet.EndpointStatus foo = join(server, "foo");
+    assertChangeFired("foo");
+    ServerSet.EndpointStatus bar = join(server, "bar");
+    assertChangeFired("foo", "bar");
+
+    stopMonitoring.execute();
+
+    // No new updates should be received since monitoring has stopped.
+    foo.leave();
+    assertTrue(serverSetBuffer.isEmpty());
+
+    // Expiration event.
+    assertTrue(serverSetBuffer.isEmpty());
+  }
+
+  @Test
+  public void testOrdering() throws Exception {
+    ServerSetImpl client = createServerSet();
+    client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    Map<String, InetSocketAddress> server1Ports = makePortMap("http-admin1", 8080);
+    Map<String, InetSocketAddress> server2Ports = makePortMap("http-admin2", 8081);
+    Map<String, InetSocketAddress> server3Ports = makePortMap("http-admin3", 8082);
+
+    ServerSetImpl server1 = createServerSet();
+    ServerSetImpl server2 = createServerSet();
+    ServerSetImpl server3 = createServerSet();
+
+    ServiceInstance instance1 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+        Status.ALIVE);
+    ServiceInstance instance2 = new ServiceInstance(
+        new Endpoint("foo", 1001),
+        ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)),
+        Status.ALIVE);
+    ServiceInstance instance3 = new ServiceInstance(
+        new Endpoint("foo", 1002),
+        ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)),
+        Status.ALIVE);
+
+    server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports);
+    assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take()));
+
+    ServerSet.EndpointStatus status2 = server2.join(
+        InetSocketAddress.createUnresolved("foo", 1001),
+        server2Ports);
+    assertEquals(ImmutableList.of(instance1, instance2),
+        ImmutableList.copyOf(serverSetBuffer.take()));
+
+    server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports);
+    assertEquals(ImmutableList.of(instance1, instance2, instance3),
+        ImmutableList.copyOf(serverSetBuffer.take()));
+
+    status2.leave();
+    assertEquals(ImmutableList.of(instance1, instance3),
+        ImmutableList.copyOf(serverSetBuffer.take()));
+  }
+
+  @Test
+  public void testUnwatchOnException() throws Exception {
+    IMocksControl control = createControl();
+
+    ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class);
+    Watcher onExpirationWatcher = control.createMock(Watcher.class);
+
+    expect(zkClient.registerExpirationHandler(anyObject(Command.class)))
+        .andReturn(onExpirationWatcher);
+
+    expect(zkClient.get()).andThrow(new InterruptedException());  // See interrupted() note below.
+    expect(zkClient.unregister(onExpirationWatcher)).andReturn(true);
+    control.replay();
+
+    Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla");
+    ServerSetImpl serverset = new ServerSetImpl(zkClient, group);
+
+    try {
+      serverset.watch(hostSet -> {});
+      fail("Expected MonitorException");
+    } catch (DynamicHostSet.MonitorException e) {
+      // NB: The assert is not important to this test, but the call to `Thread.interrupted()` is.
+      // That call both returns the current interrupted status as well as clearing it.  The clearing
+      // is crucial depending on the order tests are run in this class.  If this test runs before
+      // one of the tests above that uses a `ZooKeeperClient` for example, those tests will fail
+      // executing `ZooKeeperClient.get` which internally blocks on a sync-point that takes part in
+      // the interruption mechanism and so immediately throws `InterruptedException` based on the
+      // un-cleared interrupted bit.
+      assertTrue(Thread.interrupted());
+    }
+    control.verify();
+  }
+
+  private static Map<String, InetSocketAddress> makePortMap(String name, int port) {
+    return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port));
+  }
+
+  private ServerSet.EndpointStatus join(ServerSet serverSet, String host)
+      throws JoinException, InterruptedException {
+
+    return serverSet.join(
+        InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of());
+  }
+
+  private void assertChangeFired(String... serviceHosts)
+      throws InterruptedException {
+
+    assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts),
+        serviceHost -> new ServiceInstance(new Endpoint(serviceHost, 42),
+            ImmutableMap.<String, Endpoint>of(), Status.ALIVE))));
+  }
+
+  protected void assertChangeFiredEmpty() throws InterruptedException {
+    assertChangeFired(ImmutableSet.<ServiceInstance>of());
+  }
+
+  protected void assertChangeFired(ServiceInstance... serviceInstances)
+      throws InterruptedException {
+    assertChangeFired(ImmutableSet.copyOf(serviceInstances));
+  }
+
+  protected void assertChangeFired(ImmutableSet<ServiceInstance> serviceInstances)
+      throws InterruptedException {
+    assertEquals(serviceInstances, serverSetBuffer.take());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
new file mode 100644
index 0000000..0e67191
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ServerSetsTest {
+  @Test
+  public void testSimpleSerialization() throws Exception {
+    InetSocketAddress endpoint = new InetSocketAddress(12345);
+    Map<String, Endpoint > additionalEndpoints = ImmutableMap.of();
+    Status status = Status.ALIVE;
+
+    byte[] data = ServerSets.serializeServiceInstance(
+        endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC);
+
+    ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC);
+
+    assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
+    assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());
+    assertEquals(Status.ALIVE, instance.getStatus());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
new file mode 100644
index 0000000..5f6cdd8
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.easymock.Capture;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.fail;
+
+public class SingletonServiceImplTest extends BaseZooKeeperClientTest {
+  private static final int PORT_A = 1234;
+  private static final int PORT_B = 8080;
+  private static final InetSocketAddress PRIMARY_ENDPOINT =
+      InetSocketAddress.createUnresolved("foo", PORT_A);
+  private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
+      ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
+
+  private IMocksControl control;
+  private SingletonServiceImpl.LeadershipListener listener;
+  private ServerSet serverSet;
+  private ServerSet.EndpointStatus endpointStatus;
+  private Candidate candidate;
+  private ExceptionalCommand<Group.JoinException> abdicate;
+
+  private SingletonService service;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void mySetUp() throws IOException {
+    control = createControl();
+    addTearDown(control::verify);
+    listener = control.createMock(SingletonServiceImpl.LeadershipListener.class);
+    serverSet = control.createMock(ServerSet.class);
+    candidate = control.createMock(Candidate.class);
+    endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
+    abdicate = control.createMock(ExceptionalCommand.class);
+
+    service = new SingletonServiceImpl(serverSet, candidate);
+  }
+
+  private void newLeader(
+      final String hostName,
+      Capture<Leader> leader,
+      LeadershipListener listener) throws Exception {
+
+    service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
+        ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
+        listener);
+
+    // This actually elects the leader.
+    leader.getValue().onElected(abdicate);
+  }
+
+  private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
+    newLeader(hostName, leader, listener);
+  }
+
+  private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
+    return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
+  }
+
+  @Test
+  public void testLeadAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+    endpointStatus.leave();
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().leave();
+  }
+
+  @Test
+  public void teatLeadLeaveNoAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    abdicate.execute();
+
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().leave();
+  }
+
+  @Test
+  public void testLeadJoinFailure() throws Exception {
+    Capture<Leader> leaderCapture = new Capture<Leader>();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+
+    try {
+      controlCapture.getValue().advertise();
+      fail("Join should have failed.");
+    } catch (SingletonService.AdvertiseException e) {
+      // Expected.
+    }
+
+    controlCapture.getValue().leave();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().advertise();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleLeave() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+    endpointStatus.leave();
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().leave();
+    controlCapture.getValue().leave();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testAdvertiseAfterLeave() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().leave();
+    controlCapture.getValue().advertise();
+  }
+
+  @Test
+  public void testLeadMulti() throws Exception {
+    List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
+    List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
+
+    for (int i = 0; i < 5; i++) {
+      Capture<Leader> leaderCapture = new Capture<Leader>();
+      leaderCaptures.add(leaderCapture);
+      Capture<LeaderControl> controlCapture = createCapture();
+      leaderControlCaptures.add(controlCapture);
+
+      expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+      listener.onLeading(capture(controlCapture));
+      InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
+      Map<String, InetSocketAddress> aux =
+          ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
+      expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
+      endpointStatus.leave();
+      abdicate.execute();
+    }
+
+    control.replay();
+
+    for (int i = 0; i < 5; i++) {
+      final String leaderName = "foo" + i;
+      newLeader(leaderName, leaderCaptures.get(i));
+      leaderControlCaptures.get(i).getValue().advertise();
+      leaderControlCaptures.get(i).getValue().leave();
+    }
+  }
+
+  @Test
+  public void testLeaderLeaves() throws Exception {
+    control.replay();
+    shutdownNetwork();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
new file mode 100644
index 0000000..5eee235
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author John Sirois
+ */
+public class ZooKeeperClientTest extends BaseZooKeeperClientTest {
+
+  public ZooKeeperClientTest() {
+    super(Amount.of(1, Time.DAYS));
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    final ZooKeeperClient zkClient = createZkClient();
+    shutdownNetwork();
+    try {
+      zkClient.get(Amount.of(50L, Time.MILLISECONDS));
+      fail("Expected client connection to timeout while network down");
+    } catch (TimeoutException e) {
+      assertTrue(zkClient.isClosed());
+    }
+    assertNull(zkClient.getZooKeeperClientForTests());
+
+    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
+    final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
+    new Thread(() -> {
+      try {
+        client.set(zkClient.get());
+      } catch (ZooKeeperConnectionException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        blockingGetComplete.countDown();
+      }
+    }).start();
+
+    restartNetwork();
+
+    // Hung blocking connects should succeed when server connection comes up
+    blockingGetComplete.await();
+    assertNotNull(client.get());
+
+    // New connections should succeed now that network is back up
+    long sessionId = zkClient.get().getSessionId();
+
+    // While connected the same client should be reused (no new connections while healthy)
+    assertSame(client.get(), zkClient.get());
+
+    shutdownNetwork();
+    // Our client doesn't know the network is down yet so we should be able to get()
+    ZooKeeper zooKeeper = zkClient.get();
+    try {
+      zooKeeper.exists("/", false);
+      fail("Expected client operation to fail while network down");
+    } catch (ConnectionLossException e) {
+      // expected
+    }
+
+    restartNetwork();
+    assertEquals("Expected connection to be re-established with existing session",
+        sessionId, zkClient.get().getSessionId());
+  }
+
+  /**
+   * Test that if a blocking get() call gets interrupted, after a connection has been created
+   * but before it's connected, the zk connection gets closed.
+   */
+  @Test
+  public void testGetInterrupted() throws Exception {
+    final ZooKeeperClient zkClient = createZkClient();
+    shutdownNetwork();
+
+    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
+    final AtomicBoolean interrupted = new AtomicBoolean();
+    final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
+    Thread getThread = new Thread(() -> {
+      try {
+        client.set(zkClient.get());
+      } catch (ZooKeeperConnectionException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        interrupted.set(true);
+        throw new RuntimeException(e);
+      } finally {
+        blockingGetComplete.countDown();
+      }
+    });
+    getThread.start();
+
+    while (zkClient.getZooKeeperClientForTests() == null) {
+      Thread.sleep(100);
+    }
+
+    getThread.interrupt();
+    blockingGetComplete.await();
+
+    assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests());
+    assertTrue("The waiter thread should have been interrupted", interrupted.get());
+    assertTrue(zkClient.isClosed());
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    ZooKeeperClient zkClient = createZkClient();
+    zkClient.close();
+
+    // Close should be idempotent
+    zkClient.close();
+
+    long firstSessionId = zkClient.get().getSessionId();
+
+    // Close on an open client should force session re-establishment
+    zkClient.close();
+
+    assertNotEquals(firstSessionId, zkClient.get().getSessionId());
+  }
+
+  @Test
+  public void testCredentials() throws Exception {
+    String path = "/test";
+    ZooKeeperClient authenticatedClient = createZkClient("creator", "creator");
+    assertEquals(path,
+        authenticatedClient.get().create(path, "42".getBytes(),
+            ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT));
+
+    ZooKeeperClient unauthenticatedClient = createZkClient();
+    assertEquals("42", getData(unauthenticatedClient, path));
+    try {
+      setData(unauthenticatedClient, path, "37");
+      fail("Expected unauthenticated write attempt to fail");
+    } catch (NoAuthException e) {
+      assertEquals("42", getData(unauthenticatedClient, path));
+    }
+
+    ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner");
+    assertEquals("42", getData(nonOwnerClient, path));
+    try {
+      setData(nonOwnerClient, path, "37");
+      fail("Expected non owner write attempt to fail");
+    } catch (NoAuthException e) {
+      assertEquals("42", getData(nonOwnerClient, path));
+    }
+
+    ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator");
+    setData(authenticatedClient2, path, "37");
+    assertEquals("37", getData(authenticatedClient2, path));
+  }
+
+  @Test
+  public void testChrootPath() throws Exception {
+    ZooKeeperClient rootClient = createZkClient();
+    String rootPath = "/test";
+    String subPath = "/test/subtest";
+    assertEquals(rootPath,
+            rootClient.get().create(rootPath, "42".getBytes(),
+                ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+    assertEquals(subPath,
+            rootClient.get().create(subPath, "37".getBytes(),
+                ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+
+    ZooKeeperClient chrootedClient = createZkClient(rootPath);
+    assertArrayEquals("37".getBytes(), chrootedClient.get().getData("/subtest", false, null));
+  }
+
+  private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception {
+    zkClient.get().setData(path, data.getBytes(), ZooKeeperUtils.ANY_VERSION);
+  }
+
+  private String getData(ZooKeeperClient zkClient, String path) throws Exception {
+    return new String(zkClient.get().getData(path, false, null));
+  }
+}