You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by dm...@apache.org on 2016/12/01 17:02:30 UTC

[1/4] aurora git commit: Revert removal of twitter/commons/zk based leadership code

Repository: aurora
Updated Branches:
  refs/heads/master 8bcad84dc -> 16e4651d5


http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/JsonCodecTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/JsonCodecTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/JsonCodecTest.java
deleted file mode 100644
index b88ba37..0000000
--- a/src/test/java/org/apache/aurora/scheduler/discovery/JsonCodecTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.gson.Gson;
-import com.google.gson.JsonIOException;
-
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.easymock.EasyMock;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Gson.class)
-public class JsonCodecTest {
-
-  private static byte[] serializeServiceInstance(ServiceInstance serviceInstance)
-      throws IOException {
-
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    JsonCodec.INSTANCE.serialize(serviceInstance, output);
-    return output.toByteArray();
-  }
-
-  private static ServiceInstance deserializeServiceInstance(byte[] data) throws IOException {
-    return JsonCodec.INSTANCE.deserialize(new ByteArrayInputStream(data));
-  }
-
-  @Test
-  public void testJsonCodecRoundtrip() throws Exception {
-    ServiceInstance instance1 = new ServiceInstance(
-        new Endpoint("foo", 1000),
-        ImmutableMap.of("http", new Endpoint("foo", 8080)),
-        Status.ALIVE)
-        .setShard(0);
-    byte[] data = serializeServiceInstance(instance1);
-    assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort());
-    assertTrue(deserializeServiceInstance(data).isSetShard());
-
-    ServiceInstance instance2 = new ServiceInstance(
-        new Endpoint("foo", 1000),
-        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
-        Status.ALIVE);
-    data = serializeServiceInstance(instance2);
-    assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort());
-    assertFalse(deserializeServiceInstance(data).isSetShard());
-
-    ServiceInstance instance3 = new ServiceInstance(
-        new Endpoint("foo", 1000),
-        ImmutableMap.of(),
-        Status.ALIVE);
-    data = serializeServiceInstance(instance3);
-    assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort());
-    assertFalse(deserializeServiceInstance(data).isSetShard());
-  }
-
-  @Test
-  public void testJsonCompatibility() throws IOException {
-    ServiceInstance instance = new ServiceInstance(
-        new Endpoint("foo", 1000),
-        ImmutableMap.of("http", new Endpoint("foo", 8080)),
-        Status.ALIVE).setShard(42);
-
-    ByteArrayOutputStream results = new ByteArrayOutputStream();
-    JsonCodec.INSTANCE.serialize(instance, results);
-    assertEquals(
-        "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
-            + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
-            + "\"status\":\"ALIVE\","
-            + "\"shard\":42}",
-        results.toString(Charsets.UTF_8.name()));
-  }
-
-  @Test
-  public void testInvalidSerialize() {
-    // Gson is final so we need to call on PowerMock here.
-    Gson gson = PowerMock.createMock(Gson.class);
-    gson.toJson(EasyMock.isA(Object.class), EasyMock.isA(Appendable.class));
-    EasyMock.expectLastCall().andThrow(new JsonIOException("error"));
-    PowerMock.replay(gson);
-
-    ServiceInstance instance =
-        new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
-
-    try {
-      new JsonCodec(gson).serialize(instance, new ByteArrayOutputStream());
-      fail();
-    } catch (IOException e) {
-      // Expected.
-    }
-
-    PowerMock.verify(gson);
-  }
-
-  @Test
-  public void testDeserializeMinimal() throws IOException {
-    String minimal = "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},\"status\":\"ALIVE\"}";
-    ByteArrayInputStream source = new ByteArrayInputStream(minimal.getBytes(Charsets.UTF_8));
-    ServiceInstance actual = JsonCodec.INSTANCE.deserialize(source);
-    ServiceInstance expected =
-        new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
-    assertEquals(expected, actual);
-  }
-
-  @Test
-  public void testInvalidDeserialize() {
-    // Not JSON.
-    assertInvalidDeserialize(new byte[] {0xC, 0xA, 0xF, 0xE});
-
-    // No JSON object.
-    assertInvalidDeserialize("");
-    assertInvalidDeserialize("[]");
-
-    // Missing required fields.
-    assertInvalidDeserialize("{}");
-    assertInvalidDeserialize("{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}}");
-    assertInvalidDeserialize("{\"status\":\"ALIVE\"}");
-  }
-
-  private void assertInvalidDeserialize(String data) {
-    assertInvalidDeserialize(data.getBytes(Charsets.UTF_8));
-  }
-
-  private void assertInvalidDeserialize(byte[] data) {
-    try {
-      JsonCodec.INSTANCE.deserialize(new ByteArrayInputStream(data));
-      fail();
-    } catch (IOException e) {
-      // Expected.
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
index 4d833f2..a065505 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
@@ -14,12 +14,14 @@
 package org.apache.aurora.scheduler.discovery;
 
 import java.net.InetSocketAddress;
-import java.util.Optional;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -35,22 +37,24 @@ public class ZooKeeperConfigTest {
   @Test(expected = IllegalArgumentException.class)
   public void testEmptyServers() {
     new ZooKeeperConfig(
+        false,
         ImmutableList.of(),
-        Optional.empty(),
+        Optional.absent(),
         false,
         Amount.of(1, Time.DAYS),
-        Optional.empty());
+        Optional.absent());
   }
 
   @Test
   public void testWithCredentials() {
     ZooKeeperConfig config =
         new ZooKeeperConfig(
+            false,
             SERVERS,
-            Optional.empty(),
+            Optional.absent(),
             false,
             Amount.of(1, Time.HOURS),
-            Optional.empty()); // credentials
+            Optional.absent()); // credentials
     assertFalse(config.getCredentials().isPresent());
 
     Credentials joeCreds = Credentials.digestCredentials("Joe", "Schmoe");
@@ -66,8 +70,9 @@ public class ZooKeeperConfigTest {
 
   @Test
   public void testCreateFactory() {
-    ZooKeeperConfig config = ZooKeeperConfig.create(SERVERS);
+    ZooKeeperConfig config = ZooKeeperConfig.create(true, SERVERS);
 
+    assertTrue(config.isUseCurator());
     assertEquals(SERVERS, ImmutableList.copyOf(config.getServers()));
     assertFalse(config.getChrootPath().isPresent());
     assertFalse(config.isInProcess());

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
index d9e7374..fb03f25 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
@@ -48,9 +48,9 @@ import org.apache.aurora.scheduler.AppStartup;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.app.LifecycleModule;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.aurora.scheduler.async.AsyncModule;
 import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
 import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler;
 import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
@@ -132,8 +132,9 @@ public abstract class AbstractJettyTest extends EasyMockTest {
             bindMock(Thread.UncaughtExceptionHandler.class);
             bindMock(TaskGroups.TaskGroupBatchWorker.class);
 
-            bind(ServletContextListener.class)
-                .toProvider(() -> makeServletContextListener(injector, getChildServletModule()));
+            bind(ServletContextListener.class).toProvider(() -> {
+              return makeServletContextListener(injector, getChildServletModule());
+            });
           }
         },
         new JettyServerModule(false));
@@ -146,12 +147,12 @@ public abstract class AbstractJettyTest extends EasyMockTest {
     expect(serviceGroupMonitor.get()).andAnswer(schedulers::get).anyTimes();
   }
 
-  void setLeadingScheduler(String host, int port) {
+  protected void setLeadingScheduler(String host, int port) {
     schedulers.set(
         ImmutableSet.of(new ServiceInstance().setServiceEndpoint(new Endpoint(host, port))));
   }
 
-  void unsetLeadingSchduler() {
+  protected void unsetLeadingSchduler() {
     schedulers.set(ImmutableSet.of());
   }
 
@@ -161,7 +162,9 @@ public abstract class AbstractJettyTest extends EasyMockTest {
       ServiceManagerIface service =
           injector.getInstance(Key.get(ServiceManagerIface.class, AppStartup.class));
       service.startAsync().awaitHealthy();
-      addTearDown(() -> service.stopAsync().awaitStopped(5L, TimeUnit.SECONDS));
+      addTearDown(() -> {
+        service.stopAsync().awaitStopped(5L, TimeUnit.SECONDS);
+      });
     } catch (Exception e) {
       throw Throwables.propagate(e);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java b/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
index a308ba2..a16058f 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
@@ -27,8 +27,8 @@ import com.google.common.net.HostAndPort;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
 import org.junit.Before;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index 0119ccb..86861e1 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -42,12 +42,12 @@ import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.scheduler.TierModule;
 import org.apache.aurora.scheduler.app.AppModule;
 import org.apache.aurora.scheduler.app.LifecycleModule;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.aurora.scheduler.app.local.FakeNonVolatileStorage;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.cron.quartz.CronModule;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
 import org.apache.aurora.scheduler.mesos.DriverFactory;
 import org.apache.aurora.scheduler.mesos.DriverSettings;
 import org.apache.aurora.scheduler.mesos.TestExecutorSettings;


[3/4] aurora git commit: Revert removal of twitter/commons/zk based leadership code

Posted by dm...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
new file mode 100644
index 0000000..ce243fb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.InetSocketAddressHelper;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages a connection to a ZooKeeper cluster.
+ *
+ * <p>The connection is established lazily by {@link #get()} or {@link #get(Amount)} and is
+ * re-established on the next such call after {@link #close()}, which is also invoked when the
+ * session expires. Watchers added with {@link #register(Watcher)} persist across reconnects; they
+ * receive the events seen by the connection's default watcher, dispatched from a single daemon
+ * thread started by the constructor.
+ */
+public class ZooKeeperClient {
+
+  /**
+   * Indicates an error connecting to a zookeeper cluster.
+   */
+  public class ZooKeeperConnectionException extends Exception {
+    ZooKeeperConnectionException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Identity of the most recently established session, captured so that a new connection can
+   * ask to resume it rather than start a fresh session.
+   */
+  private final class SessionState {
+    private final long sessionId;
+    private final byte[] sessionPasswd;
+
+    private SessionState(long sessionId, byte[] sessionPasswd) {
+      this.sessionId = sessionId;
+      this.sessionPasswd = sessionPasswd;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class);
+
+  // A non-positive connection timeout makes get(Amount) wait indefinitely.
+  private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS);
+
+  private final int sessionTimeoutMs;
+  private final Optional<Credentials> credentials;
+  // Comma-joined server addresses followed by the optional chroot path (see the constructor).
+  private final String zooKeeperServers;
+  // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
+  // made from within long synchronized blocks.
+  private volatile ZooKeeper zooKeeper;
+  private SessionState sessionState;
+
+  // Top-level watchers that survive reconnects, and the queue that feeds them events.
+  private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
+  private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>();
+
+  /**
+   * Returns {@code address} followed by {@code addresses}, with duplicates removed and
+   * insertion order preserved.
+   */
+  private static Iterable<InetSocketAddress> combine(InetSocketAddress address,
+      InetSocketAddress... addresses) {
+    return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build();
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get()}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param zooKeeperServer the first, required ZK server
+   * @param zooKeeperServers any additional servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer,
+      InetSocketAddress... zooKeeperServers) {
+    this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers));
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout,
+      Iterable<InetSocketAddress> zooKeeperServers) {
+    this(sessionTimeout, Optional.absent(), Optional.absent(), zooKeeperServers);
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get()}.  All successful connections will be authenticated with the given
+   * {@code credentials}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param zooKeeperServer the first, required ZK server
+   * @param zooKeeperServers any additional servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+      InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) {
+    this(sessionTimeout,
+        Optional.of(credentials),
+        Optional.absent(),
+        combine(zooKeeperServer, zooKeeperServers));
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.  All successful connections will be authenticated with the given
+   * {@code credentials}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+      Iterable<InetSocketAddress> zooKeeperServers) {
+    this(sessionTimeout,
+        Optional.of(credentials),
+        Optional.absent(),
+        zooKeeperServers);
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.  All successful connections will be authenticated with the given
+   * {@code credentials}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param chrootPath an optional chroot path
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   * @throws IllegalArgumentException if {@code zooKeeperServers} is empty or {@code chrootPath}
+   *     is not a valid ZooKeeper path
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials,
+      Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) {
+    this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS);
+    this.credentials = Preconditions.checkNotNull(credentials);
+
+    Preconditions.checkNotNull(chrootPath);
+    if (chrootPath.isPresent()) {
+      PathUtils.validatePath(chrootPath.get());
+    }
+
+    Preconditions.checkNotNull(zooKeeperServers);
+    Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers),
+        "Must present at least 1 ZK server");
+
+    Thread watcherProcessor =
+        new Thread(this::deliverEventsForever, "ZookeeperClient-watcherProcessor");
+    watcherProcessor.setDaemon(true);
+    watcherProcessor.start();
+
+    Iterable<String> servers =
+        Iterables.transform(ImmutableSet.copyOf(zooKeeperServers),
+            InetSocketAddressHelper::toString);
+    this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or(""));
+  }
+
+  /**
+   * Body of the watcher-processor thread: hands each queued event to every registered top-level
+   * watcher, forever.
+   */
+  private void deliverEventsForever() {
+    while (true) {
+      WatchedEvent event;
+      try {
+        event = eventQueue.take();
+      } catch (InterruptedException ignored) {
+        // Deliberately ignored: this daemon thread is meant to run for the life of the process,
+        // and re-asserting the interrupt would make take() fail again immediately (busy loop).
+        continue;
+      }
+      for (Watcher watcher : watchers) {
+        try {
+          watcher.process(event);
+        } catch (RuntimeException e) {
+          // A failing watcher must not kill this thread and silence all the other watchers.
+          LOG.error("Watcher {} failed to process event {}", watcher, event, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the current active ZK connection or establishes a new one if none has yet been
+   * established or a previous connection was disconnected or had its session time out.  This method
+   * will attempt to re-use sessions when possible.  Equivalent to
+   * {@code get(Amount.of(0L, Time.MILLISECONDS))}, i.e. it waits indefinitely for a connection.
+   *
+   * @return a connected ZooKeeper client
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+   * @throws InterruptedException if interrupted while waiting for a connection to be established
+   */
+  public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException {
+    try {
+      return get(WAIT_FOREVER);
+    } catch (TimeoutException e) {
+      // Not expected: get(Amount) never times out when asked to wait forever.
+      InterruptedException interruptedException =
+          new InterruptedException("Got an unexpected TimeoutException for 0 wait");
+      interruptedException.initCause(e);
+      throw interruptedException;
+    }
+  }
+
+  /**
+   * Returns the current active ZK connection or establishes a new one if none has yet been
+   * established or a previous connection was disconnected or had its session time out.  This
+   * method will attempt to re-use sessions when possible.
+   *
+   * <p>If the wait is interrupted or times out, the partially opened connection is closed, so a
+   * later call starts a fresh connection attempt instead of returning an unconnected client.
+   *
+   * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK
+   *     cluster to be established; 0 (or any non-positive value) to wait forever
+   * @return a connected ZooKeeper client
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+   * @throws InterruptedException if interrupted while waiting for a connection to be established
+   * @throws TimeoutException if a connection could not be established within
+   *     {@code connectionTimeout}
+   */
+  public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout)
+      throws ZooKeeperConnectionException, InterruptedException, TimeoutException {
+
+    if (zooKeeper == null) {
+      CountDownLatch connected = new CountDownLatch(1);
+      Watcher watcher = newDefaultWatcher(connected);
+
+      try {
+        zooKeeper = (sessionState != null)
+          ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId,
+            sessionState.sessionPasswd)
+          : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher);
+      } catch (IOException e) {
+        throw new ZooKeeperConnectionException(
+            "Problem connecting to servers: " + zooKeeperServers, e);
+      }
+
+      awaitConnection(connected, connectionTimeout);
+
+      if (credentials.isPresent()) {
+        Credentials zkCredentials = credentials.get();
+        zooKeeper.addAuthInfo(zkCredentials.scheme(), zkCredentials.authToken());
+      }
+
+      sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
+    }
+    return zooKeeper;
+  }
+
+  /**
+   * Creates the default watcher for a new ZooKeeper handle: it releases {@code connected} once
+   * the client reaches {@code SyncConnected}, closes this client when the session expires, and
+   * queues every event for delivery to the registered top-level watchers.
+   */
+  private Watcher newDefaultWatcher(CountDownLatch connected) {
+    return event -> {
+      // Guard the None type since this watch may be used as the default watch on calls by the
+      // client outside our control.
+      if (event.getType() == EventType.None) {
+        switch (event.getState()) {
+          case Expired:
+            LOG.info("Zookeeper session expired. Event: {}", event);
+            close();
+            break;
+          case SyncConnected:
+            connected.countDown();
+            break;
+          default:
+            break;
+        }
+      }
+
+      eventQueue.offer(event);
+    };
+  }
+
+  /**
+   * Waits for {@code connected} to be released, closing the half-open connection if the wait is
+   * interrupted or exceeds {@code connectionTimeout} (a non-positive timeout waits forever).
+   */
+  private void awaitConnection(CountDownLatch connected, Amount<Long, Time> connectionTimeout)
+      throws InterruptedException, TimeoutException {
+
+    boolean isConnected;
+    try {
+      if (connectionTimeout.getValue() > 0) {
+        isConnected =
+            connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS);
+      } else {
+        connected.await();
+        isConnected = true;
+      }
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted while waiting to connect to zooKeeper");
+      close();
+      throw e;
+    }
+
+    if (!isConnected) {
+      close();
+      throw new TimeoutException("Timed out waiting for a ZK connection after "
+          + connectionTimeout);
+    }
+  }
+
+  /**
+   * Clients that need to re-establish state after session expiration can register an
+   * {@code onExpired} command to execute.
+   *
+   * @param onExpired the {@code Command} to register
+   * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
+   *     removal.
+   */
+  public Watcher registerExpirationHandler(final Command onExpired) {
+    Watcher watcher = event -> {
+      if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
+        onExpired.execute();
+      }
+    };
+    register(watcher);
+    return watcher;
+  }
+
+  /**
+   * Clients that need to register a top-level {@code Watcher} should do so using this method.  The
+   * registered {@code watcher} will remain registered across re-connects and session expiration
+   * events.
+   *
+   * @param watcher the {@code Watcher} to register
+   */
+  public void register(Watcher watcher) {
+    watchers.add(watcher);
+  }
+
+  /**
+   * Clients can attempt to unregister a top-level {@code Watcher} that has previously been
+   * registered.
+   *
+   * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
+   * @return whether the given {@code Watcher} was found and removed from the active set
+   */
+  public boolean unregister(Watcher watcher) {
+    return watchers.remove(watcher);
+  }
+
+  /**
+   * Checks to see if the client might reasonably re-try an operation given the exception thrown
+   * while attempting it.  If the ZooKeeper session should be expired to enable the re-try to
+   * succeed this method will expire it as a side-effect.
+   *
+   * @param e the exception to test
+   * @return true if a retry can be attempted
+   */
+  public boolean shouldRetry(KeeperException e) {
+    if (e instanceof SessionExpiredException) {
+      close();
+    }
+    return ZooKeeperUtils.isRetryable(e);
+  }
+
+  /**
+   * Closes the current connection, if any, ending the current ZooKeeper session and forgetting
+   * its session state.  Any subsequent calls to this method will no-op until the next successful
+   * {@link #get}.
+   */
+  public synchronized void close() {
+    if (zooKeeper != null) {
+      try {
+        zooKeeper.close();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted trying to close zooKeeper");
+      } finally {
+        zooKeeper = null;
+        sessionState = null;
+      }
+    }
+  }
+
+  @VisibleForTesting
+  synchronized boolean isClosed() {
+    return zooKeeper == null;
+  }
+
+  @VisibleForTesting
+  ZooKeeper getZooKeeperClientForTests() {
+    return zooKeeper;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
new file mode 100644
index 0000000..2ada264
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for dealing with zoo keeper.
+ */
+public final class ZooKeeperUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
+
+  /**
+   * An appropriate default session timeout for Twitter ZooKeeper clusters.
+   */
+  public static final Amount<Integer,Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS);
+
+  /**
+   * The magic version number that allows any mutation to always succeed regardless of actual
+   * version number.
+   */
+  public static final int ANY_VERSION = -1;
+
+  /**
+   * An ACL that gives all permissions any user authenticated or not.
+   */
+  public static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
+      ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);
+
+  /**
+   * An ACL that gives all permissions to node creators and read permissions only to everyone else.
+   */
+  public static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
+      ImmutableList.<ACL>builder()
+          .addAll(Ids.CREATOR_ALL_ACL)
+          .addAll(Ids.READ_ACL_UNSAFE)
+          .build();
+
+  /**
+   * Returns true if the given exception indicates an error that can be resolved by retrying the
+   * operation without modification.
+   *
+   * @param e the exception to check
+   * @return true if the causing operation is strictly retryable
+   */
+  public static boolean isRetryable(KeeperException e) {
+    Preconditions.checkNotNull(e);
+
+    switch (e.code()) {
+      case CONNECTIONLOSS:
+      case SESSIONEXPIRED:
+      case SESSIONMOVED:
+      case OPERATIONTIMEOUT:
+        return true;
+
+      case RUNTIMEINCONSISTENCY:
+      case DATAINCONSISTENCY:
+      case MARSHALLINGERROR:
+      case BADARGUMENTS:
+      case NONODE:
+      case NOAUTH:
+      case BADVERSION:
+      case NOCHILDRENFOREPHEMERALS:
+      case NODEEXISTS:
+      case NOTEMPTY:
+      case INVALIDCALLBACK:
+      case INVALIDACL:
+      case AUTHFAILED:
+      case UNIMPLEMENTED:
+
+      // These two should not be encountered - they are used internally by ZK to specify ranges
+      case SYSTEMERROR:
+      case APIERROR:
+
+      case OK: // This is actually an invalid ZK exception code
+
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}.  If the
+   * path already exists, nothing is done; however if any portion of the path is missing, it will be
+   * created with the given {@code acl} as a persistent zookeeper node.  The given {@code path} must
+   * be a valid zookeeper absolute path.
+   *
+   * @param zkClient the client to use to access the ZK cluster
+   * @param acl the acl to use if creating path nodes
+   * @param path the path to ensure exists
+   * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster
+   * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster
+   * @throws KeeperException if there was a problem in ZK
+   */
+  public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path)
+      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+    Preconditions.checkNotNull(zkClient);
+    Preconditions.checkNotNull(path);
+    Preconditions.checkArgument(path.startsWith("/"));
+
+    ensurePathInternal(zkClient, acl, path);
+  }
+
+  private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path)
+      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+    if (zkClient.get().exists(path, false) == null) {
+      // The current path does not exist; so back up a level and ensure the parent path exists
+      // unless we're already a root-level path.
+      int lastPathIndex = path.lastIndexOf('/');
+      if (lastPathIndex > 0) {
+        ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex));
+      }
+
+      // We've ensured our parent path (if any) exists so we can proceed to create our path.
+      try {
+        zkClient.get().create(path, null, acl, CreateMode.PERSISTENT);
+      } catch (KeeperException.NodeExistsException e) {
+        // This ensures we don't die if a race condition was met between checking existence and
+        // trying to create the node.
+        LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?");
+      }
+    }
+  }
+
+  /**
+   * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and
+   * never ends with a slash (except for root path).
+   *
+   * @param path the path to be normalized
+   * @return normalized path string
+   */
+  public static String normalizePath(String path) {
+    String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1");
+    PathUtils.validatePath(normalizedPath);
+    return normalizedPath;
+  }
+
+  private ZooKeeperUtils() {
+    // utility
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
new file mode 100644
index 0000000..ba09279
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+
+/**
+ * A base-class for tests that interact with ZooKeeper via the commons ZooKeeperClient.
+ *
+ * <p>Every client handed out by a {@code createZkClient} overload is registered to be closed on
+ * tear-down.
+ */
+public abstract class BaseZooKeeperClientTest extends BaseZooKeeperTest {
+
+  private final Amount<Integer, Time> defaultSessionTimeout;
+
+  /**
+   * Creates a test case where clients created without an explicit session timeout use
+   * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT}.
+   */
+  public BaseZooKeeperClientTest() {
+    this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT);
+  }
+
+  /**
+   * Creates a test case where clients created without an explicit session timeout use the given
+   * {@code defaultSessionTimeout}.
+   *
+   * @param defaultSessionTimeout the session timeout for clients created without one
+   * @throws NullPointerException if {@code defaultSessionTimeout} is null
+   */
+  public BaseZooKeeperClientTest(Amount<Integer, Time> defaultSessionTimeout) {
+    this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout);
+  }
+
+  /**
+   * Starts zookeeper back up on the last used port.
+   */
+  protected final void restartNetwork() throws IOException, InterruptedException {
+    getServer().restartNetwork();
+  }
+
+  /**
+   * Shuts down the in-process zookeeper network server.
+   */
+  protected final void shutdownNetwork() {
+    getServer().shutdownNetwork();
+  }
+
+  /**
+   * Expires the active session for the given client.  The client should be one returned from
+   * {@link #createZkClient}.
+   *
+   * @param zkClient the client to expire
+   * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to
+   *    the local zk server while trying to expire the session
+   * @throws InterruptedException if interrupted while requesting expiration
+   */
+  protected final void expireSession(ZooKeeperClient zkClient)
+      throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException {
+    getServer().expireClientSession(zkClient.get().getSessionId());
+  }
+
+  /**
+   * Returns the current port to connect to the in-process zookeeper instance.
+   */
+  protected final int getPort() {
+    return getServer().getPort();
+  }
+
+  /**
+   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+   * with the default session timeout.
+   */
+  protected final ZooKeeperClient createZkClient() {
+    return createZkClient(defaultSessionTimeout, Optional.absent(), Optional.absent());
+  }
+
+  /**
+   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
+   * the default session timeout.
+   *
+   * @param credentials the credentials to authenticate the client with
+   */
+  protected final ZooKeeperClient createZkClient(Credentials credentials) {
+    return createZkClient(defaultSessionTimeout, Optional.of(credentials), Optional.absent());
+  }
+
+  /**
+   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
+   * the default session timeout.  The client is authenticated in the digest authentication scheme
+   * with the given {@code username} and {@code password}.
+   *
+   * @param username the digest-scheme user name
+   * @param password the digest-scheme password
+   */
+  protected final ZooKeeperClient createZkClient(String username, String password) {
+    return createZkClient(Credentials.digestCredentials(username, password));
+  }
+
+  /**
+   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+   * with a custom {@code sessionTimeout}.
+   *
+   * @param sessionTimeout the session timeout for the new client
+   */
+  protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) {
+    return createZkClient(sessionTimeout, Optional.absent(), Optional.absent());
+  }
+
+  /**
+   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+   * with the default session timeout and the given custom chroot path.
+   *
+   * @param chrootPath the chroot path the client should operate under
+   */
+  protected final ZooKeeperClient createZkClient(String chrootPath) {
+    return createZkClient(defaultSessionTimeout, Optional.absent(),
+        Optional.of(chrootPath));
+  }
+
+  /**
+   * Builds a client pointed at the in-process server on 127.0.0.1 and registers it to be closed
+   * on tear-down.
+   */
+  private ZooKeeperClient createZkClient(
+      Amount<Integer, Time> sessionTimeout,
+      Optional<Credentials> credentials,
+      Optional<String> chrootPath) {
+
+    ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, chrootPath,
+        ImmutableList.of(InetSocketAddress.createUnresolved("127.0.0.1", getPort())));
+    addTearDown(client::close);
+    return client;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
new file mode 100644
index 0000000..0e68987
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * A base-class for in-process zookeeper tests.
+ */
+public abstract class BaseZooKeeperTest extends TearDownTestCase {
+
+  private ZooKeeperTestServer zkTestServer;
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Before
+  public final void setUp() throws Exception {
+    zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder());
+    addTearDown(zkTestServer::stop);
+    zkTestServer.startNetwork();
+  }
+
+  /**
+   * Returns the running in-process ZooKeeper server.
+   *
+   * @return The in-process ZooKeeper server.
+   */
+  protected final ZooKeeperTestServer getServer() {
+    return zkTestServer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
new file mode 100644
index 0000000..50acaeb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ * A helper class for starting in-process ZooKeeper server and clients.
+ *
+ * <p>The network server can be shut down and restarted on the same port, which lets tests
+ * simulate a connection loss without expiring client sessions.
+ *
+ * <p>This is ONLY meant to be used for testing.
+ */
+public class ZooKeeperTestServer {
+
+  // Default session timeout offered to test clients; package-private so the test base classes in
+  // this package can use it.
+  static final Amount<Integer, Time> DEFAULT_SESSION_TIMEOUT = Amount.of(100, Time.MILLISECONDS);
+
+  private final File dataDir;
+  private final File snapDir;
+
+  // Replaced on every startNetwork() call.
+  private ZooKeeperServer zooKeeperServer;
+  // Non-null only while the network server is running; cleared by shutdownNetwork().
+  private ServerCnxnFactory connectionFactory;
+  // 0 until the first startNetwork() binds a port; afterwards the last bound port, reused on
+  // restart.
+  private int port;
+
+  /**
+   * Creates a (not yet started) test server.
+   *
+   * @param dataDir directory for the transaction log
+   * @param snapDir directory for snapshots
+   * @throws NullPointerException if either directory is null
+   */
+  public ZooKeeperTestServer(File dataDir, File snapDir) {
+    this.dataDir = Preconditions.checkNotNull(dataDir);
+    this.snapDir = Preconditions.checkNotNull(snapDir);
+  }
+
+  /**
+   * Starts zookeeper up on an ephemeral port.
+   *
+   * <p>On the first call {@code port} is 0, so an ephemeral port is bound; on later calls (via
+   * {@link #restartNetwork()}) the previously bound port is requested again.
+   */
+  public void startNetwork() throws IOException, InterruptedException {
+    zooKeeperServer =
+        new ZooKeeperServer(
+            new FileTxnSnapLog(dataDir, snapDir),
+            new BasicDataTreeBuilder()) {
+
+          // TODO(John Sirois): Introduce a builder to configure the in-process server if and when
+          // some folks need JMX for in-process tests.
+          @Override protected void registerJMX() {
+            // noop
+          }
+        };
+
+    connectionFactory = new NIOServerCnxnFactory();
+    connectionFactory.configure(
+        new InetSocketAddress(port),
+        60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */);
+    connectionFactory.startup(zooKeeperServer);
+    // Read back the port actually bound (matters when port was 0) so clients and restarts use it.
+    port = zooKeeperServer.getClientPort();
+  }
+
+  /**
+   * Stops the zookeeper server.
+   */
+  public void stop() {
+    shutdownNetwork();
+  }
+
+  /**
+   * Starts zookeeper back up on the last used port.
+   *
+   * @throws IllegalStateException if {@link #startNetwork()} was never called or the network is
+   *     still running
+   */
+  final void restartNetwork() throws IOException, InterruptedException {
+    checkEphemeralPortAssigned();
+    Preconditions.checkState(connectionFactory == null);
+    startNetwork();
+  }
+
+  /**
+   * Shuts down the in-process zookeeper network server.  A no-op if it is not running.
+   */
+  final void shutdownNetwork() {
+    if (connectionFactory != null) {
+      connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
+      connectionFactory = null;
+    }
+  }
+
+  /**
+   * Expires the client session with the given {@code sessionId}.
+   *
+   * <p>NOTE(review): {@code zooKeeperServer} is only assigned by {@link #startNetwork()}, so calling
+   * this first would throw a NullPointerException.
+   *
+   * @param sessionId The id of the client session to expire.
+   */
+  public final void expireClientSession(long sessionId) {
+    zooKeeperServer.closeSession(sessionId);
+  }
+
+  /**
+   * Returns the current port to connect to the in-process zookeeper instance.
+   *
+   * @throws IllegalStateException if {@link #startNetwork()} has not been called yet
+   */
+  public final int getPort() {
+    checkEphemeralPortAssigned();
+    return port;
+  }
+
+  // A positive port means startNetwork() has completed at least once.
+  private void checkEphemeralPortAssigned() {
+    Preconditions.checkState(port > 0, "startNetwork must be called first");
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
new file mode 100644
index 0000000..9c0cebe
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests leader election through {@link CandidateImpl}, with candidates competing in the same
+ * {@link Group} path, each through its own ZooKeeper client.
+ */
+public class CandidateImplTest extends BaseZooKeeperClientTest {
+  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+  private static final String SERVICE = "/twitter/services/puffin_linkhose/leader";
+  private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES);
+
+  // Elected candidates are pushed onto the head (offerFirst) and the tests take from the tail
+  // (takeLast), so candidates are observed in the order they were elected.
+  private LinkedBlockingDeque<CandidateImpl> candidateBuffer;
+
+  @Before
+  public void mySetUp() throws IOException {
+    candidateBuffer = new LinkedBlockingDeque<>();
+  }
+
+  private Group createGroup(ZooKeeperClient zkClient) throws IOException {
+    return new Group(zkClient, ACL, SERVICE);
+  }
+
+  /**
+   * A {@link Candidate.Leader} that records its candidate in {@code candidateBuffer} when elected,
+   * keeps the abdication command it is handed, and counts down a latch when defeated.  It is an
+   * inner (non-static) class because it writes to the enclosing test's {@code candidateBuffer}.
+   */
+  private class Reign implements Candidate.Leader {
+    private ExceptionalCommand<Group.JoinException> abdicate;
+    private final CandidateImpl candidate;
+    private final String id;
+    private CountDownLatch defeated = new CountDownLatch(1);
+
+    Reign(String id, CandidateImpl candidate) {
+      this.id = id;
+      this.candidate = candidate;
+    }
+
+    @Override
+    public void onElected(ExceptionalCommand<Group.JoinException> abdicate) {
+      candidateBuffer.offerFirst(candidate);
+      this.abdicate = abdicate;
+    }
+
+    @Override
+    public void onDefeated() {
+      defeated.countDown();
+    }
+
+    // Gives up leadership; only valid after onElected has supplied the abdication command.
+    public void abdicate() throws Group.JoinException {
+      Preconditions.checkState(abdicate != null);
+      abdicate.execute();
+    }
+
+    // Blocks until onDefeated has been called (hangs the test if it never is).
+    public void expectDefeated() throws InterruptedException {
+      defeated.await();
+    }
+
+    @Override
+    public String toString() {
+      return id;
+    }
+  }
+
+  /**
+   * Three candidates offer leadership.  Checks that the first joiner leads, that leadership
+   * survives a network bounce, and that abdication and then session expiry each hand leadership to
+   * a remaining candidate while the previous leader is defeated.
+   */
+  @Test
+  public void testOfferLeadership() throws Exception {
+    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) {
+      @Override public String toString() {
+        return "Leader1";
+      }
+    };
+    ZooKeeperClient zkClient2 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) {
+      @Override public String toString() {
+        return "Leader2";
+      }
+    };
+    ZooKeeperClient zkClient3 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) {
+      @Override public String toString() {
+        return "Leader3";
+      }
+    };
+
+    Reign candidate1Reign = new Reign("1", candidate1);
+    Reign candidate2Reign = new Reign("2", candidate2);
+    Reign candidate3Reign = new Reign("3", candidate3);
+
+    // Each supplier reports whether its candidate is currently the leader.
+    Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign);
+    Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign);
+    Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign);
+
+    assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader",
+        candidate1Leader.get());
+
+    // Bounce the network without expiring any session.
+    shutdownNetwork();
+    restartNetwork();
+
+    assertTrue("A re-connect without a session expiration should leave the leader elected",
+        candidate1Leader.get());
+
+    candidate1Reign.abdicate();
+    // candidate1's own election was the first one recorded.
+    assertSame(candidate1, candidateBuffer.takeLast());
+    assertFalse(candidate1Leader.get());
+    // Active abdication should trigger defeat.
+    candidate1Reign.expectDefeated();
+
+    // Blocks until one of the remaining candidates is elected.
+    CandidateImpl secondCandidate = candidateBuffer.takeLast();
+    assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " "
+               + candidateBuffer,
+        candidate2Leader.get() ^ candidate3Leader.get());
+
+    // Expire the current leader's session; the last remaining candidate must take over.
+    if (secondCandidate == candidate2) {
+      expireSession(zkClient2);
+      assertSame(candidate3, candidateBuffer.takeLast());
+      assertTrue(candidate3Leader.get());
+      // Passive expiration should trigger defeat.
+      candidate2Reign.expectDefeated();
+    } else {
+      expireSession(zkClient3);
+      assertSame(candidate2, candidateBuffer.takeLast());
+      assertTrue(candidate2Leader.get());
+      // Passive expiration should trigger defeat.
+      candidate3Reign.expectDefeated();
+    }
+  }
+
+  /**
+   * After the only candidate is elected and then abdicates, no leader data is reported.
+   */
+  @Test
+  public void testEmptyMembership() throws Exception {
+    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1));
+    Reign candidate1Reign = new Reign("1", candidate1);
+
+    candidate1.offerLeadership(candidate1Reign);
+    assertSame(candidate1, candidateBuffer.takeLast());
+    candidate1Reign.abdicate();
+    assertFalse(candidate1.getLeaderData().isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
new file mode 100644
index 0000000..97a42d1
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.NodeScheme;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+public class GroupTest extends BaseZooKeeperClientTest {
+
+  private ZooKeeperClient zkClient;
+  private Group joinGroup;
+  private Group watchGroup;
+  private Command stopWatching;
+  private Command onLoseMembership;
+
+  private RecordingListener listener;
+
+  public GroupTest() {
+    super(Amount.of(1, Time.DAYS));
+  }
+
+  @Before
+  public void mySetUp() throws Exception {
+    onLoseMembership = createMock(Command.class);
+
+    zkClient = createZkClient("group", "test");
+    joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
+    watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
+
+    listener = new RecordingListener();
+    stopWatching = watchGroup.watch(listener);
+  }
+
+  private static class RecordingListener implements GroupChangeListener {
+    private final LinkedBlockingQueue<Iterable<String>> membershipChanges =
+        new LinkedBlockingQueue<Iterable<String>>();
+
+    @Override
+    public void onGroupChange(Iterable<String> memberIds) {
+      membershipChanges.add(memberIds);
+    }
+
+    public Iterable<String> take() throws InterruptedException {
+      return membershipChanges.take();
+    }
+
+    public void assertEmpty() {
+      assertEquals(ImmutableList.<Iterable<String>>of(), ImmutableList.copyOf(membershipChanges));
+    }
+
+    @Override
+    public String toString() {
+      return membershipChanges.toString();
+    }
+  }
+
+  private static class CustomScheme implements NodeScheme {
+    static final String NODE_NAME = "custom_name";
+
+    @Override
+    public boolean isMember(String nodeName) {
+      return NODE_NAME.equals(nodeName);
+    }
+
+    @Override
+    public String createName(byte[] membershipData) {
+      return NODE_NAME;
+    }
+
+    @Override
+    public boolean isSequential() {
+      return false;
+    }
+  }
+
+  @Test
+  public void testSessionExpirationTriggersOnLoseMembership() throws Exception {
+    final CountDownLatch lostMembership = new CountDownLatch(1);
+    Command onLoseMembership = lostMembership::countDown;
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join(onLoseMembership);
+    assertMembershipObserved(membership.getMemberId());
+    expireSession(zkClient);
+
+    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
+  }
+
+  @Test
+  public void testNodeDeleteTriggersOnLoseMembership() throws Exception {
+    final CountDownLatch lostMembership = new CountDownLatch(1);
+    Command onLoseMembership = lostMembership::countDown;
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join(onLoseMembership);
+    assertMembershipObserved(membership.getMemberId());
+    membership.cancel();
+
+    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
+  }
+
+  @Test
+  public void testJoinsAndWatchesSurviveDisconnect() throws Exception {
+    replay(onLoseMembership);
+
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join();
+    String originalMemberId = membership.getMemberId();
+    assertMembershipObserved(originalMemberId);
+
+    shutdownNetwork();
+    restartNetwork();
+
+    // The member should still be present under existing ephemeral node since session did not
+    // expire.
+    watchGroup.watch(listener);
+    assertMembershipObserved(originalMemberId);
+
+    membership.cancel();
+
+    assertEmptyMembershipObserved();
+    assertEmptyMembershipObserved(); // and again for 2nd listener
+
+    listener.assertEmpty();
+
+    verify(onLoseMembership);
+    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
+  }
+
+  /**
+   * Verifies that after the ZooKeeper session expires, the lost-membership callback fires once
+   * and the member re-joins under a new ephemeral node with a different member ID, which the
+   * watch then reports.
+   */
+  @Test
+  public void testJoinsAndWatchesSurviveExpiredSession() throws Exception {
+    // Record exactly one expected loss-of-membership notification.
+    onLoseMembership.execute();
+    replay(onLoseMembership);
+
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join(onLoseMembership);
+    String originalMemberId = membership.getMemberId();
+    assertMembershipObserved(originalMemberId);
+
+    expireSession(zkClient);
+
+    // We should have lost our group membership and then re-gained it with a new ephemeral node.
+    // We may or may-not see the intermediate state change but we must see the final state
+    Iterable<String> members = listener.take();
+    if (Iterables.isEmpty(members)) {
+      // The intermediate "member gone" snapshot was observed; take the re-joined state next.
+      members = listener.take();
+    }
+    assertEquals(1, Iterables.size(members));
+    assertNotEquals(originalMemberId, Iterables.getOnlyElement(members));
+    assertNotEquals(originalMemberId, membership.getMemberId());
+
+    listener.assertEmpty();
+
+    verify(onLoseMembership);
+    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
+  }
+
+  /**
+   * Verifies that a {@link Group} constructed with a custom naming scheme uses the scheme's node
+   * name as the member ID, and that the watch reports that ID.
+   */
+  @Test
+  public void testJoinCustomNamingScheme() throws Exception {
+    Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group",
+        new CustomScheme());
+
+    // Replace the shared listener so that only events from this group are recorded.
+    listener = new RecordingListener();
+    group.watch(listener);
+    assertEmptyMembershipObserved();
+
+    Membership membership = group.join();
+    String memberId = membership.getMemberId();
+
+    assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId);
+    assertMembershipObserved(memberId);
+
+    // NOTE(review): nothing is asserted after this expiration; presumably it only checks that
+    // expiring a session holding a custom-named member does not throw. Confirm intent.
+    expireSession(zkClient);
+  }
+
+  /**
+   * Verifies that member data is read from the supplier when joining, and that it is re-read
+   * only when {@code Membership.updateMemberData()} is called.
+   */
+  @Test
+  public void testUpdateMembershipData() throws Exception {
+    Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock();
+
+    // First supplier read (at join time) yields "start".
+    byte[] initial = "start".getBytes();
+    expect(dataSupplier.get()).andReturn(initial);
+
+    // Second supplier read (on updateMemberData) yields "update".
+    byte[] second = "update".getBytes();
+    expect(dataSupplier.get()).andReturn(second);
+
+    replay(dataSupplier);
+
+    Membership membership = joinGroup.join(dataSupplier, onLoseMembership);
+    assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get()
+        .getData(membership.getMemberPath(), false, null));
+
+    // The supplier's next value is already `second`, but the stored node data must not change
+    // until updateMemberData() is called explicitly.
+    assertArrayEquals("Updating supplier should not change membership data",
+        initial, zkClient.get().getData(membership.getMemberPath(), false, null));
+
+    membership.updateMemberData();
+    assertArrayEquals("Updating membership should change data",
+        second, zkClient.get().getData(membership.getMemberPath(), false, null));
+
+    // Ensures the supplier was read exactly twice.
+    verify(dataSupplier);
+  }
+
+  /**
+   * Verifies ACL enforcement on a group created with
+   * {@code ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL}. Clients that lack the creator's
+   * credentials can observe membership but cannot join.
+   */
+  @Test
+  public void testAcls() throws Exception {
+    // NOTE(review): createZkClient("secured", "group") presumably authenticates with those
+    // credentials; the other clients below either have none or different ones.
+    Group securedMembership =
+        new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL,
+            "/secured/group/membership");
+
+    String memberId = securedMembership.join().getMemberId();
+
+    // A client with no credentials can read the membership...
+    Group unauthenticatedObserver =
+        new Group(createZkClient(),
+            Ids.READ_ACL_UNSAFE,
+            "/secured/group/membership");
+    RecordingListener unauthenticatedListener = new RecordingListener();
+    unauthenticatedObserver.watch(unauthenticatedListener);
+
+    assertMembershipObserved(unauthenticatedListener, memberId);
+
+    // ...but must not be able to join.
+    try {
+      unauthenticatedObserver.join();
+      fail("Expected join exception for unauthenticated observer");
+    } catch (JoinException e) {
+      // expected
+    }
+
+    // A client with different credentials can likewise read but not join.
+    Group unauthorizedObserver =
+        new Group(createZkClient("joe", "schmoe"),
+            Ids.READ_ACL_UNSAFE,
+            "/secured/group/membership");
+    RecordingListener unauthorizedListener = new RecordingListener();
+    unauthorizedObserver.watch(unauthorizedListener);
+
+    assertMembershipObserved(unauthorizedListener, memberId);
+
+    try {
+      unauthorizedObserver.join();
+      fail("Expected join exception for unauthorized observer");
+    } catch (JoinException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Verifies that once {@code stopWatching} is executed, further membership changes are no
+   * longer delivered to the listener.
+   *
+   * <p>NOTE(review): {@code stopWatching} is presumably the command returned by the watch
+   * registered in setup. Confirm against the fixture.
+   */
+  @Test
+  public void testStopWatching() throws Exception {
+    // No loss-of-membership callbacks are expected.
+    replay(onLoseMembership);
+
+    assertEmptyMembershipObserved();
+
+    Membership member1 = joinGroup.join();
+    String memberId1 = member1.getMemberId();
+    assertMembershipObserved(memberId1);
+
+    Membership member2 = joinGroup.join();
+    String memberId2 = member2.getMemberId();
+    assertMembershipObserved(memberId1, memberId2);
+
+    stopWatching.execute();
+
+    // None of these membership changes may reach the listener any more.
+    member1.cancel();
+    Membership member3 = joinGroup.join();
+    member2.cancel();
+    member3.cancel();
+
+    listener.assertEmpty();
+  }
+
+  /** Asserts that the next membership snapshot seen by the shared listener is empty. */
+  private void assertEmptyMembershipObserved() throws InterruptedException {
+    assertMembershipObserved(listener);
+  }
+
+  /**
+   * Asserts that the next membership snapshot seen by the shared {@code listener} field contains
+   * exactly {@code expectedMemberIds}.
+   */
+  private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException {
+    assertMembershipObserved(this.listener, expectedMemberIds);
+  }
+
+  /**
+   * Takes the next membership snapshot recorded by {@code listener} (presumably blocking until
+   * one arrives) and asserts it equals {@code expectedMemberIds}, ignoring order and duplicates.
+   */
+  private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds)
+      throws InterruptedException {
+    ImmutableSet<String> expected = ImmutableSet.copyOf(expectedMemberIds);
+    ImmutableSet<String> observed = ImmutableSet.copyOf(listener.take());
+    assertEquals(expected, observed);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
new file mode 100644
index 0000000..2166123
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Gson.class)
+public class JsonCodecTest {
+
+  private static final Codec<ServiceInstance> STANDARD_JSON_CODEC = new JsonCodec();
+
+  @Test
+  public void testJsonCodecRoundtrip() throws Exception {
+    Codec<ServiceInstance> codec = STANDARD_JSON_CODEC;
+    ServiceInstance instance1 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http", new Endpoint("foo", 8080)),
+        Status.ALIVE)
+        .setShard(0);
+    byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+    ServiceInstance instance2 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+        Status.ALIVE);
+    data = ServerSets.serializeServiceInstance(instance2, codec);
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+    ServiceInstance instance3 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.<String, Endpoint>of(),
+        Status.ALIVE);
+    data = ServerSets.serializeServiceInstance(instance3, codec);
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+  }
+
+  @Test
+  public void testJsonCompatibility() throws IOException {
+    ServiceInstance instance = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http", new Endpoint("foo", 8080)),
+        Status.ALIVE).setShard(42);
+
+    ByteArrayOutputStream results = new ByteArrayOutputStream();
+    STANDARD_JSON_CODEC.serialize(instance, results);
+    assertEquals(
+        "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
+            + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
+            + "\"status\":\"ALIVE\","
+            + "\"shard\":42}",
+        results.toString());
+  }
+
+  @Test
+  public void testInvalidSerialize() {
+    // Gson is final so we need to call on PowerMock here.
+    Gson gson = PowerMock.createMock(Gson.class);
+    gson.toJson(EasyMock.isA(Object.class), EasyMock.isA(Appendable.class));
+    EasyMock.expectLastCall().andThrow(new JsonIOException("error"));
+    PowerMock.replay(gson);
+
+    ServiceInstance instance =
+        new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
+
+    try {
+      new JsonCodec(gson).serialize(instance, new ByteArrayOutputStream());
+      fail();
+    } catch (IOException e) {
+      // Expected.
+    }
+
+    PowerMock.verify(gson);
+  }
+
+  @Test
+  public void testDeserializeMinimal() throws IOException {
+    String minimal = "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},\"status\":\"ALIVE\"}";
+    ByteArrayInputStream source = new ByteArrayInputStream(minimal.getBytes(Charsets.UTF_8));
+    ServiceInstance actual = STANDARD_JSON_CODEC.deserialize(source);
+    ServiceInstance expected =
+        new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testInvalidDeserialize() {
+    // Not JSON.
+    assertInvalidDeserialize(new byte[] {0xC, 0xA, 0xF, 0xE});
+
+    // No JSON object.
+    assertInvalidDeserialize("");
+    assertInvalidDeserialize("[]");
+
+    // Missing required fields.
+    assertInvalidDeserialize("{}");
+    assertInvalidDeserialize("{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}}");
+    assertInvalidDeserialize("{\"status\":\"ALIVE\"}");
+  }
+
+  private void assertInvalidDeserialize(String data) {
+    assertInvalidDeserialize(data.getBytes(Charsets.UTF_8));
+  }
+
+  private void assertInvalidDeserialize(byte[] data) {
+    try {
+      STANDARD_JSON_CODEC.deserialize(new ByteArrayInputStream(data));
+      fail();
+    } catch (IOException e) {
+      // Expected.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
new file mode 100644
index 0000000..f0c0cb4
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link ServerSetImpl} membership, monitoring and ordering behavior using the ZooKeeper
+ * test harness from {@link BaseZooKeeperClientTest}.
+ *
+ * TODO(William Farner): Change this to remove thrift dependency.
+ */
+public class ServerSetImplTest extends BaseZooKeeperClientTest {
+  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+  private static final String SERVICE = "/twitter/services/puffin_hosebird";
+
+  // Every host-set change delivered to serverSetMonitor is queued here for tests to take().
+  private LinkedBlockingQueue<ImmutableSet<ServiceInstance>> serverSetBuffer;
+  private DynamicHostSet.HostChangeMonitor<ServiceInstance> serverSetMonitor;
+
+  @Before
+  public void mySetUp() throws IOException {
+    serverSetBuffer = new LinkedBlockingQueue<>();
+    serverSetMonitor = serverSetBuffer::offer;
+  }
+
+  /** Creates a {@link ServerSetImpl} for {@link #SERVICE} on a client from createZkClient(). */
+  private ServerSetImpl createServerSet() throws IOException {
+    return new ServerSetImpl(createZkClient(), ACL, SERVICE);
+  }
+
+  @Test
+  public void testLifecycle() throws Exception {
+    ServerSetImpl client = createServerSet();
+    client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    ServerSetImpl server = createServerSet();
+    ServerSet.EndpointStatus status = server.join(
+        InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080));
+
+    ServiceInstance serviceInstance = new ServiceInstance(
+        new Endpoint("foo", 1234),
+        ImmutableMap.of("http-admin", new Endpoint("foo", 8080)),
+        Status.ALIVE);
+
+    assertChangeFired(serviceInstance);
+
+    status.leave();
+    assertChangeFiredEmpty();
+    assertTrue(serverSetBuffer.isEmpty());
+  }
+
+  @Test
+  public void testMembershipChanges() throws Exception {
+    ServerSetImpl client = createServerSet();
+    client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    ServerSetImpl server = createServerSet();
+
+    ServerSet.EndpointStatus foo = join(server, "foo");
+    assertChangeFired("foo");
+
+    expireSession(client.getZkClient());
+
+    ServerSet.EndpointStatus bar = join(server, "bar");
+
+    // We should've auto re-monitored membership, but not been notified of "foo" since this was
+    // not a change, just "foo", "bar" since this was an addition.
+    assertChangeFired("foo", "bar");
+
+    foo.leave();
+    assertChangeFired("bar");
+
+    ServerSet.EndpointStatus baz = join(server, "baz");
+    assertChangeFired("bar", "baz");
+
+    baz.leave();
+    assertChangeFired("bar");
+
+    bar.leave();
+    assertChangeFiredEmpty();
+
+    assertTrue(serverSetBuffer.isEmpty());
+  }
+
+  @Test
+  public void testStopMonitoring() throws Exception {
+    ServerSetImpl client = createServerSet();
+    Command stopMonitoring = client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    ServerSetImpl server = createServerSet();
+
+    ServerSet.EndpointStatus foo = join(server, "foo");
+    assertChangeFired("foo");
+    ServerSet.EndpointStatus bar = join(server, "bar");
+    assertChangeFired("foo", "bar");
+
+    stopMonitoring.execute();
+
+    // No new updates should be received since monitoring has stopped.
+    foo.leave();
+    bar.leave();
+    assertTrue(serverSetBuffer.isEmpty());
+
+    // Expiration event: unlike testMembershipChanges, a stopped monitor must not re-monitor
+    // membership after the session expires, so still nothing may be delivered.
+    expireSession(client.getZkClient());
+    assertTrue(serverSetBuffer.isEmpty());
+  }
+
+  @Test
+  public void testOrdering() throws Exception {
+    ServerSetImpl client = createServerSet();
+    client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    Map<String, InetSocketAddress> server1Ports = makePortMap("http-admin1", 8080);
+    Map<String, InetSocketAddress> server2Ports = makePortMap("http-admin2", 8081);
+    Map<String, InetSocketAddress> server3Ports = makePortMap("http-admin3", 8082);
+
+    ServerSetImpl server1 = createServerSet();
+    ServerSetImpl server2 = createServerSet();
+    ServerSetImpl server3 = createServerSet();
+
+    ServiceInstance instance1 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+        Status.ALIVE);
+    ServiceInstance instance2 = new ServiceInstance(
+        new Endpoint("foo", 1001),
+        ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)),
+        Status.ALIVE);
+    ServiceInstance instance3 = new ServiceInstance(
+        new Endpoint("foo", 1002),
+        ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)),
+        Status.ALIVE);
+
+    // Lists (not sets) are compared so that join order is checked, not just membership.
+    server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports);
+    assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take()));
+
+    ServerSet.EndpointStatus status2 = server2.join(
+        InetSocketAddress.createUnresolved("foo", 1001),
+        server2Ports);
+    assertEquals(ImmutableList.of(instance1, instance2),
+        ImmutableList.copyOf(serverSetBuffer.take()));
+
+    server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports);
+    assertEquals(ImmutableList.of(instance1, instance2, instance3),
+        ImmutableList.copyOf(serverSetBuffer.take()));
+
+    status2.leave();
+    assertEquals(ImmutableList.of(instance1, instance3),
+        ImmutableList.copyOf(serverSetBuffer.take()));
+  }
+
+  @Test
+  public void testUnwatchOnException() throws Exception {
+    IMocksControl control = createControl();
+
+    ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class);
+    Watcher onExpirationWatcher = control.createMock(Watcher.class);
+
+    expect(zkClient.registerExpirationHandler(anyObject(Command.class)))
+        .andReturn(onExpirationWatcher);
+
+    expect(zkClient.get()).andThrow(new InterruptedException());  // See interrupted() note below.
+    // A failed watch must clean up the expiration handler it registered.
+    expect(zkClient.unregister(onExpirationWatcher)).andReturn(true);
+    control.replay();
+
+    Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla");
+    ServerSetImpl serverset = new ServerSetImpl(zkClient, group);
+
+    try {
+      serverset.watch(hostSet -> {});
+      fail("Expected MonitorException");
+    } catch (DynamicHostSet.MonitorException e) {
+      // NB: The assert is not important to this test, but the call to `Thread.interrupted()` is.
+      // That call both returns the current interrupted status as well as clearing it.  The clearing
+      // is crucial depending on the order tests are run in this class.  If this test runs before
+      // one of the tests above that uses a `ZooKeeperClient` for example, those tests will fail
+      // executing `ZooKeeperClient.get` which internally blocks on a sync-point that takes part in
+      // the interruption mechanism and so immediately throws `InterruptedException` based on the
+      // un-cleared interrupted bit.
+      assertTrue(Thread.interrupted());
+    }
+    control.verify();
+  }
+
+  /** Builds a single-entry map from {@code name} to an unresolved "foo" address on {@code port}. */
+  private static Map<String, InetSocketAddress> makePortMap(String name, int port) {
+    return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port));
+  }
+
+  /** Joins {@code serverSet} as {@code host}:42 with no additional endpoints. */
+  private ServerSet.EndpointStatus join(ServerSet serverSet, String host)
+      throws JoinException, InterruptedException {
+
+    return serverSet.join(
+        InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of());
+  }
+
+  /** Asserts the next change is exactly the given hosts, each as an ALIVE instance on port 42. */
+  private void assertChangeFired(String... serviceHosts)
+      throws InterruptedException {
+
+    assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts),
+        serviceHost -> new ServiceInstance(new Endpoint(serviceHost, 42),
+            ImmutableMap.<String, Endpoint>of(), Status.ALIVE))));
+  }
+
+  /** Asserts the next change reports an empty host set. */
+  protected void assertChangeFiredEmpty() throws InterruptedException {
+    assertChangeFired(ImmutableSet.<ServiceInstance>of());
+  }
+
+  /** Asserts the next change is exactly the given service instances. */
+  protected void assertChangeFired(ServiceInstance... serviceInstances)
+      throws InterruptedException {
+    assertChangeFired(ImmutableSet.copyOf(serviceInstances));
+  }
+
+  /** Blocks for the next queued change and asserts it equals {@code serviceInstances}. */
+  protected void assertChangeFired(ImmutableSet<ServiceInstance> serviceInstances)
+      throws InterruptedException {
+    assertEquals(serviceInstances, serverSetBuffer.take());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
new file mode 100644
index 0000000..0e67191
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for the {@link ServiceInstance} serialization helpers in {@link ServerSets}. */
+public class ServerSetsTest {
+  @Test
+  public void testSimpleSerialization() throws Exception {
+    InetSocketAddress endpoint = new InetSocketAddress(12345);
+    Map<String, Endpoint> additionalEndpoints = ImmutableMap.of();
+    Status status = Status.ALIVE;
+
+    byte[] data = ServerSets.serializeServiceInstance(
+        endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC);
+
+    ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC);
+
+    // Port, additional endpoints and status must all survive the JSON round trip; compare
+    // against the serialized inputs rather than re-stating literals.
+    assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
+    assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());
+    assertEquals(status, instance.getStatus());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
new file mode 100644
index 0000000..5f6cdd8
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.easymock.Capture;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link SingletonServiceImpl} leadership handling against mocked {@link ServerSet} and
+ * {@link Candidate} collaborators. All mocks share one control, verified on tear-down.
+ */
+public class SingletonServiceImplTest extends BaseZooKeeperClientTest {
+  private static final int PORT_A = 1234;
+  private static final int PORT_B = 8080;
+  private static final InetSocketAddress PRIMARY_ENDPOINT =
+      InetSocketAddress.createUnresolved("foo", PORT_A);
+  private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
+      ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
+
+  private IMocksControl control;
+  private SingletonServiceImpl.LeadershipListener listener;
+  private ServerSet serverSet;
+  private ServerSet.EndpointStatus endpointStatus;
+  private Candidate candidate;
+  private ExceptionalCommand<Group.JoinException> abdicate;
+
+  private SingletonService service;
+
+  @Before
+  @SuppressWarnings("unchecked") // Generic ExceptionalCommand mock created from a raw class.
+  public void mySetUp() throws IOException {
+    control = createControl();
+    addTearDown(control::verify);
+    listener = control.createMock(SingletonServiceImpl.LeadershipListener.class);
+    serverSet = control.createMock(ServerSet.class);
+    candidate = control.createMock(Candidate.class);
+    endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
+    abdicate = control.createMock(ExceptionalCommand.class);
+
+    service = new SingletonServiceImpl(serverSet, candidate);
+  }
+
+  /**
+   * Offers leadership for {@code hostName} through {@link SingletonService#lead}, then simulates
+   * winning the election by invoking the captured {@link Leader}'s {@code onElected} with the
+   * {@code abdicate} mock.
+   */
+  private void newLeader(
+      String hostName,
+      Capture<Leader> leader,
+      LeadershipListener listener) throws Exception {
+
+    service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
+        ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
+        listener);
+
+    // This actually elects the leader.
+    leader.getValue().onElected(abdicate);
+  }
+
+  /** As {@link #newLeader(String, Capture, LeadershipListener)} using the shared listener mock. */
+  private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
+    newLeader(hostName, leader, listener);
+  }
+
+  /** Records an expected join of the default "foo" primary and auxiliary endpoints. */
+  private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
+    return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
+  }
+
+  @Test
+  public void testLeadAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+    endpointStatus.leave();
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().leave();
+  }
+
+  @Test
+  public void testLeadLeaveNoAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    abdicate.execute();
+
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    control.replay();
+
+    // Leaving without ever advertising must abdicate but not touch the server set.
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().leave();
+  }
+
+  @Test
+  public void testLeadJoinFailure() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+
+    try {
+      controlCapture.getValue().advertise();
+      fail("Join should have failed.");
+    } catch (SingletonService.AdvertiseException e) {
+      // Expected.
+    }
+
+    controlCapture.getValue().leave();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().advertise();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleLeave() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+    endpointStatus.leave();
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().leave();
+    controlCapture.getValue().leave();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testAdvertiseAfterLeave() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().leave();
+    controlCapture.getValue().advertise();
+  }
+
+  @Test
+  public void testLeadMulti() throws Exception {
+    List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
+    List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
+
+    // Record a full lead/advertise/leave cycle for each of five distinct hosts.
+    for (int i = 0; i < 5; i++) {
+      Capture<Leader> leaderCapture = createCapture();
+      leaderCaptures.add(leaderCapture);
+      Capture<LeaderControl> controlCapture = createCapture();
+      leaderControlCaptures.add(controlCapture);
+
+      expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+      listener.onLeading(capture(controlCapture));
+      InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
+      Map<String, InetSocketAddress> aux =
+          ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
+      expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
+      endpointStatus.leave();
+      abdicate.execute();
+    }
+
+    control.replay();
+
+    for (int i = 0; i < 5; i++) {
+      String leaderName = "foo" + i;
+      newLeader(leaderName, leaderCaptures.get(i));
+      leaderControlCaptures.get(i).getValue().advertise();
+      leaderControlCaptures.get(i).getValue().leave();
+    }
+  }
+
+  @Test
+  public void testLeaderLeaves() throws Exception {
+    // No expectations are recorded, so the tear-down verify fails if shutting the network down
+    // triggers any call on the mocks.
+    control.replay();
+    shutdownNetwork();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
new file mode 100644
index 0000000..5eee235
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author John Sirois
+ */
+public class ZooKeeperClientTest extends BaseZooKeeperClientTest {
+
+  public ZooKeeperClientTest() {
+    super(Amount.of(1, Time.DAYS));
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    final ZooKeeperClient zkClient = createZkClient();
+    shutdownNetwork();
+    try {
+      zkClient.get(Amount.of(50L, Time.MILLISECONDS));
+      fail("Expected client connection to timeout while network down");
+    } catch (TimeoutException e) {
+      assertTrue(zkClient.isClosed());
+    }
+    assertNull(zkClient.getZooKeeperClientForTests());
+
+    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
+    final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
+    new Thread(() -> {
+      try {
+        client.set(zkClient.get());
+      } catch (ZooKeeperConnectionException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        blockingGetComplete.countDown();
+      }
+    }).start();
+
+    restartNetwork();
+
+    // Hung blocking connects should succeed when server connection comes up
+    blockingGetComplete.await();
+    assertNotNull(client.get());
+
+    // New connections should succeed now that network is back up
+    long sessionId = zkClient.get().getSessionId();
+
+    // While connected the same client should be reused (no new connections while healthy)
+    assertSame(client.get(), zkClient.get());
+
+    shutdownNetwork();
+    // Our client doesn't know the network is down yet so we should be able to get()
+    ZooKeeper zooKeeper = zkClient.get();
+    try {
+      zooKeeper.exists("/", false);
+      fail("Expected client operation to fail while network down");
+    } catch (ConnectionLossException e) {
+      // expected
+    }
+
+    restartNetwork();
+    assertEquals("Expected connection to be re-established with existing session",
+        sessionId, zkClient.get().getSessionId());
+  }
+
+  /**
+   * Test that if a blocking get() call gets interrupted, after a connection has been created
+   * but before it's connected, the zk connection gets closed.
+   */
+  @Test
+  public void testGetInterrupted() throws Exception {
+    final ZooKeeperClient zkClient = createZkClient();
+    shutdownNetwork();
+
+    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
+    final AtomicBoolean interrupted = new AtomicBoolean();
+    final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
+    Thread getThread = new Thread(() -> {
+      try {
+        client.set(zkClient.get());
+      } catch (ZooKeeperConnectionException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        interrupted.set(true);
+        throw new RuntimeException(e);
+      } finally {
+        blockingGetComplete.countDown();
+      }
+    });
+    getThread.start();
+
+    while (zkClient.getZooKeeperClientForTests() == null) {
+      Thread.sleep(100);
+    }
+
+    getThread.interrupt();
+    blockingGetComplete.await();
+
+    assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests());
+    assertTrue("The waiter thread should have been interrupted", interrupted.get());
+    assertTrue(zkClient.isClosed());
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    ZooKeeperClient zkClient = createZkClient();
+    zkClient.close();
+
+    // Close should be idempotent
+    zkClient.close();
+
+    long firstSessionId = zkClient.get().getSessionId();
+
+    // Close on an open client should force session re-establishment
+    zkClient.close();
+
+    assertNotEquals(firstSessionId, zkClient.get().getSessionId());
+  }
+
+  @Test
+  public void testCredentials() throws Exception {
+    String path = "/test";
+    ZooKeeperClient authenticatedClient = createZkClient("creator", "creator");
+    assertEquals(path,
+        authenticatedClient.get().create(path, "42".getBytes(),
+            ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT));
+
+    ZooKeeperClient unauthenticatedClient = createZkClient();
+    assertEquals("42", getData(unauthenticatedClient, path));
+    try {
+      setData(unauthenticatedClient, path, "37");
+      fail("Expected unauthenticated write attempt to fail");
+    } catch (NoAuthException e) {
+      assertEquals("42", getData(unauthenticatedClient, path));
+    }
+
+    ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner");
+    assertEquals("42", getData(nonOwnerClient, path));
+    try {
+      setData(nonOwnerClient, path, "37");
+      fail("Expected non owner write attempt to fail");
+    } catch (NoAuthException e) {
+      assertEquals("42", getData(nonOwnerClient, path));
+    }
+
+    ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator");
+    setData(authenticatedClient2, path, "37");
+    assertEquals("37", getData(authenticatedClient2, path));
+  }
+
+  @Test
+  public void testChrootPath() throws Exception {
+    ZooKeeperClient rootClient = createZkClient();
+    String rootPath = "/test";
+    String subPath = "/test/subtest";
+    assertEquals(rootPath,
+            rootClient.get().create(rootPath, "42".getBytes(),
+                ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+    assertEquals(subPath,
+            rootClient.get().create(subPath, "37".getBytes(),
+                ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+
+    ZooKeeperClient chrootedClient = createZkClient(rootPath);
+    assertArrayEquals("37".getBytes(), chrootedClient.get().getData("/subtest", false, null));
+  }
+
+  private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception {
+    zkClient.get().setData(path, data.getBytes(), ZooKeeperUtils.ANY_VERSION);
+  }
+
+  private String getData(ZooKeeperClient zkClient, String path) throws Exception {
+    return new String(zkClient.get().getData(path, false, null));
+  }
+}


[2/4] aurora git commit: Revert removal of twitter/commons/zk based leadership code

Posted by dm...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
new file mode 100644
index 0000000..9e482a6
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Charsets;
+
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.BadVersionException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author John Sirois
+ */
+public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest {
+  @Test
+  public void testEnsurePath() throws Exception {
+    ZooKeeperClient zkClient = createZkClient();
+    zkClient.get().addAuthInfo("digest", "client1:boo".getBytes(Charsets.UTF_8));
+
+    assertNull(zkClient.get().exists("/foo", false));
+    ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.CREATOR_ALL_ACL, "/foo/bar/baz");
+
+    zkClient = createZkClient();
+    zkClient.get().addAuthInfo("digest", "client2:bap".getBytes(Charsets.UTF_8));
+
+    // Anyone can check for existence in ZK
+    assertNotNull(zkClient.get().exists("/foo", false));
+    assertNotNull(zkClient.get().exists("/foo/bar", false));
+    assertNotNull(zkClient.get().exists("/foo/bar/baz", false));
+
+    try {
+      zkClient.get().delete("/foo/bar/baz", -1 /* delete no matter what */);
+      fail("Expected CREATOR_ALL_ACL to be applied to created path and client2 mutations to be "
+           + "rejected");
+    } catch (NoAuthException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testMagicVersionNumberAllowsUnconditionalUpdate() throws Exception {
+    String nodePath = "/foo";
+    ZooKeeperClient zkClient = createZkClient();
+
+    zkClient.get().create(nodePath, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    Stat initStat = new Stat();
+    byte[] initialData = zkClient.get().getData(nodePath, false, initStat);
+    assertArrayEquals("init".getBytes(), initialData);
+
+    // bump the version
+    Stat rev1Stat = zkClient.get().setData(nodePath, "rev1".getBytes(), initStat.getVersion());
+
+    try {
+      zkClient.get().setData(nodePath, "rev2".getBytes(), initStat.getVersion());
+      fail("expected correct version to be required");
+    } catch (BadVersionException e) {
+      // expected
+    }
+
+    // expect using the correct version to work
+    Stat rev2Stat = zkClient.get().setData(nodePath, "rev2".getBytes(), rev1Stat.getVersion());
+    assertNotEquals(ZooKeeperUtils.ANY_VERSION, rev2Stat.getVersion());
+
+    zkClient.get().setData(nodePath, "force-write".getBytes(), ZooKeeperUtils.ANY_VERSION);
+    Stat forceWriteStat = new Stat();
+    byte[] forceWriteData = zkClient.get().getData(nodePath, false, forceWriteStat);
+    assertArrayEquals("force-write".getBytes(), forceWriteData);
+
+    assertTrue(forceWriteStat.getVersion() > rev2Stat.getVersion());
+    assertNotEquals(ZooKeeperUtils.ANY_VERSION, forceWriteStat.getVersion());
+  }
+
+  @Test
+  public void testNormalizingPath() throws Exception {
+    assertEquals("/", ZooKeeperUtils.normalizePath("/"));
+    assertEquals("/foo", ZooKeeperUtils.normalizePath("/foo/"));
+    assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo//bar"));
+    assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("//foo/bar"));
+    assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar/"));
+    assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar//"));
+    assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar"));
+  }
+
+  @Test
+  public void testLenientPaths() {
+    assertEquals("/", ZooKeeperUtils.normalizePath("///"));
+    assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group"));
+    assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group/"));
+    assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group"));
+    assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group//"));
+
+    try {
+      ZooKeeperUtils.normalizePath("a/group");
+      fail("Relative paths should not be allowed.");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+    try {
+      ZooKeeperUtils.normalizePath("/a/./group");
+      fail("Relative paths should not be allowed.");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+    try {
+      ZooKeeperUtils.normalizePath("/a/../group");
+      fail("Relative paths should not be allowed.");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/config/findbugs/excludeFilter.xml
----------------------------------------------------------------------
diff --git a/config/findbugs/excludeFilter.xml b/config/findbugs/excludeFilter.xml
index 5eaa11a..f7d5ae0 100644
--- a/config/findbugs/excludeFilter.xml
+++ b/config/findbugs/excludeFilter.xml
@@ -74,14 +74,6 @@ limitations under the License.
     <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS" />
   </Match>
 
-  <!-- We're forced to use the platform default encoding to generate a byte array from digest
-       credentials because the underlying ZooKeeper API dictates this - also noted in the
-       offending code. -->
-  <Match>
-    <Class name="org.apache.aurora.scheduler.discovery.Credentials" />
-    <Bug pattern="DM_DEFAULT_ENCODING" />
-  </Match>
-
   <!-- Technical debt. -->
   <Match>
     <Class name="org.apache.aurora.scheduler.log.mesos.MesosLog$LogStream" />

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/docs/features/service-discovery.md
----------------------------------------------------------------------
diff --git a/docs/features/service-discovery.md b/docs/features/service-discovery.md
index 511c96d..36823c8 100644
--- a/docs/features/service-discovery.md
+++ b/docs/features/service-discovery.md
@@ -6,7 +6,7 @@ the purpose of service discovery.  ServerSets use the Zookeeper [group membershi
 of which there are several reference implementations:
 
   - [C++](https://github.com/apache/mesos/blob/master/src/zookeeper/group.cpp)
-  - [Java](http://curator.apache.org/curator-recipes/group-member.html)
+  - [Java](https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/zookeeper/ServerSetImpl.java#L221)
   - [Python](https://github.com/twitter/commons/blob/master/src/python/twitter/common/zookeeper/serverset/serverset.py#L51)
 
 These can also be used natively in Finagle using the [ZookeeperServerSetCluster](https://github.com/twitter/finagle/blob/master/finagle-serversets/src/main/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetCluster.scala).

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md
index 8955653..d4e0a9a 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -252,5 +252,7 @@ Optional flags:
 	Launches an embedded zookeeper server for local testing causing -zk_endpoints to be ignored if specified.
 -zk_session_timeout (default (4, secs))
 	The ZooKeeper session timeout.
+-zk_use_curator (default true)
+	DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter commons/zookeeper (the legacy library) is used.
 -------------------------------------------------------------------------
 ```

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index 76209b1..4e354b6 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -46,8 +46,8 @@ import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.StateMachine;
 import org.apache.aurora.common.util.StateMachine.Transition;
-import org.apache.aurora.scheduler.discovery.SingletonService;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.mesos.Driver;
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
+import static org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
 
 /**
  * The central driver of the scheduler runtime lifecycle.  Handles the transitions from startup and

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index e0d32de..43cc5b4 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -42,6 +42,8 @@ import org.apache.aurora.common.args.constraints.NotEmpty;
 import org.apache.aurora.common.args.constraints.NotNull;
 import org.apache.aurora.common.inject.Bindings;
 import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
 import org.apache.aurora.gen.ServerInfo;
 import org.apache.aurora.scheduler.AppStartup;
 import org.apache.aurora.scheduler.SchedulerLifecycle;
@@ -50,8 +52,6 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorModule;
 import org.apache.aurora.scheduler.cron.quartz.CronModule;
 import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig;
 import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
-import org.apache.aurora.scheduler.discovery.SingletonService;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
 import org.apache.aurora.scheduler.events.WebhookModule;
 import org.apache.aurora.scheduler.http.HttpService;
 import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
new file mode 100644
index 0000000..a1329fd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.app;
+
+import java.io.Closeable;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.thrift.ServiceInstance;
+
+/**
+ * Monitors a service group's membership and supplies a live view of the most recent set.
+ */
+public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable {
+
+  /**
+   * Indicates a problem initiating monitoring of a service group.
+   */
+  class MonitorException extends Exception {
+    public MonitorException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Starts monitoring the service group.
+   *
+   * When the service group membership no longer needs to be maintained, this monitor should be
+   * {@link #close() closed}.
+   *
+   * @throws MonitorException if there is a problem initiating monitoring of the service group.
+   */
+  void start() throws MonitorException;
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
new file mode 100644
index 0000000..339f63b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import javax.inject.Singleton;
+
+import com.google.inject.Exposed;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.ServerSetImpl;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonServiceImpl;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.zookeeper.data.ACL;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Binding module for utilities to advertise the network presence of the scheduler.
+ *
+ * Uses a fork of Twitter commons/zookeeper.
+ */
+class CommonsServiceDiscoveryModule extends PrivateModule {
+
+  private final String discoveryPath;
+  private final ZooKeeperConfig zooKeeperConfig;
+
+  CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+    this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath);
+    this.zooKeeperConfig = requireNonNull(zooKeeperConfig);
+  }
+
+  @Override
+  protected void configure() {
+    requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
+    requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
+
+    bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class);
+    expose(ServiceGroupMonitor.class);
+  }
+
+  @Provides
+  @Singleton
+  ZooKeeperClient provideZooKeeperClient(
+      @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster) {
+
+    return new ZooKeeperClient(
+        zooKeeperConfig.getSessionTimeout(),
+        zooKeeperConfig.getCredentials(),
+        zooKeeperConfig.getChrootPath(),
+        zooKeeperCluster);
+  }
+
+  @Provides
+  @Singleton
+  ServerSetImpl provideServerSet(
+      ZooKeeperClient client,
+      @ServiceDiscoveryBindings.ZooKeeper List<ACL> zooKeeperAcls) {
+
+    return new ServerSetImpl(client, zooKeeperAcls, discoveryPath);
+  }
+
+  @Provides
+  @Singleton
+  DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) {
+    // Used for a type re-binding of the server set.
+    return serverSet;
+  }
+
+  // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding.
+  @Provides
+  @Singleton
+  @Exposed
+  SingletonService provideSingletonService(
+      ZooKeeperClient client,
+      ServerSetImpl serverSet,
+      @ServiceDiscoveryBindings.ZooKeeper List<ACL> zookeeperAcls) {
+
+    return new SingletonServiceImpl(
+        serverSet,
+        SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, zookeeperAcls));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
new file mode 100644
index 0000000..9161455
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.inject.Inject;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+
+import static java.util.Objects.requireNonNull;
+
+class CommonsServiceGroupMonitor implements ServiceGroupMonitor {
+  private Optional<Command> closeCommand = Optional.empty();
+  private final DynamicHostSet<ServiceInstance> serverSet;
+  private final AtomicReference<ImmutableSet<ServiceInstance>> services =
+      new AtomicReference<>(ImmutableSet.of());
+
+  @Inject
+  CommonsServiceGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) {
+    this.serverSet = requireNonNull(serverSet);
+  }
+
+  @Override
+  public void start() throws MonitorException {
+    try {
+      closeCommand = Optional.of(serverSet.watch(services::set));
+    } catch (DynamicHostSet.MonitorException e) {
+      throw new MonitorException("Unable to watch scheduler host set.", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    closeCommand.ifPresent(Command::execute);
+  }
+
+  @Override
+  public ImmutableSet<ServiceInstance> get() {
+    return services.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java b/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
deleted file mode 100644
index 75d58e7..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.util.Arrays;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.commons.lang.builder.EqualsBuilder;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Encapsulates a user's ZooKeeper credentials.
- */
-public final class Credentials {
-
-  /**
-   * Creates a set of credentials for the ZooKeeper digest authentication mechanism.
-   *
-   * @param username the username to authenticate with
-   * @param password the password to authenticate with
-   * @return a set of credentials that can be used to authenticate the zoo keeper client
-   */
-  public static Credentials digestCredentials(String username, String password) {
-    MorePreconditions.checkNotBlank(username);
-    Preconditions.checkNotNull(password);
-
-    // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset
-    // (on server) and so we just have to hope here that clients are deployed in compatible jvms.
-    // Consider writing and installing a version of DigestAuthenticationProvider that controls its
-    // Charset explicitly.
-    return new Credentials("digest", (username + ":" + password).getBytes());
-  }
-
-  private final String authScheme;
-  private final byte[] authToken;
-
-  /**
-   * Creates a new set of credentials for the given ZooKeeper authentication scheme.
-   *
-   * @param scheme The name of the authentication scheme the {@code token} is valid in.
-   * @param token The authentication token for the given {@code scheme}.
-   */
-  public Credentials(String scheme, byte[] token) {
-    authScheme = MorePreconditions.checkNotBlank(scheme);
-    authToken = requireNonNull(token);
-  }
-
-  /**
-   * Returns the authentication scheme these credentials are for.
-   *
-   * @return the scheme these credentials are for.
-   */
-  public String scheme() {
-    return authScheme;
-  }
-
-  /**
-   * Returns the authentication token.
-   *
-   * @return the authentication token.
-   */
-  public byte[] token() {
-    return Arrays.copyOf(authToken, authToken.length);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof Credentials)) {
-      return false;
-    }
-
-    Credentials other = (Credentials) o;
-    return new EqualsBuilder()
-        .append(authScheme, other.scheme())
-        .append(authToken, other.token())
-        .isEquals();
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(authScheme, authToken);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
index e690d14..999a542 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
@@ -33,6 +33,10 @@ import org.apache.aurora.common.net.InetSocketAddressHelper;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -64,7 +68,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
     requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
     requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
 
-    bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(JsonCodec.INSTANCE);
+    bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(ServerSet.JSON_CODEC);
   }
 
   @Provides
@@ -102,7 +106,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
 
     if (zooKeeperConfig.getCredentials().isPresent()) {
       Credentials credentials = zooKeeperConfig.getCredentials().get();
-      builder.authorization(credentials.scheme(), credentials.token());
+      builder.authorization(credentials.scheme(), credentials.authToken());
     }
 
     CuratorFramework curatorFramework = builder.build();

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
index db886df..0b86fb6 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
@@ -24,6 +24,7 @@ import org.apache.aurora.GuavaUtils;
 import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.scheduler.app.SchedulerMain;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.utils.ZKPaths;

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
index 321bbb3..c378172 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
@@ -27,6 +27,7 @@ import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
index d5019bf..e8aafe4 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
@@ -15,10 +15,10 @@ package org.apache.aurora.scheduler.discovery;
 
 import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.Optional;
 
 import javax.annotation.Nullable;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 
@@ -27,12 +27,23 @@ import org.apache.aurora.common.args.CmdLine;
 import org.apache.aurora.common.args.constraints.NotEmpty;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A factory that creates a {@link ZooKeeperConfig} instance based on command line argument
  * values.
  */
 public final class FlaggedZooKeeperConfig {
+  private static final Logger LOG = LoggerFactory.getLogger(FlaggedZooKeeperConfig.class);
+
+  @CmdLine(name = "zk_use_curator",
+      help = "DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter "
+          + "commons/zookeeper (the legacy library) is used.")
+  private static final Arg<Boolean> USE_CURATOR = Arg.create(true);
+
   @CmdLine(name = "zk_in_proc",
       help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints "
           + "to be ignored if specified.")
@@ -63,9 +74,13 @@ public final class FlaggedZooKeeperConfig {
    * @return Configuration instance.
    */
   public static ZooKeeperConfig create() {
+    if (USE_CURATOR.hasAppliedValue()) {
+      LOG.warn("The -zk_use_curator flag is deprecated and will be removed in a future release.");
+    }
     return new ZooKeeperConfig(
+        USE_CURATOR.get(),
         ZK_ENDPOINTS.get(),
-        Optional.ofNullable(CHROOT_PATH.get()),
+        Optional.fromNullable(CHROOT_PATH.get()),
         IN_PROCESS.get(),
         SESSION_TIMEOUT.get(),
         getCredentials(DIGEST_CREDENTIALS.get()));
@@ -73,7 +88,7 @@ public final class FlaggedZooKeeperConfig {
 
   private static Optional<Credentials> getCredentials(@Nullable String userAndPass) {
     if (userAndPass == null) {
-      return Optional.empty();
+      return Optional.absent();
     }
 
     List<String> parts = ImmutableList.copyOf(Splitter.on(":").split(userAndPass));

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java b/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
deleted file mode 100644
index 9d22b76..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.JsonIOException;
-import com.google.gson.JsonParseException;
-
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Encodes a {@link ServiceInstance} as a JSON object.
- */
-class JsonCodec implements Codec<ServiceInstance> {
-
-  private static void assertRequiredField(String fieldName, Object fieldValue) {
-    if (fieldValue == null) {
-      throw new JsonParseException(String.format("Field %s is required", fieldName));
-    }
-  }
-
-  private static class EndpointSchema {
-    private final String host;
-    private final Integer port;
-
-    EndpointSchema(Endpoint endpoint) {
-      host = endpoint.getHost();
-      port = endpoint.getPort();
-    }
-
-    Endpoint asEndpoint() {
-      assertRequiredField("host", host);
-      assertRequiredField("port", port);
-
-      return new Endpoint(host, port);
-    }
-  }
-
-  private static class ServiceInstanceSchema {
-    private final EndpointSchema serviceEndpoint;
-    private final Map<String, EndpointSchema> additionalEndpoints;
-    private final Status status;
-    @Nullable private final Integer shard;
-
-    ServiceInstanceSchema(ServiceInstance instance) {
-      serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
-      if (instance.isSetAdditionalEndpoints()) {
-        additionalEndpoints =
-            Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new);
-      } else {
-        additionalEndpoints = ImmutableMap.of();
-      }
-      status  = instance.getStatus();
-      shard = instance.isSetShard() ? instance.getShard() : null;
-    }
-
-    ServiceInstance asServiceInstance() {
-      assertRequiredField("serviceEndpoint", serviceEndpoint);
-      assertRequiredField("status", status);
-
-      Map<String, EndpointSchema> extraEndpoints =
-          additionalEndpoints == null ? ImmutableMap.of() : additionalEndpoints;
-
-      ServiceInstance instance =
-          new ServiceInstance(
-              serviceEndpoint.asEndpoint(),
-              Maps.transformValues(extraEndpoints, EndpointSchema::asEndpoint),
-              status);
-      if (shard != null) {
-        instance.setShard(shard);
-      }
-      return instance;
-    }
-  }
-
-  /**
-   * The encoding for service instance data in ZooKeeper expected by Aurora clients.
-   */
-  static final Codec<ServiceInstance> INSTANCE = new JsonCodec();
-
-  private static final Charset ENCODING = Charsets.UTF_8;
-
-  private final Gson gson;
-
-  private JsonCodec() {
-    this(new Gson());
-  }
-
-  JsonCodec(Gson gson) {
-    this.gson = requireNonNull(gson);
-  }
-
-  @Override
-  public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
-    Writer writer = new OutputStreamWriter(sink, ENCODING);
-    try {
-      gson.toJson(new ServiceInstanceSchema(instance), writer);
-    } catch (JsonIOException e) {
-      throw new IOException(String.format("Problem serializing %s to JSON", instance), e);
-    }
-    writer.flush();
-  }
-
-  @Override
-  public ServiceInstance deserialize(InputStream source) throws IOException {
-    InputStreamReader reader = new InputStreamReader(source, ENCODING);
-    try {
-      @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class);
-      if (schema == null) {
-        throw new IOException("JSON did not include a ServiceInstance object");
-      }
-      return schema.asServiceInstance();
-    } catch (JsonParseException e) {
-      throw new IOException("Problem parsing JSON ServiceInstance.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
index b7ca62c..07a6302 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
@@ -25,14 +25,16 @@ import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
+import com.google.inject.Module;
 import com.google.inject.Provider;
 import com.google.inject.Provides;
 import com.google.inject.binder.LinkedBindingBuilder;
 
 import org.apache.aurora.common.application.ShutdownRegistry;
 import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.common.zookeeper.testing.ZooKeeperTestServer;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.discovery.testing.ZooKeeperTestServer;
 import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,15 +46,15 @@ import static java.util.Objects.requireNonNull;
  */
 public class ServiceDiscoveryModule extends AbstractModule {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CommonsServiceDiscoveryModule.class);
 
   private final ZooKeeperConfig zooKeeperConfig;
   private final String discoveryPath;
 
   /**
    * Creates a Guice module that will bind a
-   * {@link SingletonService} for scheduler leader election and a
-   * {@link ServiceGroupMonitor} that can be used to find the
+   * {@link org.apache.aurora.common.zookeeper.SingletonService} for scheduler leader election and a
+   * {@link org.apache.aurora.scheduler.app.ServiceGroupMonitor} that can be used to find the
    * leading scheduler.
    *
    * @param zooKeeperConfig The ZooKeeper client configuration to use to interact with ZooKeeper.
@@ -80,7 +82,15 @@ public class ServiceDiscoveryModule extends AbstractModule {
       clusterBinder.toInstance(zooKeeperConfig.getServers());
     }
 
-    install(new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig));
+    install(discoveryModule());
+  }
+
+  private Module discoveryModule() {
+    if (zooKeeperConfig.isUseCurator()) {
+      return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+    } else {
+      return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+    }
   }
 
   @Provides

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
deleted file mode 100644
index fea896c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.io.Closeable;
-import java.util.function.Supplier;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.thrift.ServiceInstance;
-
-/**
- * Monitors a service group's membership and supplies a live view of the most recent set.
- */
-public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable {
-
-  /**
-   * Indicates a problem initiating monitoring of a service group.
-   */
-  class MonitorException extends Exception {
-    MonitorException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Starts monitoring the service group.
-   *
-   * When the service group membership no longer needs to be maintained, this monitor should be
-   * {@link #close() closed}.
-   *
-   * @throws MonitorException if there is a problem initiating monitoring of the service group.
-   */
-  void start() throws MonitorException;
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
deleted file mode 100644
index adbc318..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-/**
- * A service that uses master election to only allow a single service instance to be active amongst
- * a set of potential servers at a time.
- */
-public interface SingletonService {
-
-  /**
-   * Indicates an error attempting to lead a group of servers.
-   */
-  class LeadException extends Exception {
-    LeadException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Indicates an error attempting to advertise leadership of a group of servers.
-   */
-  class AdvertiseException extends Exception {
-    AdvertiseException(String message) {
-      super(message);
-    }
-
-    AdvertiseException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Indicates an error attempting to leave a group of servers, abdicating leadership of the group.
-   */
-  class LeaveException extends Exception {
-    LeaveException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Attempts to lead the singleton service.
-   *
-   * @param endpoint The primary endpoint to register as a leader candidate in the service.
-   * @param additionalEndpoints Additional endpoints that are available on the host.
-   * @param listener Handler to call when the candidate is elected or defeated.
-   * @throws LeadException If there was a problem joining or watching the ZooKeeper group.
-   * @throws InterruptedException If the thread watching/joining the group was interrupted.
-   */
-  void lead(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints,
-      LeadershipListener listener)
-      throws LeadException, InterruptedException;
-
-  /**
-   * A listener to be notified of changes in the leadership status.
-   * Implementers should be careful to avoid blocking operations in these callbacks.
-   */
-  interface LeadershipListener {
-
-    /**
-     * Notifies the listener that is is current leader.
-     *
-     * @param control A controller handle to advertise and/or leave advertised presence.
-     */
-    void onLeading(LeaderControl control);
-
-    /**
-     * Notifies the listener that it is no longer leader.
-     */
-    void onDefeated();
-  }
-
-  /**
-   * A controller for the state of the leader.  This will be provided to the leader upon election,
-   * which allows the leader to decide when to advertise as leader of the server set and terminate
-   * leadership at will.
-   */
-  interface LeaderControl {
-
-    /**
-     * Advertises the leader's server presence to clients.
-     *
-     * @throws AdvertiseException If there was an error advertising the singleton leader to clients
-     *     of the server set.
-     * @throws InterruptedException If interrupted while advertising.
-     */
-    void advertise() throws AdvertiseException, InterruptedException;
-
-    /**
-     * Leaves candidacy for leadership, removing advertised server presence if applicable.
-     *
-     * @throws LeaveException If the leader's status could not be updated or there was an error
-     *     abdicating server set leadership.
-     */
-    void leave() throws LeaveException;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
index acb7905..3f32a62 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
@@ -14,11 +14,14 @@
 package org.apache.aurora.scheduler.discovery;
 
 import java.net.InetSocketAddress;
-import java.util.Optional;
+
+import com.google.common.base.Optional;
 
 import org.apache.aurora.common.base.MorePreconditions;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
 
 import static java.util.Objects.requireNonNull;
 
@@ -32,18 +35,21 @@ public class ZooKeeperConfig {
   /**
    * Creates a new client configuration with defaults for the session timeout and credentials.
    *
+   * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used.
    * @param servers ZooKeeper server addresses.
    * @return A new configuration.
    */
-  public static ZooKeeperConfig create(Iterable<InetSocketAddress> servers) {
+  public static ZooKeeperConfig create(boolean useCurator, Iterable<InetSocketAddress> servers) {
     return new ZooKeeperConfig(
+        useCurator,
         servers,
-        Optional.empty(), // chrootPath
+        Optional.absent(), // chrootPath
         false,
         ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT,
-        Optional.empty()); // credentials
+        Optional.absent()); // credentials
   }
 
+  private final boolean useCurator;
   private final Iterable<InetSocketAddress> servers;
   private final boolean inProcess;
   private final Amount<Integer, Time> sessionTimeout;
@@ -60,12 +66,14 @@ public class ZooKeeperConfig {
    * @param credentials ZooKeeper authentication credentials.
    */
   ZooKeeperConfig(
+      boolean useCurator,
       Iterable<InetSocketAddress> servers,
       Optional<String> chrootPath,
       boolean inProcess,
       Amount<Integer, Time> sessionTimeout,
       Optional<Credentials> credentials) {
 
+    this.useCurator = useCurator;
     this.servers = MorePreconditions.checkNotBlank(servers);
     this.chrootPath = requireNonNull(chrootPath);
     this.inProcess = inProcess;
@@ -82,6 +90,7 @@ public class ZooKeeperConfig {
    */
   public ZooKeeperConfig withCredentials(Credentials newCredentials) {
     return new ZooKeeperConfig(
+        useCurator,
         servers,
         chrootPath,
         inProcess,
@@ -89,6 +98,10 @@ public class ZooKeeperConfig {
         Optional.of(newCredentials));
   }
 
+  boolean isUseCurator() {
+    return useCurator;
+  }
+
   public Iterable<InetSocketAddress> getServers() {
     return servers;
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
deleted file mode 100644
index 211aa50..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * Utilities for dealing with ZooKeeper.
- */
-final class ZooKeeperUtils {
-
-  /**
-   * An appropriate default session timeout for ZooKeeper clusters.
-   */
-  static final Amount<Integer, Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS);
-
-  /**
-   * An ACL that gives all permissions any user authenticated or not.
-   */
-  static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
-      ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);
-
-  /**
-   * An ACL that gives all permissions to node creators and read permissions only to everyone else.
-   */
-  static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
-      ImmutableList.<ACL>builder()
-          .addAll(Ids.CREATOR_ALL_ACL)
-          .addAll(Ids.READ_ACL_UNSAFE)
-          .build();
-
-  private ZooKeeperUtils() {
-    // utility
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java b/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
deleted file mode 100644
index d84037e..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery.testing;
-
-import org.apache.aurora.common.testing.TearDownTestCase;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * A base-class for in-process zookeeper tests.
- */
-public abstract class BaseZooKeeperTest extends TearDownTestCase {
-
-  private ZooKeeperTestServer zkTestServer;
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Before
-  public final void setUp() throws Exception {
-    zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder());
-    addTearDown(zkTestServer::stop);
-    zkTestServer.startNetwork();
-  }
-
-  /**
-   * Returns the running in-process ZooKeeper server.
-   *
-   * @return The in-process ZooKeeper server.
-   */
-  protected final ZooKeeperTestServer getServer() {
-    return zkTestServer;
-  }
-
-  /**
-   * Returns the current port to connect to the in-process zookeeper instance.
-   */
-  protected final int getPort() {
-    return getServer().getPort();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java b/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
deleted file mode 100644
index a7bb48b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery.testing;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-/**
- * A helper class for starting in-process ZooKeeper server and clients.
- *
- * <p>This is ONLY meant to be used for testing.
- */
-public class ZooKeeperTestServer {
-
-  private final File dataDir;
-  private final File snapDir;
-
-  private ZooKeeperServer zooKeeperServer;
-  private ServerCnxnFactory connectionFactory;
-  private int port;
-
-  public ZooKeeperTestServer(File dataDir, File snapDir) {
-    this.dataDir = Preconditions.checkNotNull(dataDir);
-    this.snapDir = Preconditions.checkNotNull(snapDir);
-  }
-
-  /**
-   * Starts zookeeper up on an ephemeral port.
-   */
-  public void startNetwork() throws IOException, InterruptedException {
-    zooKeeperServer =
-        new ZooKeeperServer(
-            new FileTxnSnapLog(dataDir, snapDir),
-            new BasicDataTreeBuilder()) {
-
-          // TODO(John Sirois): Introduce a builder to configure the in-process server if and when
-          // some folks need JMX for in-process tests.
-          @Override protected void registerJMX() {
-            // noop
-          }
-        };
-
-    connectionFactory = new NIOServerCnxnFactory();
-    connectionFactory.configure(
-        new InetSocketAddress(port),
-        60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */);
-    connectionFactory.startup(zooKeeperServer);
-    port = zooKeeperServer.getClientPort();
-  }
-
-  /**
-   * Stops the zookeeper server.
-   */
-  public void stop() {
-    if (connectionFactory != null) {
-      connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
-      connectionFactory = null;
-    }
-  }
-
-  /**
-   * Expires the client session with the given {@code sessionId}.
-   *
-   * @param sessionId The id of the client session to expire.
-   */
-  public final void expireClientSession(long sessionId) {
-    zooKeeperServer.closeSession(sessionId);
-  }
-
-  /**
-   * Returns the current port to connect to the in-process zookeeper instance.
-   */
-  public final int getPort() {
-    checkEphemeralPortAssigned();
-    return port;
-  }
-
-  private void checkEphemeralPortAssigned() {
-    Preconditions.checkState(port > 0, "startNetwork must be called first");
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
index af8567f..53ebc0b 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
@@ -64,7 +64,7 @@ import org.apache.aurora.common.net.http.handlers.TimeSeriesDataSource;
 import org.apache.aurora.common.net.http.handlers.VarsHandler;
 import org.apache.aurora.common.net.http.handlers.VarsJsonHandler;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
 import org.apache.aurora.scheduler.http.api.ApiModule;
 import org.apache.aurora.scheduler.http.api.security.HttpSecurityModule;
 import org.apache.aurora.scheduler.thrift.ThriftModule;

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
index 662d6d5..bc0e2a8 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
@@ -27,8 +27,8 @@ import com.google.common.net.HostAndPort;
 
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
index c7c0387..6704a32 100644
--- a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
@@ -33,8 +33,8 @@ import org.apache.aurora.common.args.CmdLine;
 import org.apache.aurora.common.net.InetSocketAddressHelper;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
 import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.scheduler.discovery.Credentials;
 import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
 import org.apache.aurora.scheduler.log.mesos.LogInterface.ReaderInterface;
 import org.apache.aurora.scheduler.log.mesos.LogInterface.WriterInterface;
@@ -157,7 +157,7 @@ public class MesosLogStreamModule extends PrivateModule {
           zkClientConfig.getSessionTimeout().getUnit().getTimeUnit(),
           zkLogGroupPath,
           zkCredentials.scheme(),
-          zkCredentials.token());
+          zkCredentials.authToken());
     } else {
       return new Log(
           QUORUM_SIZE.get(),

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index 4324ea9..051c520 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -21,9 +21,9 @@ import org.apache.aurora.common.application.ShutdownRegistry;
 import org.apache.aurora.common.base.Command;
 import org.apache.aurora.common.base.ExceptionalCommand;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
 import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 84d7753..29a3b4a 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -17,7 +17,6 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -44,6 +43,10 @@ import org.apache.aurora.GuavaUtils;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ServerSetImpl;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
@@ -60,11 +63,8 @@ import org.apache.aurora.scheduler.AppStartup;
 import org.apache.aurora.scheduler.TierModule;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
-import org.apache.aurora.scheduler.discovery.Credentials;
 import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
 import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
-import org.apache.aurora.scheduler.discovery.testing.BaseZooKeeperTest;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.log.Log.Entry;
 import org.apache.aurora.scheduler.log.Log.Position;
@@ -107,9 +107,10 @@ import static org.easymock.EasyMock.createControl;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class SchedulerIT extends BaseZooKeeperTest {
+public class SchedulerIT extends BaseZooKeeperClientTest {
 
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerIT.class);
 
@@ -144,6 +145,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
   private Stream logStream;
   private StreamMatcher streamMatcher;
   private EntrySerializer entrySerializer;
+  private ZooKeeperClient zkClient;
   private File backupDir;
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -171,9 +173,11 @@ public class SchedulerIT extends BaseZooKeeperTest {
     entrySerializer = new EntrySerializer.EntrySerializerImpl(
         LogStorageModule.MAX_LOG_ENTRY_SIZE.get(),
         Hashing.md5());
+
+    zkClient = createZkClient();
   }
 
-  private Callable<Void> startScheduler() throws Exception {
+  private void startScheduler() throws Exception {
     // TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using
     // AppLauncher.
     Module testModule = new AbstractModule() {
@@ -198,8 +202,10 @@ public class SchedulerIT extends BaseZooKeeperTest {
     };
     ZooKeeperConfig zkClientConfig =
         ZooKeeperConfig.create(
+            true, // useCurator
             ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort())))
             .withCredentials(Credentials.digestCredentials("mesos", "mesos"));
+    SchedulerMain main = SchedulerMain.class.newInstance();
     Injector injector = Guice.createInjector(
         ImmutableList.<Module>builder()
             .add(SchedulerMain.getUniversalModule())
@@ -209,8 +215,8 @@ public class SchedulerIT extends BaseZooKeeperTest {
             .add(testModule)
             .build()
     );
-    SchedulerMain main = new SchedulerMain();
     injector.injectMembers(main);
+    Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
 
     executor.submit(() -> {
       try {
@@ -220,32 +226,28 @@ public class SchedulerIT extends BaseZooKeeperTest {
         executor.shutdownNow();
       }
     });
-
-    Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
     addTearDown(() -> {
       lifecycle.shutdown();
       MoreExecutors.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
     });
-
     injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class))
         .awaitHealthy();
+  }
 
-    ServiceGroupMonitor schedulerMonitor = injector.getInstance(ServiceGroupMonitor.class);
-    CountDownLatch schedulerReady = new CountDownLatch(1);
+  private void awaitSchedulerReady() throws Exception {
     executor.submit(() -> {
-      while (schedulerMonitor.get().isEmpty()) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
+      ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
+      final CountDownLatch schedulerReady = new CountDownLatch(1);
+      schedulerService.watch(hostSet -> {
+        if (!hostSet.isEmpty()) {
+          schedulerReady.countDown();
         }
-      }
-      schedulerReady.countDown();
-    });
-    return () -> {
-      schedulerReady.await();
+      });
+      // A timeout is used because certain types of assertion errors (mocks) will not surface
+      // until the main test thread exits this body of code.
+      assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES));
       return null;
-    };
+    }).get();
   }
 
   private final AtomicInteger curPosition = new AtomicInteger();
@@ -330,14 +332,14 @@ public class SchedulerIT extends BaseZooKeeperTest {
     expect(driver.stop(true)).andReturn(Status.DRIVER_STOPPED).anyTimes();
 
     control.replay();
-    Callable<Void> awaitSchedulerReady = startScheduler();
+    startScheduler();
 
     driverStarted.await();
     scheduler.getValue().registered(driver,
         FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
         MasterInfo.getDefaultInstance());
 
-    awaitSchedulerReady.call();
+    awaitSchedulerReady();
 
     assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());
     assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue());

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
new file mode 100644
index 0000000..d90192b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+abstract class AbstractDiscoveryModuleTest extends TearDownTestCase {
+
+  @Test
+  public void testBindingContract() {
+    ZooKeeperConfig zooKeeperConfig =
+        new ZooKeeperConfig(
+            isCurator(),
+            ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)),
+            Optional.of("/chroot"),
+            false, // inProcess
+            Amount.of(1, Time.DAYS),
+            Optional.of(Credentials.digestCredentials("test", "user")));
+
+    Injector injector =
+        Guice.createInjector(
+            new AbstractModule() {
+              @Override
+              protected void configure() {
+                bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
+                    .toInstance(
+                        ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)));
+                bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY)
+                    .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE);
+
+                bindExtraRequirements(binder());
+              }
+            },
+            createModule("/discovery/path", zooKeeperConfig));
+
+    assertNotNull(injector.getBinding(SingletonService.class).getProvider().get());
+    assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get());
+  }
+
+  void bindExtraRequirements(Binder binder) {
+    // Noop.
+  }
+
+  abstract Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig);
+
+  abstract boolean isCurator();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
index 9f86add..a2b4125 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
@@ -21,10 +21,13 @@ import java.util.function.Predicate;
 
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.scheduler.discovery.testing.BaseZooKeeperTest;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@@ -35,6 +38,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
 
   static final String GROUP_PATH = "/group/root";
   static final String MEMBER_TOKEN = "member_";
+  static final Codec<ServiceInstance> CODEC = ServerSet.JSON_CODEC;
   static final int PRIMARY_PORT = 42;
 
   private CuratorFramework client;
@@ -51,7 +55,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
     groupCache.getListenable().addListener((c, event) -> groupEvents.put(event));
 
     Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN);
-    groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, JsonCodec.INSTANCE);
+    groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC);
   }
 
   final CuratorFramework startNewClient() {
@@ -97,7 +101,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
 
   final byte[] serialize(ServiceInstance serviceInstance) throws IOException {
     ByteArrayOutputStream sink = new ByteArrayOutputStream();
-    JsonCodec.INSTANCE.serialize(serviceInstance, sink);
+    CODEC.serialize(serviceInstance, sink);
     return sink.toByteArray();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
new file mode 100644
index 0000000..7a4c4dd
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import com.google.inject.Module;
+
+public class CommonsDiscoveryModuleTest extends AbstractDiscoveryModuleTest {
+
+  @Override
+  Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+    return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+  }
+
+  @Override
+  boolean isCurator() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
new file mode 100644
index 0000000..42a2224
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class CommonsServiceGroupMonitorTest extends EasyMockTest {
+
+  private DynamicHostSet<ServiceInstance> serverSet;
+  private Capture<HostChangeMonitor<ServiceInstance>> hostChangeMonitorCapture;
+  private Command stopCommand;
+
+  @Before
+  public void setUp() throws Exception {
+    serverSet = createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { });
+    hostChangeMonitorCapture = createCapture();
+    stopCommand = createMock(Command.class);
+  }
+
+  private void expectSuccessfulWatch() throws Exception {
+    expect(serverSet.watch(capture(hostChangeMonitorCapture))).andReturn(stopCommand);
+  }
+
+  private void expectFailedWatch() throws Exception {
+    DynamicHostSet.MonitorException watchError =
+        new DynamicHostSet.MonitorException(
+            "Problem watching service group",
+            new RuntimeException());
+    expect(serverSet.watch(capture(hostChangeMonitorCapture))).andThrow(watchError);
+  }
+
+  @Test
+  public void testNominalLifecycle() throws Exception {
+    expectSuccessfulWatch();
+
+    stopCommand.execute();
+    expectLastCall();
+
+    control.replay();
+
+    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+    groupMonitor.start();
+    groupMonitor.close();
+  }
+
+  @Test
+  public void testExceptionalLifecycle() throws Exception {
+    expectFailedWatch();
+    control.replay();
+
+    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+    try {
+      groupMonitor.start();
+      fail();
+    } catch (ServiceGroupMonitor.MonitorException e) {
+      // expected
+    }
+
+    // Close on a non-started monitor should be allowed.
+    groupMonitor.close();
+  }
+
+  @Test
+  public void testNoHosts() throws Exception {
+    expectSuccessfulWatch();
+    control.replay();
+
+    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    groupMonitor.start();
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    hostChangeMonitorCapture.getValue().onChange(ImmutableSet.of());
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+  }
+
+  @Test
+  public void testHostUpdates() throws Exception {
+    expectSuccessfulWatch();
+    control.replay();
+
+    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+    groupMonitor.start();
+
+    ImmutableSet<ServiceInstance> twoHosts =
+        ImmutableSet.of(serviceInstance("one"), serviceInstance("two"));
+    hostChangeMonitorCapture.getValue().onChange(twoHosts);
+    assertEquals(twoHosts, groupMonitor.get());
+
+    ImmutableSet<ServiceInstance> oneHost = ImmutableSet.of(serviceInstance("one"));
+    hostChangeMonitorCapture.getValue().onChange(oneHost);
+    assertEquals(oneHost, groupMonitor.get());
+
+    ImmutableSet<ServiceInstance> anotherHost = ImmutableSet.of(serviceInstance("three"));
+    hostChangeMonitorCapture.getValue().onChange(anotherHost);
+    assertEquals(anotherHost, groupMonitor.get());
+
+    ImmutableSet<ServiceInstance> noHosts = ImmutableSet.of();
+    hostChangeMonitorCapture.getValue().onChange(noHosts);
+    assertEquals(noHosts, groupMonitor.get());
+  }
+
+  private ServiceInstance serviceInstance(String hostName) {
+    return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
index 4ebda5e..f1a02e4 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
@@ -13,27 +13,37 @@
  */
 package org.apache.aurora.scheduler.discovery;
 
-import java.net.InetSocketAddress;
-import java.util.Optional;
-
 import com.google.common.collect.ImmutableList;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
+import com.google.inject.Binder;
+import com.google.inject.Module;
 
 import org.apache.aurora.common.application.ShutdownRegistry;
 import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.zookeeper.data.ACL;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 
-public class CuratorDiscoveryModuleTest extends TearDownTestCase {
+public class CuratorDiscoveryModuleTest extends AbstractDiscoveryModuleTest {
+
+  @Override
+  void bindExtraRequirements(Binder binder) {
+    ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
+    binder.bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
+    addTearDown(shutdownRegistry::execute);
+  }
+
+  @Override
+  Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+    return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+  }
+
+  @Override
+  boolean isCurator() {
+    return true; // This test exercises CuratorServiceDiscoveryModule.
+  }
 
   @Test
   public void testSingleACLProvider() {
@@ -54,36 +64,4 @@ public class CuratorDiscoveryModuleTest extends TearDownTestCase {
   public void testSingleACLProviderEmpty() {
     new CuratorServiceDiscoveryModule.SingleACLProvider(ImmutableList.of());
   }
-
-  @Test
-  public void testBindingContract() {
-    ZooKeeperConfig zooKeeperConfig =
-        new ZooKeeperConfig(
-            ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)),
-            Optional.of("/chroot"),
-            false, // inProcess
-            Amount.of(1, Time.DAYS),
-            Optional.of(Credentials.digestCredentials("test", "user")));
-
-    Injector injector =
-        Guice.createInjector(
-            new AbstractModule() {
-              @Override
-              protected void configure() {
-                bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
-                    .toInstance(
-                        ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)));
-                bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY)
-                    .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE);
-
-                ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
-                binder().bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
-                addTearDown(shutdownRegistry::execute);
-              }
-            },
-            new CuratorServiceDiscoveryModule("/discovery/path", zooKeeperConfig));
-
-    assertNotNull(injector.getBinding(SingletonService.class).getProvider().get());
-    assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get());
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
index bb3d080..6ea49b0 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
@@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.easymock.Capture;
@@ -56,7 +57,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest {
       throws Exception {
 
     CuratorSingletonService singletonService =
-        new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, JsonCodec.INSTANCE);
+        new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, CODEC);
     InetSocketAddress leaderEndpoint = InetSocketAddress.createUnresolved(hostName, PRIMARY_PORT);
     singletonService.lead(leaderEndpoint, ImmutableMap.of(), listener);
   }


[4/4] aurora git commit: Revert removal of twitter/commons/zk based leadership code

Posted by dm...@apache.org.
Revert removal of twitter/commons/zk based leadership code

See discussion here: https://issues.apache.org/jira/browse/AURORA-1840

Reviewed at https://reviews.apache.org/r/54250/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/16e4651d
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/16e4651d
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/16e4651d

Branch: refs/heads/master
Commit: 16e4651d5ff038dad0e9977edea7c57aeb37fe12
Parents: 8bcad84
Author: David McLaughlin <da...@dmclaughlin.com>
Authored: Thu Dec 1 09:01:33 2016 -0800
Committer: David McLaughlin <dm...@twitter.com>
Committed: Thu Dec 1 09:01:33 2016 -0800

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   1 +
 build.gradle                                    |  16 +-
 .../aurora/common/zookeeper/Candidate.java      |  78 +++
 .../aurora/common/zookeeper/CandidateImpl.java  | 127 ++++
 .../aurora/common/zookeeper/Credentials.java    |  90 +++
 .../apache/aurora/common/zookeeper/Group.java   | 674 +++++++++++++++++++
 .../aurora/common/zookeeper/JsonCodec.java      | 139 ++++
 .../aurora/common/zookeeper/ServerSet.java      |  74 ++
 .../aurora/common/zookeeper/ServerSetImpl.java  | 349 ++++++++++
 .../aurora/common/zookeeper/ServerSets.java     | 118 ++++
 .../common/zookeeper/SingletonService.java      | 114 ++++
 .../common/zookeeper/SingletonServiceImpl.java  | 122 ++++
 .../common/zookeeper/ZooKeeperClient.java       | 372 ++++++++++
 .../aurora/common/zookeeper/ZooKeeperUtils.java | 167 +++++
 .../testing/BaseZooKeeperClientTest.java        | 140 ++++
 .../zookeeper/testing/BaseZooKeeperTest.java    |  46 ++
 .../zookeeper/testing/ZooKeeperTestServer.java  | 121 ++++
 .../common/zookeeper/CandidateImplTest.java     | 165 +++++
 .../aurora/common/zookeeper/GroupTest.java      | 321 +++++++++
 .../aurora/common/zookeeper/JsonCodecTest.java  | 151 +++++
 .../common/zookeeper/ServerSetImplTest.java     | 258 +++++++
 .../aurora/common/zookeeper/ServerSetsTest.java |  44 ++
 .../zookeeper/SingletonServiceImplTest.java     | 243 +++++++
 .../common/zookeeper/ZooKeeperClientTest.java   | 210 ++++++
 .../common/zookeeper/ZooKeeperUtilsTest.java    | 139 ++++
 config/findbugs/excludeFilter.xml               |   8 -
 docs/features/service-discovery.md              |   2 +-
 docs/reference/scheduler-configuration.md       |   2 +
 .../aurora/scheduler/SchedulerLifecycle.java    |   6 +-
 .../aurora/scheduler/app/SchedulerMain.java     |   4 +-
 .../scheduler/app/ServiceGroupMonitor.java      |  46 ++
 .../CommonsServiceDiscoveryModule.java          | 102 +++
 .../discovery/CommonsServiceGroupMonitor.java   |  59 ++
 .../aurora/scheduler/discovery/Credentials.java |  98 ---
 .../CuratorServiceDiscoveryModule.java          |   8 +-
 .../discovery/CuratorServiceGroupMonitor.java   |   1 +
 .../discovery/CuratorSingletonService.java      |   1 +
 .../discovery/FlaggedZooKeeperConfig.java       |  21 +-
 .../aurora/scheduler/discovery/JsonCodec.java   | 147 ----
 .../discovery/ServiceDiscoveryModule.java       |  20 +-
 .../discovery/ServiceGroupMonitor.java          |  46 --
 .../scheduler/discovery/SingletonService.java   | 114 ----
 .../scheduler/discovery/ZooKeeperConfig.java    |  21 +-
 .../scheduler/discovery/ZooKeeperUtils.java     |  51 --
 .../discovery/testing/BaseZooKeeperTest.java    |  53 --
 .../discovery/testing/ZooKeeperTestServer.java  | 101 ---
 .../scheduler/http/JettyServerModule.java       |   2 +-
 .../aurora/scheduler/http/LeaderRedirect.java   |   4 +-
 .../log/mesos/MesosLogStreamModule.java         |   4 +-
 .../scheduler/SchedulerLifecycleTest.java       |   4 +-
 .../aurora/scheduler/app/SchedulerIT.java       |  52 +-
 .../discovery/AbstractDiscoveryModuleTest.java  |  77 +++
 .../discovery/BaseCuratorDiscoveryTest.java     |  10 +-
 .../discovery/CommonsDiscoveryModuleTest.java   |  29 +
 .../CommonsServiceGroupMonitorTest.java         | 137 ++++
 .../discovery/CuratorDiscoveryModuleTest.java   |  64 +-
 .../discovery/CuratorSingletonServiceTest.java  |   3 +-
 .../scheduler/discovery/JsonCodecTest.java      | 159 -----
 .../discovery/ZooKeeperConfigTest.java          |  17 +-
 .../scheduler/http/AbstractJettyTest.java       |  15 +-
 .../scheduler/http/LeaderRedirectTest.java      |   4 +-
 .../aurora/scheduler/thrift/ThriftIT.java       |   2 +-
 62 files changed, 4871 insertions(+), 902 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 96926f4..7a3d331 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -26,6 +26,7 @@
 - The scheduler flag `-zk_use_curator` has been removed. If you have never set the flag and are
   upgrading you should take care as described in the [note](#zk_use_curator_upgrade) below.
 
+=======
 0.16.0
 ======
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index f257440..2f23b85 100644
--- a/build.gradle
+++ b/build.gradle
@@ -164,7 +164,6 @@ project(':commons') {
   dependencies {
     compile project(':commons-args')
 
-    compile "ch.qos.logback:logback-classic:${logbackRev}"
     compile "com.google.code.findbugs:jsr305:${jsrRev}"
     compile "com.google.code.gson:gson:${gsonRev}"
     compile "com.google.guava:guava:${guavaRev}"
@@ -175,13 +174,17 @@ project(':commons') {
     compile "javax.servlet:javax.servlet-api:${servletRev}"
     compile "joda-time:joda-time:2.9.1"
     compile "org.antlr:stringtemplate:${stringTemplateRev}"
+    compile "org.apache.zookeeper:zookeeper:${zookeeperRev}"
     compile "org.easymock:easymock:3.4"
 
     // There are a few testing support libs in the src/main/java trees that use junit - currently:
+    //   src/main/java/org/apache/aurora/common/zookeeper/testing
     //   src/main/java/org/apache/aurora/common/testing
     compile "junit:junit:${junitRev}"
 
     testCompile "junit:junit:${junitRev}"
+    testCompile "org.powermock:powermock-module-junit4:1.6.4"
+    testCompile "org.powermock:powermock-api-easymock:1.6.4"
   }
 }
 
@@ -347,11 +350,9 @@ dependencies {
   compile project(':commons')
   compile project(':commons-args')
 
-
   compile 'aopalliance:aopalliance:1.0'
-  compile "ch.qos.logback:logback-classic:${logbackRev}"
+  compile 'ch.qos.logback:logback-classic:1.1.3'
   compile "com.google.code.findbugs:jsr305:${jsrRev}"
-  compile "com.google.code.gson:gson:${gsonRev}"
   compile "com.google.inject:guice:${guiceRev}"
   compile "com.google.inject.extensions:guice-assistedinject:${guiceRev}"
   compile "com.google.protobuf:protobuf-java:${protobufRev}"
@@ -385,15 +386,8 @@ dependencies {
   compile 'org.quartz-scheduler:quartz:2.2.2'
   compile "uno.perk:forward:1.0.0"
 
-  // There are a few testing support libs in the src/main/java trees that use junit - currently:
-  //   src/main/java/org/apache/aurora/common/zookeeper/testing
-  compile "junit:junit:${junitRev}"
-
   testCompile "com.sun.jersey:jersey-client:${jerseyRev}"
   testCompile "junit:junit:${junitRev}"
-  testCompile "org.powermock:powermock-module-junit4:1.6.4"
-  testCompile "org.powermock:powermock-api-easymock:1.6.4"
-
 }
 
 // For normal developer builds, avoid running the often-time-consuming code quality checks.

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
new file mode 100644
index 0000000..75c1b14
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Interface definition for becoming or querying for a ZooKeeper-based group leader.
+ */
+public interface Candidate {
+
+  /**
+   * Returns the current group leader by querying ZooKeeper synchronously.
+   *
+   * @return the current group leader's identifying data or {@link Optional#absent()} if there is
+   *     no leader
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading the leader information
+   * @throws InterruptedException if this thread is interrupted getting the leader
+   */
+  Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException;
+
+  /**
+   * Encapsulates a leader that can be elected and subsequently defeated.
+   */
+  interface Leader {
+
+    /**
+     * Called when this leader has been elected.
+     *
+     * @param abdicate a command that can be used to abdicate leadership and force a new election
+     */
+    void onElected(ExceptionalCommand<JoinException> abdicate);
+
+    /**
+     * Called when the leader has been ousted.  Can occur either if the leader abdicates or if an
+     * external event causes the leader to lose its leadership role (session expiration).
+     */
+    void onDefeated();
+  }
+
+  /**
+   * Offers this candidate in leadership elections for as long as the current jvm process is alive.
+   * Upon election, the {@code onElected} callback will be executed and a command that can be used
+   * to abdicate leadership will be passed in.  If the elected leader jvm process dies or the
+   * elected leader successfully abdicates then a new leader will be elected.  Leaders that
+   * successfully abdicate are removed from the group and will not be eligible for leadership
+   * election unless {@link #offerLeadership(Leader)} is called again.
+   *
+   * @param leader the leader to notify of election and defeat events
+   * @return a supplier that can be queried to find out if this leader is currently elected
+   * @throws JoinException if there was a problem joining the group
+   * @throws WatchException if there is a problem generating the 1st group membership list
+   * @throws InterruptedException if interrupted waiting to join the group and determine initial
+   *     election results
+   */
+  Supplier<Boolean> offerLeadership(Leader leader)
+      throws JoinException, WatchException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
new file mode 100644
index 0000000..98b5ee4
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements leader election for small groups of candidates.  This implementation is subject to the
+ * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
+ * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
+ */
+public class CandidateImpl implements Candidate {
+  private static final Logger LOG = LoggerFactory.getLogger(CandidateImpl.class);
+
+  /** Member data advertised when the local host address cannot be determined. */
+  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
+
+  /**
+   * Supplies the local host's IP address, UTF-8 encoded, as this candidate's member data, or
+   * {@link #UNKNOWN_CANDIDATE_DATA} if the local address cannot be resolved.
+   */
+  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = () -> {
+    try {
+      // Encode with an explicit charset (as UNKNOWN_CANDIDATE_DATA does) rather than the
+      // platform default.
+      return InetAddress.getLocalHost().getHostAddress().getBytes(Charsets.UTF_8);
+    } catch (UnknownHostException e) {
+      LOG.warn("Failed to determine local address!", e);
+      return UNKNOWN_CANDIDATE_DATA;
+    }
+  };
+
+  /**
+   * Elects the member id that sorts first in natural (lexicographic) string order.  When member
+   * ids carry ZooKeeper's zero-padded sequence suffix this is the earliest joiner.
+   */
+  private static final Function<Iterable<String>, String> LEADER_JUDGE =
+      candidates -> Ordering.natural().min(candidates);
+
+  private final Group group;
+
+  /**
+   * Creates a candidate that can be used to offer leadership for the given {@code group}.
+   *
+   * @param group the group whose members take part in the election; must not be null
+   */
+  public CandidateImpl(Group group) {
+    this.group = Preconditions.checkNotNull(group);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The member list and the leader's data are read in two separate ZooKeeper calls, so a
+   * leader that leaves in between surfaces as a {@link KeeperException} from the data read.
+   */
+  @Override
+  public Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+
+    String leaderId = getLeader(group.getMemberIds());
+    return leaderId == null
+        ? Optional.<byte[]>absent()
+        : Optional.of(group.getMemberData(leaderId));
+  }
+
+  @Override
+  public Supplier<Boolean> offerLeadership(final Leader leader)
+      throws JoinException, WatchException, InterruptedException {
+
+    final AtomicBoolean elected = new AtomicBoolean(false);
+    final AtomicBoolean abdicated = new AtomicBoolean(false);
+
+    // Losing group membership (e.g. ZooKeeper session expiration) ousts this candidate.  Clear the
+    // elected flag before notifying the leader so that, once membership is re-established, winning
+    // the next election fires onElected again and losing it does not fire a second onDefeated for
+    // the same ousting.
+    final Membership membership = group.join(IP_ADDRESS_DATA_SUPPLIER, () -> {
+      elected.set(false);
+      leader.onDefeated();
+    });
+
+    group.watch(memberIds -> {
+      boolean noCandidates = Iterables.isEmpty(memberIds);
+      String memberId = membership.getMemberId();
+
+      if (noCandidates) {
+        LOG.warn("All candidates have temporarily left the group: {}", group);
+      } else if (!Iterables.contains(memberIds, memberId)) {
+        LOG.error(
+            "Current member ID {} is not a candidate for leader, current voting: {}",
+            memberId, memberIds);
+      } else {
+        boolean electedLeader = memberId.equals(getLeader(memberIds));
+        boolean previouslyElected = elected.getAndSet(electedLeader);
+
+        if (!previouslyElected && electedLeader) {
+          LOG.info("Candidate {} is now leader of group: {}",
+              membership.getMemberPath(), memberIds);
+
+          // Abdication leaves the group; once abdicated, the returned supplier reports false
+          // regardless of later election results.
+          leader.onElected(() -> {
+            membership.cancel();
+            abdicated.set(true);
+          });
+        } else if (!electedLeader) {
+          if (previouslyElected) {
+            leader.onDefeated();
+          }
+          LOG.info(
+              "Candidate {} waiting for the next leader election, current voting: {}",
+              membership.getMemberPath(), memberIds);
+        }
+      }
+    });
+
+    return () -> !abdicated.get() && elected.get();
+  }
+
+  /**
+   * Picks the leader from {@code memberIds}.
+   *
+   * @param memberIds the current member ids
+   * @return the leader's member id, or {@code null} if there are no members
+   */
+  @Nullable
+  private String getLeader(Iterable<String> memberIds) {
+    return Iterables.isEmpty(memberIds) ? null : LEADER_JUDGE.apply(memberIds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
new file mode 100644
index 0000000..18319a3
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.commons.lang.builder.EqualsBuilder;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Encapsulates a user's ZooKeeper credentials.
+ */
+public final class Credentials {
+
+  /**
+   * Creates a set of credentials for the ZooKeeper digest authentication mechanism.
+   *
+   * @param username the username to authenticate with; must not be blank
+   * @param password the password to authenticate with; must not be null
+   * @return a set of credentials that can be used to authenticate the zoo keeper client
+   */
+  public static Credentials digestCredentials(String username, String password) {
+    MorePreconditions.checkNotBlank(username);
+    Preconditions.checkNotNull(password);
+
+    // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset
+    // (on server) and so we just have to hope here that clients are deployed in compatible jvms.
+    // Consider writing and installing a version of DigestAuthenticationProvider that controls its
+    // Charset explicitly.
+    return new Credentials("digest", (username + ":" + password).getBytes());
+  }
+
+  private final String scheme;
+  private final byte[] authToken;
+
+  /**
+   * Creates credentials for an arbitrary ZooKeeper authentication scheme.
+   *
+   * @param scheme the authentication scheme; must not be blank
+   * @param authToken the raw token for {@code scheme}; must not be null.  The array is stored by
+   *     reference, not copied.
+   */
+  public Credentials(String scheme, byte[] authToken) {
+    this.scheme = MorePreconditions.checkNotBlank(scheme);
+    this.authToken = requireNonNull(authToken);
+  }
+
+  /**
+   * Returns the authentication scheme these credentials are for.
+   *
+   * @return the scheme these credentials are for.
+   */
+  public String scheme() {
+    return scheme;
+  }
+
+  /**
+   * Returns the authentication token.
+   *
+   * @return the authentication token (the internal array, not a copy).
+   */
+  public byte[] authToken() {
+    return authToken;
+  }
+
+  /**
+   * Two credentials are equal when their schemes are equal and their tokens hold the same bytes.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof Credentials)) {
+      return false;
+    }
+
+    Credentials other = (Credentials) o;
+    return new EqualsBuilder()
+        .append(scheme, other.scheme())
+        .append(authToken, other.authToken())
+        .isEquals();
+  }
+
+  /**
+   * Hashes the scheme and the token's contents, consistent with {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    // byte[] inherits identity-based hashCode; hash the token's contents instead so that
+    // credentials equal per equals() (which compares token bytes) also hash equally.
+    return Objects.hashCode(scheme, Arrays.hashCode(authToken));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
new file mode 100644
index 0000000..2720dd1
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.Commands;
+import org.apache.aurora.common.base.ExceptionalSupplier;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.util.BackoffHelper;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class exposes methods for joining and monitoring distributed groups.  The groups this class
+ * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for
+ * each member of a group.
+ */
+public class Group {
+  private static final Logger LOG = LoggerFactory.getLogger(Group.class);
+
+  /** Prefix used for member node names when no explicit scheme or prefix is given. */
+  private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
+
+  /** Member-data supplier for joins that carry no payload; always supplies {@code null}. */
+  private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
+
+  // ZooKeeper access, plus the ACL applied when the persistent group node must be created.
+  private final ZooKeeperClient zkClient;
+  private final ImmutableList<ACL> acl;
+
+  // Normalized absolute path of the persistent group node.
+  private final String path;
+
+  // Naming scheme for member nodes, and the matching filter applied to the group's children.
+  private final NodeScheme nodeScheme;
+  private final Predicate<String> nodeNameFilter;
+
+  // Drives the retry loops around ZooKeeper operations.
+  private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a group rooted at {@code path}.  The path must be absolute; trailing and duplicate
+   * slashes are normalized away, so each of the following denotes the same group,
+   * /my/distributed/group:
+   * <ul>
+   *   <li>/my/distributed/group
+   *   <li>/my/distributed/group/
+   *   <li>/my/distributed//group
+   * </ul>
+   *
+   * @param zkClient the client used for all ZooKeeper interactions
+   * @param acl the ACL applied if the persistent group path has to be created
+   * @param path the absolute persistent path representing this group
+   * @param nodeScheme the scheme that names member nodes and recognizes them among the group's
+   *     children
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) {
+    this.zkClient = Preconditions.checkNotNull(zkClient);
+    this.acl = ImmutableList.copyOf(acl);
+    String rawPath = Preconditions.checkNotNull(path);
+    this.path = ZooKeeperUtils.normalizePath(rawPath);
+    this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
+    this.nodeNameFilter = this.nodeScheme::isMember;
+    this.backoffHelper = new BackoffHelper();
+  }
+
+  /**
+   * Creates a group whose member nodes use the default name prefix {@code "member_"}.
+   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with that prefix.
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+    this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
+  }
+
+  /**
+   * Creates a group whose member nodes are named by a {@link DefaultScheme} built from
+   * {@code namePrefix}.  Equivalent to
+   * {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with that scheme.
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) {
+    this(zkClient, acl, path, new DefaultScheme(namePrefix));
+  }
+
+  /**
+   * Builds the absolute ZooKeeper path of a member node of this group.
+   *
+   * @param memberId the member id (node name); must not be blank
+   * @return the group path joined with {@code memberId}
+   */
+  public String getMemberPath(String memberId) {
+    String checkedId = MorePreconditions.checkNotBlank(memberId);
+    return path + "/" + checkedId;
+  }
+
+  /**
+   * Returns the normalized absolute path of this group's persistent node.
+   */
+  public String getPath() {
+    return this.path;
+  }
+
+  /**
+   * Extracts the member id (final path segment) from the path of one of this group's member nodes.
+   *
+   * @param nodePath the full path of a member node; must not be blank
+   * @return the member id
+   * @throws IllegalArgumentException if {@code nodePath} is not directly under this group or its
+   *     final segment is not a member name under this group's {@link NodeScheme}
+   */
+  public String getMemberId(String nodePath) {
+    MorePreconditions.checkNotBlank(nodePath);
+    String groupPrefix = path + "/";
+    Preconditions.checkArgument(nodePath.startsWith(groupPrefix),
+        "Not a member of this group[%s]: %s", path, nodePath);
+
+    String candidateId = StringUtils.substringAfterLast(nodePath, "/");
+    Preconditions.checkArgument(nodeScheme.isMember(candidateId),
+        "Not a group member: %s", candidateId);
+    return candidateId;
+  }
+
+  /**
+   * Lists this group's current member ids with a synchronous, unwatched ZooKeeper read.  Children
+   * of the group node that are not members under this group's {@link NodeScheme} are excluded.
+   *
+   * @return the ids of all the present members of this group
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading this group's member ids
+   * @throws InterruptedException if this thread is interrupted listing the group members
+   */
+  public Iterable<String> getMemberIds()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+    List<String> children = zkClient.get().getChildren(path, false);
+    return Iterables.filter(children, nodeNameFilter);
+  }
+
+  /**
+   * Reads one member's data with a synchronous, unwatched ZooKeeper read.
+   *
+   * @param memberId the id of the member whose data to retrieve
+   * @return the data stored in that member's node
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading this member's data
+   * @throws InterruptedException if this thread is interrupted retrieving the member data
+   */
+  public byte[] getMemberData(String memberId)
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+    return zkClient.get().getData(getMemberPath(memberId), /* watch= */ false, /* stat= */ null);
+  }
+
+  /**
+   * Represents membership in a distributed group.
+   */
+  public interface Membership {
+
+    /**
+     * Returns the persistent ZooKeeper path that represents this group.
+     */
+    String getGroupPath();
+
+    /**
+     * Returns the id (ZooKeeper node name) of this group member.  May change over time if the
+     * ZooKeeper session expires.
+     */
+    String getMemberId();
+
+    /**
+     * Returns the full ZooKeeper path to this group member.  May change over time if the
+     * ZooKeeper session expires.
+     */
+    String getMemberPath();
+
+    /**
+     * Updates the membership data synchronously using the {@code Supplier<byte[]>} that was passed
+     * to {@link Group#join(Supplier, Command)} when this membership was created.  Implementations
+     * may skip the ZooKeeper write when the supplied data is unchanged.
+     *
+     * @return the new membership data
+     * @throws UpdateException if there was a problem updating the membership data
+     */
+    byte[] updateMemberData() throws UpdateException;
+
+    /**
+     * Cancels group membership by deleting the associated ZooKeeper member node.
+     *
+     * @throws JoinException if there is a problem deleting the node
+     */
+    void cancel() throws JoinException;
+  }
+
+  /**
+   * Indicates an error joining a group (also used for errors cancelling a membership).
+   */
+  public static class JoinException extends Exception {
+    /**
+     * Creates a join exception with the given detail message and underlying cause.
+     */
+    public JoinException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Indicates an error updating a group member's data.
+   */
+  public static class UpdateException extends Exception {
+    /**
+     * Creates an update exception with the given detail message and underlying cause.
+     */
+    public UpdateException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Joins this group with no member data and no membership-loss callback.  Equivalent to
+   * {@link #join(Supplier, Command)} with a supplier that always supplies {@code null} member data
+   * and a {@code null} callback.  (A {@code null} supplier itself is rejected by that method.)
+   */
+  public final Membership join() throws JoinException, InterruptedException {
+    return join(NO_MEMBER_DATA, null);
+  }
+
+  /**
+   * Joins this group publishing {@code memberData}, with no membership-loss callback.  Equivalent
+   * to calling {@code join(memberData, null)}.
+   */
+  public final Membership join(Supplier<byte[]> memberData)
+      throws JoinException, InterruptedException {
+
+    return join(memberData, null);
+  }
+
+  /**
+   * Joins this group with no member data, notifying {@code onLoseMembership} each time membership
+   * is lost.  Equivalent to {@link #join(Supplier, Command)} with a supplier that always supplies
+   * {@code null} member data.
+   */
+  public final Membership join(@Nullable final Command onLoseMembership)
+      throws JoinException, InterruptedException {
+
+    return join(NO_MEMBER_DATA, onLoseMembership);
+  }
+
+  /**
+   * Joins this group, returning the resulting Membership once the member node has been created.
+   * The member node is ephemeral, so membership ends automatically when this jvm process's
+   * ZooKeeper session ends; the returned Membership can be used to end it sooner.  Until
+   * {@link Group.Membership#cancel()} is called, lost membership is silently re-established in
+   * the background.
+   *
+   * <p>Data from {@code memberData} is stored in the member node.  When {@code onLoseMembership}
+   * is given, it is invoked every time this member loses its membership in the group.
+   *
+   * @param memberData supplies the data to store in the member node; must not be null
+   * @param onLoseMembership optional callback run whenever membership is lost
+   * @return a Membership describing this member
+   * @throws JoinException if joining failed with a non-retryable error
+   * @throws InterruptedException if this thread is interrupted while waiting for the join
+   */
+  public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
+      throws JoinException, InterruptedException {
+
+    Preconditions.checkNotNull(memberData);
+    ensurePersistentGroupPath();
+
+    final ActiveMembership membership = new ActiveMembership(memberData, onLoseMembership);
+    // A null result asks the backoff helper to retry after a transient failure.
+    return backoffHelper.doUntilResult(() -> {
+      try {
+        return membership.join();
+      } catch (ZooKeeperConnectionException e) {
+        LOG.warn("Temporary error trying to join group at path: " + path, e);
+        return null;
+      } catch (KeeperException e) {
+        if (!zkClient.shouldRetry(e)) {
+          throw new JoinException("Problem joining partition group at path: " + path, e);
+        }
+        LOG.warn("Temporary error trying to join group at path: " + path, e);
+        return null;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new JoinException("Interrupted trying to join group at path: " + path, e);
+      }
+    });
+  }
+
+  /**
+   * Ensures this group's persistent node exists (via {@link ZooKeeperUtils}), retrying with
+   * backoff on connection problems and on ZooKeeper errors the client deems retryable.
+   *
+   * @throws JoinException on a non-retryable ZooKeeper error or if interrupted inside an attempt
+   * @throws InterruptedException if interrupted while backing off
+   */
+  private void ensurePersistentGroupPath() throws JoinException, InterruptedException {
+    backoffHelper.doUntilSuccess(() -> {
+      try {
+        ZooKeeperUtils.ensurePath(zkClient, acl, path);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+        return false;
+      } catch (KeeperException e) {
+        if (!zkClient.shouldRetry(e)) {
+          throw new JoinException("Problem ensuring group at path: " + path, e);
+        }
+        LOG.warn("Temporary error ensuring path: " + path, e);
+        return false;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
+      }
+      return true;
+    });
+  }
+
+  private class ActiveMembership implements Membership {
+    private final Supplier<byte[]> memberData;
+    private final Command onLoseMembership;
+    private String nodePath;
+    private String memberId;
+    private volatile boolean cancelled;
+    private byte[] membershipData;
+
+    public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
+      this.memberData = memberData;
+      this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership;
+    }
+
+    @Override
+    public String getGroupPath() {
+      return path;
+    }
+
+    @Override
+    public synchronized String getMemberId() {
+      return memberId;
+    }
+
+    @Override
+    public synchronized String getMemberPath() {
+      return nodePath;
+    }
+
+    @Override
+    public synchronized byte[] updateMemberData() throws UpdateException {
+      byte[] membershipData = memberData.get();
+      if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
+        try {
+          zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION);
+          this.membershipData = membershipData;
+        } catch (KeeperException e) {
+          throw new UpdateException("Problem updating membership data.", e);
+        } catch (InterruptedException e) {
+          throw new UpdateException("Interrupted attempting to update membership data.", e);
+        } catch (ZooKeeperConnectionException e) {
+          throw new UpdateException(
+              "Could not connect to the ZooKeeper cluster to update membership data.", e);
+        }
+      }
+      return membershipData;
+    }
+
+    @Override
+    public synchronized void cancel() throws JoinException {
+      if (!cancelled) {
+        try {
+          backoffHelper.doUntilSuccess(() -> {
+            try {
+              zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
+              return true;
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
+            } catch (ZooKeeperConnectionException e) {
+              LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+              return false;
+            } catch (NoNodeException e) {
+              LOG.info("Membership already cancelled, node at path: " + nodePath +
+                       " has been deleted");
+              return true;
+            } catch (KeeperException e) {
+              if (zkClient.shouldRetry(e)) {
+                LOG.warn("Temporary error cancelling membership: " + nodePath, e);
+                return false;
+              } else {
+                throw new JoinException("Problem cancelling membership: " + nodePath, e);
+              }
+            }
+          });
+          cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new JoinException("Problem cancelling membership: " + nodePath, e);
+        }
+      }
+    }
+
+    private class CancelledException extends IllegalStateException { /* marker */ }
+
+    // Whether the session-expiration re-join handler has been registered. This is tracked
+    // explicitly rather than inferred from nodePath. nodePath stays null when the first create()
+    // fails, so a later join() attempt would register a second handler, and each handler re-joins
+    // (creating its own member node) after a session expiry. Guarded by this.
+    private boolean expirationHandlerRegistered = false;
+
+    /**
+     * Creates this member's ephemeral node under the group path and watches it for deletion.
+     *
+     * <p>The node data is obtained from {@code memberData} on every call. The node is created as
+     * sequential or not according to {@code nodeScheme}. If the node is deleted, or the ZooKeeper
+     * session expires, {@code tryJoin()} re-joins the group. A {@code CancelledException} is thrown
+     * if this membership has already been cancelled.
+     *
+     * @return this membership
+     * @throws ZooKeeperConnectionException if {@code zkClient.get()} cannot provide a connection
+     * @throws InterruptedException if interrupted while talking to ZooKeeper
+     * @throws KeeperException if ZooKeeper fails the create or exists call
+     */
+    synchronized Membership join()
+        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+      if (cancelled) {
+        throw new CancelledException();
+      }
+
+      if (!expirationHandlerRegistered) {
+        // Re-join if our ephemeral node goes away due to session expiry - only needs to be
+        // registered once.
+        zkClient.registerExpirationHandler(this::tryJoin);
+        expirationHandlerRegistered = true;
+      }
+
+      byte[] membershipData = memberData.get();
+      String nodeName = nodeScheme.createName(membershipData);
+      CreateMode createMode = nodeScheme.isSequential()
+          ? CreateMode.EPHEMERAL_SEQUENTIAL
+          : CreateMode.EPHEMERAL;
+      nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode);
+      memberId = Group.this.getMemberId(nodePath);
+      LOG.info("Set group member ID to " + memberId);
+      this.membershipData = membershipData;
+
+      // Re-join if our ephemeral node goes away due to maliciousness.
+      zkClient.get().exists(nodePath, event -> {
+        if (event.getType() == EventType.NodeDeleted) {
+          tryJoin();
+        }
+      });
+
+      return this;
+    }
+
+    // One re-join attempt for backoffHelper.doUntilSuccess. It yields true once joined, or when a
+    // race with cancel was lost. It yields false after a transient ZooKeeper problem so the attempt
+    // is repeated. Non-retryable KeeperExceptions escape as IllegalStateException.
+    private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
+        () -> {
+          boolean done;
+          try {
+            join();
+            done = true;
+          } catch (CancelledException e) {
+            // Lost a cancel race - that's ok.
+            done = true;
+          } catch (ZooKeeperConnectionException e) {
+            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+            done = false;
+          } catch (KeeperException e) {
+            if (!zkClient.shouldRetry(e)) {
+              throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
+            }
+            LOG.warn("Temporary error re-joining group: " + path, e);
+            done = false;
+          }
+          return done;
+        };
+
+    /**
+     * Fires {@code onLoseMembership}, then re-joins the group, retrying through
+     * {@code backoffHelper}. This is the session-expiration handler and the node-deletion reaction
+     * registered by {@code join()}.
+     *
+     * @throws RuntimeException if interrupted while re-joining; the interrupt flag is restored
+     */
+    private synchronized void tryJoin() {
+      onLoseMembership.execute();
+      InterruptedException interruption = null;
+      try {
+        backoffHelper.doUntilSuccess(tryJoin);
+      } catch (InterruptedException e) {
+        interruption = e;
+      }
+      if (interruption != null) {
+        Thread.currentThread().interrupt();
+        String message =
+            String.format("Interrupted while trying to re-join group: %s, giving up", path);
+        throw new RuntimeException(message, interruption);
+      }
+    }
+  }
+
+  /**
+   * An interface to an object that listens for changes to a group's membership.
+   *
+   * <p>A listener passed to {@code Group.watch} is notified once with the group's initial
+   * membership. After that it is notified only when the observed membership differs from the last
+   * notification. Per {@code watch}'s contract, later notifications arrive on a background thread.
+   */
+  public interface GroupChangeListener {
+
+    /**
+     * Called whenever group membership changes with the new list of member ids.
+     *
+     * @param memberIds the ids of all current group members (not just the ones that changed)
+     */
+    void onGroupChange(Iterable<String> memberIds);
+  }
+
+  /**
+   * An interface that dictates the scheme to use for storing and filtering nodes that represent
+   * members of a distributed group.
+   *
+   * <p>{@link #createName(byte[])} and {@link #isSequential()} decide how a member's node is
+   * created. {@link #isMember(String)} decides which existing child nodes count as members.
+   */
+  public interface NodeScheme {
+    /**
+     * Determines if a child node is a member of a group by examining the node's name.
+     *
+     * @param nodeName the name of a child node found in a group
+     * @return {@code true} if {@code nodeName} identifies a group member in this scheme
+     */
+    boolean isMember(String nodeName);
+
+    /**
+     * Generates a node name for the node representing this process in the distributed group.
+     *
+     * <p>For sequential schemes this name is only a prefix: the node is created with
+     * {@code CreateMode.EPHEMERAL_SEQUENTIAL}, so ZooKeeper appends a sequence number to it.
+     *
+     * @param membershipData the data that will be stored in this node
+     * @return the name for the node that will represent this process in the group
+     */
+    String createName(byte[] membershipData);
+
+    /**
+     * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes.
+     *
+     * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise
+     */
+    boolean isSequential();
+  }
+
+  /**
+   * Indicates an error watching a group.
+   *
+   * <p>Thrown by {@code watch} when the group path cannot be created, when ZooKeeper reports a
+   * non-retryable error, or when the initial membership query is interrupted.
+   */
+  public static class WatchException extends Exception {
+    /**
+     * @param message a description of what failed
+     * @param cause the underlying exception
+     */
+    public WatchException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Watches this group for the lifetime of this jvm process.  This method will block until the
+   * current group members are available, notify the {@code groupChangeListener} and then return.
+   * All further changes to the group membership will cause notifications on a background thread.
+   *
+   * <p>Transient ZooKeeper problems (connection failures and retryable {@link KeeperException}s)
+   * are retried via {@code backoffHelper}.
+   *
+   * @param groupChangeListener the listener to notify of group membership change events
+   * @return A command which, when executed, will stop watching the group.
+   * @throws WatchException if the group path cannot be created, if ZooKeeper reports a
+   *     non-retryable error, or if the membership query itself is interrupted (in which case the
+   *     thread's interrupt flag is restored)
+   * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
+   */
+  public final Command watch(final GroupChangeListener groupChangeListener)
+      throws WatchException, InterruptedException {
+    Preconditions.checkNotNull(groupChangeListener);
+
+    try {
+      ensurePersistentGroupPath();
+    } catch (JoinException e) {
+      throw new WatchException("Failed to create group path: " + path, e);
+    }
+
+    final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
+    backoffHelper.doUntilSuccess(() -> {
+      try {
+        groupMonitor.watchGroup();
+        return true;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new WatchException("Interrupted trying to watch group at path: " + path, e);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.warn("Temporary error trying to watch group at path: " + path, e);
+        // Retry; matches the false-means-retry convention of the other doUntilSuccess suppliers.
+        return false;
+      } catch (KeeperException e) {
+        if (zkClient.shouldRetry(e)) {
+          LOG.warn("Temporary error trying to watch group at path: " + path, e);
+          return false;
+        }
+        throw new WatchException("Problem trying to watch group at path: " + path, e);
+      }
+    });
+    return groupMonitor::stopWatching;
+  }
+
+  /**
+   * Helps continuously monitor a group for membership changes.
+   */
+  private class GroupMonitor {
+    private final GroupChangeListener groupChangeListener;
+    private volatile boolean stopped = false;
+    private Set<String> members;
+
+    GroupMonitor(GroupChangeListener groupChangeListener) {
+      this.groupChangeListener = groupChangeListener;
+    }
+
+    private final Watcher groupWatcher = event -> {
+      if (event.getType() == EventType.NodeChildrenChanged) {
+        tryWatchGroup();
+      }
+    };
+
+    private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup =
+        () -> {
+          try {
+            watchGroup();
+            return true;
+          } catch (ZooKeeperConnectionException e) {
+            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+            return false;
+          } catch (KeeperException e) {
+            if (zkClient.shouldRetry(e)) {
+              LOG.warn("Temporary error re-watching group: " + path, e);
+              return false;
+            } else {
+              throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
+            }
+          }
+        };
+
+    private void tryWatchGroup() {
+      if (stopped) {
+        return;
+      }
+
+      try {
+        backoffHelper.doUntilSuccess(tryWatchGroup);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(
+            String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
+      }
+    }
+
+    private void watchGroup()
+        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+      if (stopped) {
+        return;
+      }
+
+      List<String> children = zkClient.get().getChildren(path, groupWatcher);
+      setMembers(Iterables.filter(children, nodeNameFilter));
+    }
+
+    private void stopWatching() {
+      // TODO(William Farner): Cancel the watch when
+      // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
+      LOG.info("Stopping watch on " + this);
+      stopped = true;
+    }
+
+    synchronized void setMembers(Iterable<String> members) {
+      if (stopped) {
+        LOG.info("Suppressing membership update, no longer watching " + this);
+        return;
+      }
+
+      if (this.members == null) {
+        // Reset our watch on the group if session expires - only needs to be registered once.
+        zkClient.registerExpirationHandler(this::tryWatchGroup);
+      }
+
+      Set<String> membership = ImmutableSet.copyOf(members);
+      if (!membership.equals(this.members)) {
+        groupChangeListener.onGroupChange(members);
+        this.members = membership;
+      }
+    }
+  }
+
+  /**
+   * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] +
+   * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the
+   * prefix is "member_", the node's full path will look something like
+   * {@code /discovery/servicename/member_0000000007}.
+   */
+  public static class DefaultScheme implements NodeScheme {
+    private final String namePrefix;
+    private final Pattern namePattern;
+
+    /**
+     * Creates a sequential node scheme based on the given node name prefix.
+     *
+     * @param namePrefix the prefix for the names of the member nodes
+     */
+    public DefaultScheme(String namePrefix) {
+      this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
+      namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$");
+    }
+
+    @Override
+    public boolean isMember(String nodeName) {
+      return namePattern.matcher(nodeName).matches();
+    }
+
+    @Override
+    public String createName(byte[] membershipData) {
+      return namePrefix;
+    }
+
+    @Override
+    public boolean isSequential() {
+      return true;
+    }
+  }
+
+  /** Describes this group by its ZooKeeper path, e.g. {@code "Group /discovery/service"}. */
+  @Override
+  public String toString() {
+    return new StringBuilder("Group ").append(path).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
new file mode 100644
index 0000000..9d31608
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonParseException;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link Codec} that reads and writes {@link ServiceInstance}s as UTF-8 encoded JSON via Gson.
+ *
+ * <p>Instances are mapped through private schema classes rather than serialized directly. With a
+ * default {@link Gson}, the JSON object's fields are therefore those classes' field names:
+ * {@code serviceEndpoint}, {@code additionalEndpoints}, {@code status} and {@code shard}.
+ * Endpoints are objects with {@code host} and {@code port}.
+ */
+class JsonCodec implements Codec<ServiceInstance> {
+
+  /**
+   * Rejects a required field that is missing (null) in decoded JSON.
+   *
+   * @throws JsonParseException if {@code fieldValue} is null
+   */
+  private static void assertRequiredField(String fieldName, Object fieldValue) {
+    if (fieldValue == null) {
+      throw new JsonParseException(String.format("Field %s is required", fieldName));
+    }
+  }
+
+  /** JSON shape of an {@link Endpoint}; both fields are required when decoding. */
+  private static class EndpointSchema {
+    private final String host;
+    private final Integer port;
+
+    EndpointSchema(Endpoint endpoint) {
+      host = endpoint.getHost();
+      port = endpoint.getPort();
+    }
+
+    Endpoint asEndpoint() {
+      assertRequiredField("host", host);
+      assertRequiredField("port", port);
+
+      return new Endpoint(host, port);
+    }
+  }
+
+  /** JSON shape of a {@link ServiceInstance}. */
+  private static class ServiceInstanceSchema {
+    private final EndpointSchema serviceEndpoint;
+    private final Map<String, EndpointSchema> additionalEndpoints;
+    private final Status status;
+    private final @Nullable Integer shard;
+
+    ServiceInstanceSchema(ServiceInstance instance) {
+      serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
+      if (instance.isSetAdditionalEndpoints()) {
+        // A lazy view is acceptable here: this schema only lives for a single toJson call.
+        additionalEndpoints =
+            Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new);
+      } else {
+        additionalEndpoints = ImmutableMap.of();
+      }
+      status = instance.getStatus();
+      shard = instance.isSetShard() ? instance.getShard() : null;
+    }
+
+    /**
+     * Validates this decoded schema and converts it to a {@link ServiceInstance}.
+     *
+     * @throws JsonParseException if a required field, including any additional endpoint or its
+     *     host/port, is missing
+     */
+    ServiceInstance asServiceInstance() {
+      assertRequiredField("serviceEndpoint", serviceEndpoint);
+      assertRequiredField("status", status);
+
+      // Convert eagerly. A lazy Maps.transformValues view would defer validation of the nested
+      // endpoints until the map is first read. That read happens outside deserialize()'s error
+      // handling, and the view would also rebuild the Endpoint objects on every access.
+      ImmutableMap.Builder<String, Endpoint> extraEndpoints = ImmutableMap.builder();
+      if (additionalEndpoints != null) {
+        for (Map.Entry<String, EndpointSchema> entry : additionalEndpoints.entrySet()) {
+          EndpointSchema endpoint = entry.getValue();
+          assertRequiredField("additionalEndpoints." + entry.getKey(), endpoint);
+          extraEndpoints.put(entry.getKey(), endpoint.asEndpoint());
+        }
+      }
+
+      ServiceInstance instance =
+          new ServiceInstance(serviceEndpoint.asEndpoint(), extraEndpoints.build(), status);
+      if (shard != null) {
+        instance.setShard(shard);
+      }
+      return instance;
+    }
+  }
+
+  private static final Charset ENCODING = Charsets.UTF_8;
+
+  private final Gson gson;
+
+  JsonCodec() {
+    this(new Gson());
+  }
+
+  JsonCodec(Gson gson) {
+    this.gson = requireNonNull(gson);
+  }
+
+  /**
+   * Writes {@code instance} to {@code sink} as JSON. The sink is flushed but deliberately not
+   * closed, because it belongs to the caller.
+   *
+   * @throws IOException if Gson fails to write the JSON
+   */
+  @Override
+  public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
+    Writer writer = new OutputStreamWriter(sink, ENCODING);
+    try {
+      gson.toJson(new ServiceInstanceSchema(instance), writer);
+    } catch (JsonIOException e) {
+      throw new IOException(String.format("Problem serializing %s to JSON", instance), e);
+    }
+    writer.flush();
+  }
+
+  /**
+   * Reads a single JSON-encoded {@link ServiceInstance} from {@code source}, which is not closed.
+   *
+   * @throws IOException if the input is empty, is malformed, or lacks a required field
+   */
+  @Override
+  public ServiceInstance deserialize(InputStream source) throws IOException {
+    InputStreamReader reader = new InputStreamReader(source, ENCODING);
+    try {
+      @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class);
+      if (schema == null) {
+        throw new IOException("JSON did not include a ServiceInstance object");
+      }
+      return schema.asServiceInstance();
+    } catch (JsonParseException e) {
+      throw new IOException("Problem parsing JSON ServiceInstance.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
new file mode 100644
index 0000000..aeea02d
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+
+/**
+ * A logical set of servers registered in ZooKeeper.  Intended to be used by servers in a
+ * common service to advertise their presence to server-set protocol-aware clients.
+ *
+ * <p>Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance
+ * rendezvous data to ZooKeeper so that standard clients can interoperate.
+ */
+public interface ServerSet {
+
+  /**
+   * Encodes a {@link ServiceInstance} as a JSON object.
+   *
+   * <p>This is the default encoding for service instance data in ZooKeeper.
+   */
+  Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
+
+  /**
+   * Attempts to join a server set for this logical service group.
+   *
+   * @param endpoint the primary service endpoint
+   * @param additionalEndpoints any additional endpoints, keyed by their logical name
+   * @return an EndpointStatus object that allows the endpoint to adjust its status
+   * @throws JoinException if there was a problem joining the server set
+   * @throws InterruptedException if interrupted while waiting to join the server set
+   */
+  EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws JoinException, InterruptedException;
+
+  /**
+   * A handle to a service endpoint's status data that allows updating it to track current events.
+   */
+  interface EndpointStatus {
+
+    /**
+     * Removes the endpoint from the server set.
+     *
+     * @throws UpdateException if there was a problem leaving the ServerSet.
+     */
+    void leave() throws UpdateException;
+  }
+
+  /**
+   * Indicates an error updating a service's status information.
+   */
+  class UpdateException extends Exception {
+    public UpdateException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
new file mode 100644
index 0000000..ace4980
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.util.BackoffHelper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}.
+ *
+ * <p>Each server is a member node of a {@link Group}. A node's data is that server's
+ * {@link ServiceInstance}, encoded with the configured {@link Codec}.
+ */
+public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> {
+  private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class);
+
+  private final ZooKeeperClient zkClient;
+  private final Group group;
+  private final Codec<ServiceInstance> codec;
+  private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a new ServerSet using open ZooKeeper node ACLs.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
+    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
+  }
+
+  /**
+   * Creates a new ServerSet for the given service {@code path}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it does not already exist
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+    this(zkClient, new Group(zkClient, acl, path), JSON_CODEC);
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
+    this(zkClient, group, JSON_CODEC);
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and
+   *     from a byte array
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) {
+    this.zkClient = checkNotNull(zkClient);
+    this.group = checkNotNull(group);
+    this.codec = checkNotNull(codec);
+
+    // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable.
+    backoffHelper = new BackoffHelper();
+  }
+
+  @VisibleForTesting
+  ZooKeeperClient getZkClient() {
+    return zkClient;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The member node's data is supplied lazily by {@link MemberStatus#serializeServiceInstance()}
+   * and is always advertised with {@link Status#ALIVE}.
+   */
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws Group.JoinException, InterruptedException {
+
+    checkNotNull(endpoint);
+    checkNotNull(additionalEndpoints);
+
+    MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints);
+    Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance;
+    Group.Membership membership = group.join(serviceInstanceSupplier);
+
+    return () -> memberStatus.leave(membership);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws MonitorException if the watch cannot be established. If the cause is an interruption,
+   *     the thread's interrupt flag is restored.
+   */
+  @Override
+  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor);
+    try {
+      return serverSetWatcher.watch();
+    } catch (Group.WatchException e) {
+      throw new MonitorException("ZooKeeper watch failed.", e);
+    } catch (InterruptedException e) {
+      // The interrupt is translated into a checked MonitorException, so preserve it for callers.
+      Thread.currentThread().interrupt();
+      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
+    }
+  }
+
+  /** The endpoints advertised by one {@link #join} call. */
+  private class MemberStatus {
+    private final InetSocketAddress endpoint;
+    private final Map<String, InetSocketAddress> additionalEndpoints;
+
+    private MemberStatus(
+        InetSocketAddress endpoint,
+        Map<String, InetSocketAddress> additionalEndpoints) {
+
+      this.endpoint = endpoint;
+      this.additionalEndpoints = additionalEndpoints;
+    }
+
+    /** Cancels {@code membership}, removing this server's node from the group. */
+    synchronized void leave(Group.Membership membership) throws UpdateException {
+      try {
+        membership.cancel();
+      } catch (Group.JoinException e) {
+        throw new UpdateException(
+            "Failed to auto-cancel group membership on transition to DEAD status", e);
+      }
+    }
+
+    /** Encodes this member as an ALIVE {@link ServiceInstance} using the configured codec. */
+    byte[] serializeServiceInstance() {
+      ServiceInstance serviceInstance = new ServiceInstance(
+          ServerSets.toEndpoint(endpoint),
+          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
+          Status.ALIVE);
+
+      LOG.debug("updating endpoint data to:\n\t{}", serviceInstance);
+      try {
+        return ServerSets.serializeServiceInstance(serviceInstance, codec);
+      } catch (IOException e) {
+        throw new IllegalStateException("Unexpected problem serializing thrift struct " +
+            serviceInstance + " to a byte[]", e);
+      }
+    }
+  }
+
+  /** Unrecoverable failure fetching a member's data; propagated out of the member cache. */
+  private static class ServiceInstanceFetchException extends RuntimeException {
+    ServiceInstanceFetchException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /** The member node vanished while its data was being fetched; the member is skipped. */
+  private static class ServiceInstanceDeletedException extends RuntimeException {
+    ServiceInstanceDeletedException(String path) {
+      super(path);
+    }
+  }
+
+  /**
+   * Translates group membership changes into {@link ServiceInstance} sets for a
+   * {@link HostChangeMonitor}. Member data is cached by member id.
+   */
+  private class ServerSetWatcher {
+    private final ZooKeeperClient zkClient;
+    private final HostChangeMonitor<ServiceInstance> monitor;
+    @Nullable private ImmutableSet<ServiceInstance> serverSet;
+
+    ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) {
+      this.zkClient = zkClient;
+      this.monitor = monitor;
+    }
+
+    /**
+     * Starts watching the group. The session-expiration handler is unregistered again if the
+     * watch cannot be established.
+     */
+    public Command watch() throws Group.WatchException, InterruptedException {
+      Watcher onExpirationWatcher = zkClient.registerExpirationHandler(this::rebuildServerSet);
+
+      try {
+        return group.watch(this::notifyGroupChange);
+      } catch (Group.WatchException | InterruptedException e) {
+        zkClient.unregister(onExpirationWatcher);
+        throw e;
+      }
+    }
+
+    /**
+     * Reads and decodes the member data at {@code nodePath}, retrying transient ZooKeeper errors.
+     *
+     * @throws ServiceInstanceDeletedException if the node no longer exists
+     * @throws ServiceInstanceFetchException on interruption, permanent ZooKeeper errors, or
+     *     undecodable data
+     */
+    private ServiceInstance getServiceInstance(final String nodePath) {
+      try {
+        return backoffHelper.doUntilResult(() -> {
+          try {
+            byte[] data = zkClient.get().getData(nodePath, false, null);
+            return ServerSets.deserializeServiceInstance(data, codec);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ServiceInstanceFetchException(
+                "Interrupted updating service data for: " + nodePath, e);
+          } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            LOG.warn("Temporary error trying to update service data for: " + nodePath, e);
+            return null;
+          } catch (NoNodeException e) {
+            invalidateNodePath(nodePath);
+            throw new ServiceInstanceDeletedException(nodePath);
+          } catch (KeeperException e) {
+            if (zkClient.shouldRetry(e)) {
+              LOG.warn("Temporary error trying to update service data for: " + nodePath, e);
+              return null;
+            } else {
+              throw new ServiceInstanceFetchException(
+                  "Failed to update service data for: " + nodePath, e);
+            }
+          } catch (IOException e) {
+            throw new ServiceInstanceFetchException(
+                "Failed to deserialize the ServiceInstance data for: " + nodePath, e);
+          }
+        });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ServiceInstanceFetchException(
+            "Interrupted trying to update service data for: " + nodePath, e);
+      }
+    }
+
+    // Member id -> decoded member data. Its key set also serves as the last-seen membership.
+    private final LoadingCache<String, ServiceInstance> servicesByMemberId =
+        CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() {
+          @Override public ServiceInstance load(String memberId) {
+            return getServiceInstance(group.getMemberPath(memberId));
+          }
+        });
+
+    /** Session-expiration handler: drops all cached member data and re-fetches it. */
+    private void rebuildServerSet() {
+      Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
+      servicesByMemberId.invalidateAll();
+      notifyGroupChange(memberIds);
+    }
+
+    private String invalidateNodePath(String deletedPath) {
+      String memberId = group.getMemberId(deletedPath);
+      servicesByMemberId.invalidate(memberId);
+      return memberId;
+    }
+
+    // Fetches (or returns cached) member data; yields null for members deleted mid-fetch.
+    private final Function<String, ServiceInstance> maybeFetchNode =
+        memberId -> {
+          // This get will trigger a fetch
+          try {
+            return servicesByMemberId.getUnchecked(memberId);
+          } catch (UncheckedExecutionException e) {
+            Throwable cause = e.getCause();
+            if (!(cause instanceof ServiceInstanceDeletedException)) {
+              Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class);
+              throw new IllegalStateException(
+                  "Unexpected error fetching member data for: " + memberId, e);
+            }
+            return null;
+          }
+        };
+
+    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
+      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
+      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
+
+      // Ignore no-op state changes except for the 1st when we've seen no group yet.
+      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
+        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds);
+        // Implicit removal from servicesByMemberId.
+        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
+
+        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
+            Iterables.transform(newMemberIds, maybeFetchNode), Predicates.notNull());
+
+        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
+      }
+    }
+
+    private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) {
+      // ZK nodes may have changed if there was a session expiry for a server in the server set, but
+      // if the server's status has not changed, we can skip any onChange updates.
+      if (!currentServerSet.equals(serverSet)) {
+        if (currentServerSet.isEmpty()) {
+          LOG.warn("server set empty for path {}", group.getPath());
+        } else {
+          if (serverSet == null) {
+            LOG.info("received initial membership {}", currentServerSet);
+          } else {
+            logChange(currentServerSet);
+          }
+        }
+        serverSet = currentServerSet;
+        monitor.onChange(serverSet);
+      }
+    }
+
+    private void logChange(ImmutableSet<ServiceInstance> newServerSet) {
+      StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: ");
+      if (serverSet.size() != newServerSet.size()) {
+        message.append("from ").append(serverSet.size())
+            .append(" members to ").append(newServerSet.size());
+      }
+
+      Joiner joiner = Joiner.on("\n\t\t");
+
+      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
+      if (!left.isEmpty()) {
+        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
+      }
+
+      SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet);
+      if (!joined.isEmpty()) {
+        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
+      }
+
+      LOG.info(message.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
new file mode 100644
index 0000000..01a54a5
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Common {@link ServerSet} related functions.
+ */
+public final class ServerSets {
+
+  private ServerSets() {
+    // Utility class.
+  }
+
+  /**
+   * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
+   */
+  public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
+      ServerSets::toEndpoint;
+
+  /**
+   * Creates a server set that registers at a single path applying the given ACL to all nodes
+   * created in the path.
+   *
+   * @param zkClient ZooKeeper client to register with; must not be null.
+   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates; validated with
+   *     {@code MorePreconditions.checkNotBlank}.
+   * @param zkPath Path to register at; validated with {@code MorePreconditions.checkNotBlank}.
+   * @return A server set that registers at {@code zkPath}.
+   * @throws NullPointerException if {@code zkClient} is null.
+   */
+  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
+    Preconditions.checkNotNull(zkClient);
+    MorePreconditions.checkNotBlank(acl);
+    MorePreconditions.checkNotBlank(zkPath);
+
+    return new ServerSetImpl(zkClient, acl, zkPath);
+  }
+
+  /**
+   * Serializes a Thrift service instance object with the given codec.
+   *
+   * @param serviceInstance the Thrift service instance object to be serialized
+   * @param codec the codec to use to serialize a Thrift service instance object
+   * @return byte array that contains a serialized Thrift service instance
+   * @throws IOException if the codec fails to serialize {@code serviceInstance}
+   */
+  public static byte[] serializeServiceInstance(
+      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    codec.serialize(serviceInstance, output);
+    return output.toByteArray();
+  }
+
+  /**
+   * Serializes a service instance built from the given address, additional endpoints and status.
+   *
+   * @param address the target address of the service instance; converted with
+   *     {@link #toEndpoint(InetSocketAddress)}
+   * @param additionalEndpoints additional endpoints of the service instance
+   * @param status service status
+   * @param codec the codec to use to serialize the service instance
+   * @return byte array that contains the serialized service instance
+   * @throws IOException if the codec fails to serialize the service instance
+   * @see #serializeServiceInstance(ServiceInstance, Codec)
+   */
+  public static byte[] serializeServiceInstance(
+      InetSocketAddress address,
+      Map<String, Endpoint> additionalEndpoints,
+      Status status,
+      Codec<ServiceInstance> codec) throws IOException {
+
+    ServiceInstance serviceInstance =
+        new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
+    return serializeServiceInstance(serviceInstance, codec);
+  }
+
+  /**
+   * Creates a service instance object deserialized from a byte array.
+   *
+   * @param data the byte array that contains a serialized Thrift service instance
+   * @param codec the codec to use to deserialize the byte array
+   * @return the deserialized service instance
+   * @throws IOException if the codec fails to deserialize {@code data}
+   */
+  public static ServiceInstance deserializeServiceInstance(
+      byte[] data, Codec<ServiceInstance> codec) throws IOException {
+
+    return codec.deserialize(new ByteArrayInputStream(data));
+  }
+
+  /**
+   * Creates an endpoint for the given InetSocketAddress.
+   *
+   * <p>Note: {@link InetSocketAddress#getHostName()} may trigger a reverse name-service lookup
+   * when {@code address} was created from a literal IP address.
+   *
+   * @param address the target address to create the endpoint for
+   * @return an endpoint holding the address's host name and port
+   */
+  public static Endpoint toEndpoint(InetSocketAddress address) {
+    return new Endpoint(address.getHostName(), address.getPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
new file mode 100644
index 0000000..7f962eb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+/**
+ * A service that uses master election to only allow a single service instance to be active amongst
+ * a set of potential servers at a time.
+ */
+public interface SingletonService {
+
+  /**
+   * Indicates an error attempting to lead a group of servers.
+   */
+  class LeadException extends Exception {
+    /**
+     * Creates a lead failure.
+     *
+     * @param message description of the failure
+     * @param cause the underlying error
+     */
+    public LeadException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Indicates an error attempting to advertise leadership of a group of servers.
+   */
+  class AdvertiseException extends Exception {
+    /**
+     * Creates an advertise failure without an underlying cause.
+     *
+     * @param message description of the failure
+     */
+    public AdvertiseException(String message) {
+      super(message);
+    }
+
+    /**
+     * Creates an advertise failure.
+     *
+     * @param message description of the failure
+     * @param cause the underlying error
+     */
+    public AdvertiseException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Indicates an error attempting to leave a group of servers, abdicating leadership of the group.
+   */
+  class LeaveException extends Exception {
+    /**
+     * Creates a leave failure.
+     *
+     * @param message description of the failure
+     * @param cause the underlying error
+     */
+    public LeaveException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Attempts to lead the singleton service.
+   *
+   * @param endpoint The primary endpoint to register as a leader candidate in the service.
+   * @param additionalEndpoints Additional endpoints that are available on the host.
+   * @param listener Handler to call when the candidate is elected or defeated.
+   * @throws LeadException If there was a problem joining or watching the ZooKeeper group.
+   * @throws InterruptedException If the thread watching/joining the group was interrupted.
+   */
+  void lead(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      LeadershipListener listener)
+      throws LeadException, InterruptedException;
+
+  /**
+   * A listener to be notified of changes in the leadership status.
+   * Implementers should be careful to avoid blocking operations in these callbacks.
+   */
+  interface LeadershipListener {
+
+    /**
+     * Notifies the listener that it is the current leader.
+     *
+     * @param control A controller handle to advertise and/or leave advertised presence.
+     */
+    void onLeading(LeaderControl control);
+
+    /**
+     * Notifies the listener that it is no longer leader.
+     */
+    void onDefeated();
+  }
+
+  /**
+   * A controller for the state of the leader.  This will be provided to the leader upon election,
+   * which allows the leader to decide when to advertise as leader of the server set and terminate
+   * leadership at will.
+   */
+  interface LeaderControl {
+
+    /**
+     * Advertises the leader's server presence to clients.
+     *
+     * @throws AdvertiseException If there was an error advertising the singleton leader to clients
+     *     of the server set.
+     * @throws InterruptedException If interrupted while advertising.
+     */
+    void advertise() throws AdvertiseException, InterruptedException;
+
+    /**
+     * Leaves candidacy for leadership, removing advertised server presence if applicable.
+     *
+     * @throws LeaveException If the leader's status could not be updated or there was an error
+     *     abdicating server set leadership.
+     */
+    void leave() throws LeaveException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
new file mode 100644
index 0000000..d9978a9
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * {@link SingletonService} that competes for leadership through a {@link Candidate} and, once
+ * elected, hands the listener a {@link LeaderControl} that can advertise the leader in a
+ * {@link ServerSet}.
+ */
+public class SingletonServiceImpl implements SingletonService {
+  @VisibleForTesting
+  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+
+  private final ServerSet serverSet;
+  private final Candidate candidate;
+
+  /**
+   * Builds a leadership candidate under {@code servicePath}, passing
+   * {@link #LEADER_ELECT_NODE_PREFIX} to the backing {@link Group}. The result can be combined
+   * with an existing server set at the same path via
+   * {@link #SingletonServiceImpl(ServerSet, Candidate)}.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param servicePath The path where service nodes live.
+   * @param acl The ACL handed to the candidate {@link Group}.
+   * @return A candidate that can be housed with a standard server set under a single zk path.
+   */
+  public static Candidate createSingletonCandidate(
+      ZooKeeperClient zkClient,
+      String servicePath,
+      Iterable<ACL> acl) {
+
+    Group candidateGroup = new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX);
+    return new CandidateImpl(candidateGroup);
+  }
+
+  /**
+   * Creates a singleton service that vies for leadership with {@code candidate} and, after being
+   * elected, lets the leader advertise itself in {@code serverSet}.
+   *
+   * @param serverSet The server set to advertise in on election; must not be null.
+   * @param candidate The candidacy to use to vie for election; must not be null.
+   */
+  public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) {
+    this.serverSet = Preconditions.checkNotNull(serverSet);
+    this.candidate = Preconditions.checkNotNull(candidate);
+  }
+
+  @Override
+  public void lead(final InetSocketAddress endpoint,
+                   final Map<String, InetSocketAddress> additionalEndpoints,
+                   final LeadershipListener listener)
+                   throws LeadException, InterruptedException {
+
+    Preconditions.checkNotNull(listener);
+
+    // Bridges the candidate's election callbacks to the caller's listener.
+    Leader electionHandler = new Leader() {
+      @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
+        listener.onLeading(
+            new ElectedLeaderControl(serverSet, endpoint, additionalEndpoints, abdicate));
+      }
+
+      @Override public void onDefeated() {
+        listener.onDefeated();
+      }
+    };
+
+    try {
+      candidate.offerLeadership(electionHandler);
+    } catch (JoinException e) {
+      throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e);
+    } catch (Group.WatchException e) {
+      throw new LeadException("Problem getting initial membership list for leadership group.", e);
+    }
+  }
+
+  /**
+   * Leader handle given to the listener upon election. Its methods are synchronized so that
+   * advertise and leave calls on the same handle never run concurrently.
+   */
+  private static final class ElectedLeaderControl implements LeaderControl {
+    private final ServerSet serverSet;
+    private final InetSocketAddress endpoint;
+    private final Map<String, InetSocketAddress> additionalEndpoints;
+    private final ExceptionalCommand<JoinException> abdicate;
+    private final AtomicBoolean left = new AtomicBoolean(false);
+    private ServerSet.EndpointStatus endpointStatus = null;
+
+    ElectedLeaderControl(
+        ServerSet serverSet,
+        InetSocketAddress endpoint,
+        Map<String, InetSocketAddress> additionalEndpoints,
+        ExceptionalCommand<JoinException> abdicate) {
+
+      this.serverSet = serverSet;
+      this.endpoint = endpoint;
+      this.additionalEndpoints = additionalEndpoints;
+      this.abdicate = abdicate;
+    }
+
+    @Override
+    public synchronized void advertise() throws AdvertiseException, InterruptedException {
+      Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
+      Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
+      try {
+        endpointStatus = serverSet.join(endpoint, additionalEndpoints);
+      } catch (JoinException e) {
+        throw new AdvertiseException("Problem advertising endpoint " + endpoint, e);
+      }
+    }
+
+    @Override
+    public synchronized void leave() throws LeaveException {
+      boolean firstLeave = left.compareAndSet(false, true);
+      Preconditions.checkState(firstLeave, "Cannot leave more than once.");
+
+      // Withdraw the advertisement (if one was made) before giving up the election.
+      if (endpointStatus != null) {
+        try {
+          endpointStatus.leave();
+        } catch (ServerSet.UpdateException e) {
+          throw new LeaveException("Problem updating endpoint status for abdicating leader "
+              + "at endpoint " + endpoint, e);
+        }
+      }
+
+      try {
+        abdicate.execute();
+      } catch (JoinException e) {
+        throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e);
+      }
+    }
+  }
+}