You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by dm...@apache.org on 2016/12/01 17:02:31 UTC

[2/4] aurora git commit: Revert removal of twitter/commons/zk based leadership code

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
new file mode 100644
index 0000000..9e482a6
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Charsets;
+
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.BadVersionException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author John Sirois
+ */
+public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest {
+  @Test
+  public void testEnsurePath() throws Exception {
+    ZooKeeperClient zkClient = createZkClient();
+    zkClient.get().addAuthInfo("digest", "client1:boo".getBytes(Charsets.UTF_8));
+
+    assertNull(zkClient.get().exists("/foo", false));
+    ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.CREATOR_ALL_ACL, "/foo/bar/baz");
+
+    zkClient = createZkClient();
+    zkClient.get().addAuthInfo("digest", "client2:bap".getBytes(Charsets.UTF_8));
+
+    // Anyone can check for existence in ZK
+    assertNotNull(zkClient.get().exists("/foo", false));
+    assertNotNull(zkClient.get().exists("/foo/bar", false));
+    assertNotNull(zkClient.get().exists("/foo/bar/baz", false));
+
+    try {
+      zkClient.get().delete("/foo/bar/baz", -1 /* delete no matter what */);
+      fail("Expected CREATOR_ALL_ACL to be applied to created path and client2 mutations to be "
+           + "rejected");
+    } catch (NoAuthException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testMagicVersionNumberAllowsUnconditionalUpdate() throws Exception {
+    String nodePath = "/foo";
+    ZooKeeperClient zkClient = createZkClient();
+
+    zkClient.get().create(nodePath, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    Stat initStat = new Stat();
+    byte[] initialData = zkClient.get().getData(nodePath, false, initStat);
+    assertArrayEquals("init".getBytes(), initialData);
+
+    // bump the version
+    Stat rev1Stat = zkClient.get().setData(nodePath, "rev1".getBytes(), initStat.getVersion());
+
+    try {
+      zkClient.get().setData(nodePath, "rev2".getBytes(), initStat.getVersion());
+      fail("expected correct version to be required");
+    } catch (BadVersionException e) {
+      // expected
+    }
+
+    // expect using the correct version to work
+    Stat rev2Stat = zkClient.get().setData(nodePath, "rev2".getBytes(), rev1Stat.getVersion());
+    assertNotEquals(ZooKeeperUtils.ANY_VERSION, rev2Stat.getVersion());
+
+    zkClient.get().setData(nodePath, "force-write".getBytes(), ZooKeeperUtils.ANY_VERSION);
+    Stat forceWriteStat = new Stat();
+    byte[] forceWriteData = zkClient.get().getData(nodePath, false, forceWriteStat);
+    assertArrayEquals("force-write".getBytes(), forceWriteData);
+
+    assertTrue(forceWriteStat.getVersion() > rev2Stat.getVersion());
+    assertNotEquals(ZooKeeperUtils.ANY_VERSION, forceWriteStat.getVersion());
+  }
+
+  @Test
+  public void testNormalizingPath() throws Exception {
+    assertEquals("/", ZooKeeperUtils.normalizePath("/"));
+    assertEquals("/foo", ZooKeeperUtils.normalizePath("/foo/"));
+    assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo//bar"));
+    assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("//foo/bar"));
+    assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar/"));
+    assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar//"));
+    assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar"));
+  }
+
+  @Test
+  public void testLenientPaths() {
+    assertEquals("/", ZooKeeperUtils.normalizePath("///"));
+    assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group"));
+    assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group/"));
+    assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group"));
+    assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group//"));
+
+    try {
+      ZooKeeperUtils.normalizePath("a/group");
+      fail("Relative paths should not be allowed.");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+    try {
+      ZooKeeperUtils.normalizePath("/a/./group");
+      fail("Relative paths should not be allowed.");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+    try {
+      ZooKeeperUtils.normalizePath("/a/../group");
+      fail("Relative paths should not be allowed.");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/config/findbugs/excludeFilter.xml
----------------------------------------------------------------------
diff --git a/config/findbugs/excludeFilter.xml b/config/findbugs/excludeFilter.xml
index 5eaa11a..f7d5ae0 100644
--- a/config/findbugs/excludeFilter.xml
+++ b/config/findbugs/excludeFilter.xml
@@ -74,14 +74,6 @@ limitations under the License.
     <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS" />
   </Match>
 
-  <!-- We're forced to use the platform default encoding to generate a byte array from digest
-       credentials because the underlying ZooKeeper API dictates this - also noted in the
-       offending code. -->
-  <Match>
-    <Class name="org.apache.aurora.scheduler.discovery.Credentials" />
-    <Bug pattern="DM_DEFAULT_ENCODING" />
-  </Match>
-
   <!-- Technical debt. -->
   <Match>
     <Class name="org.apache.aurora.scheduler.log.mesos.MesosLog$LogStream" />

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/docs/features/service-discovery.md
----------------------------------------------------------------------
diff --git a/docs/features/service-discovery.md b/docs/features/service-discovery.md
index 511c96d..36823c8 100644
--- a/docs/features/service-discovery.md
+++ b/docs/features/service-discovery.md
@@ -6,7 +6,7 @@ the purpose of service discovery.  ServerSets use the Zookeeper [group membershi
 of which there are several reference implementations:
 
   - [C++](https://github.com/apache/mesos/blob/master/src/zookeeper/group.cpp)
-  - [Java](http://curator.apache.org/curator-recipes/group-member.html)
+  - [Java](https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/zookeeper/ServerSetImpl.java#L221)
   - [Python](https://github.com/twitter/commons/blob/master/src/python/twitter/common/zookeeper/serverset/serverset.py#L51)
 
 These can also be used natively in Finagle using the [ZookeeperServerSetCluster](https://github.com/twitter/finagle/blob/master/finagle-serversets/src/main/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetCluster.scala).

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md
index 8955653..d4e0a9a 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -252,5 +252,7 @@ Optional flags:
 	Launches an embedded zookeeper server for local testing causing -zk_endpoints to be ignored if specified.
 -zk_session_timeout (default (4, secs))
 	The ZooKeeper session timeout.
+-zk_use_curator (default true)
+	DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter commons/zookeeper (the legacy library) is used.
 -------------------------------------------------------------------------
 ```

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index 76209b1..4e354b6 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -46,8 +46,8 @@ import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.StateMachine;
 import org.apache.aurora.common.util.StateMachine.Transition;
-import org.apache.aurora.scheduler.discovery.SingletonService;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.mesos.Driver;
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
+import static org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
 
 /**
  * The central driver of the scheduler runtime lifecycle.  Handles the transitions from startup and

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index e0d32de..43cc5b4 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -42,6 +42,8 @@ import org.apache.aurora.common.args.constraints.NotEmpty;
 import org.apache.aurora.common.args.constraints.NotNull;
 import org.apache.aurora.common.inject.Bindings;
 import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
 import org.apache.aurora.gen.ServerInfo;
 import org.apache.aurora.scheduler.AppStartup;
 import org.apache.aurora.scheduler.SchedulerLifecycle;
@@ -50,8 +52,6 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorModule;
 import org.apache.aurora.scheduler.cron.quartz.CronModule;
 import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig;
 import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
-import org.apache.aurora.scheduler.discovery.SingletonService;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
 import org.apache.aurora.scheduler.events.WebhookModule;
 import org.apache.aurora.scheduler.http.HttpService;
 import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
new file mode 100644
index 0000000..a1329fd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.app;
+
+import java.io.Closeable;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.thrift.ServiceInstance;
+
+/**
+ * Monitors a service group's membership and supplies a live view of the most recent set.
+ */
+public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable {
+
+  /**
+   * Indicates a problem initiating monitoring of a service group.
+   */
+  class MonitorException extends Exception {
+    public MonitorException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Starts monitoring the service group.
+   *
+   * When the service group membership no longer needs to be maintained, this monitor should be
+   * {@link #close() closed}.
+   *
+   * @throws MonitorException if there is a problem initiating monitoring of the service group.
+   */
+  void start() throws MonitorException;
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
new file mode 100644
index 0000000..339f63b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import javax.inject.Singleton;
+
+import com.google.inject.Exposed;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.ServerSetImpl;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonServiceImpl;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.zookeeper.data.ACL;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Binding module for utilities to advertise the network presence of the scheduler.
+ *
+ * Uses a fork of Twitter commons/zookeeper.
+ */
+class CommonsServiceDiscoveryModule extends PrivateModule {
+
+  private final String discoveryPath;
+  private final ZooKeeperConfig zooKeeperConfig;
+
+  CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+    this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath);
+    this.zooKeeperConfig = requireNonNull(zooKeeperConfig);
+  }
+
+  @Override
+  protected void configure() {
+    requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
+    requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
+
+    bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class);
+    expose(ServiceGroupMonitor.class);
+  }
+
+  @Provides
+  @Singleton
+  ZooKeeperClient provideZooKeeperClient(
+      @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster) {
+
+    return new ZooKeeperClient(
+        zooKeeperConfig.getSessionTimeout(),
+        zooKeeperConfig.getCredentials(),
+        zooKeeperConfig.getChrootPath(),
+        zooKeeperCluster);
+  }
+
+  @Provides
+  @Singleton
+  ServerSetImpl provideServerSet(
+      ZooKeeperClient client,
+      @ServiceDiscoveryBindings.ZooKeeper List<ACL> zooKeeperAcls) {
+
+    return new ServerSetImpl(client, zooKeeperAcls, discoveryPath);
+  }
+
+  @Provides
+  @Singleton
+  DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) {
+    // Used for a type re-binding of the server set.
+    return serverSet;
+  }
+
+  // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding.
+  @Provides
+  @Singleton
+  @Exposed
+  SingletonService provideSingletonService(
+      ZooKeeperClient client,
+      ServerSetImpl serverSet,
+      @ServiceDiscoveryBindings.ZooKeeper List<ACL> zookeeperAcls) {
+
+    return new SingletonServiceImpl(
+        serverSet,
+        SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, zookeeperAcls));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
new file mode 100644
index 0000000..9161455
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.inject.Inject;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+
+import static java.util.Objects.requireNonNull;
+
+class CommonsServiceGroupMonitor implements ServiceGroupMonitor {
+  private Optional<Command> closeCommand = Optional.empty();
+  private final DynamicHostSet<ServiceInstance> serverSet;
+  private final AtomicReference<ImmutableSet<ServiceInstance>> services =
+      new AtomicReference<>(ImmutableSet.of());
+
+  @Inject
+  CommonsServiceGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) {
+    this.serverSet = requireNonNull(serverSet);
+  }
+
+  @Override
+  public void start() throws MonitorException {
+    try {
+      closeCommand = Optional.of(serverSet.watch(services::set));
+    } catch (DynamicHostSet.MonitorException e) {
+      throw new MonitorException("Unable to watch scheduler host set.", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    closeCommand.ifPresent(Command::execute);
+  }
+
+  @Override
+  public ImmutableSet<ServiceInstance> get() {
+    return services.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java b/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
deleted file mode 100644
index 75d58e7..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.util.Arrays;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.commons.lang.builder.EqualsBuilder;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Encapsulates a user's ZooKeeper credentials.
- */
-public final class Credentials {
-
-  /**
-   * Creates a set of credentials for the ZooKeeper digest authentication mechanism.
-   *
-   * @param username the username to authenticate with
-   * @param password the password to authenticate with
-   * @return a set of credentials that can be used to authenticate the zoo keeper client
-   */
-  public static Credentials digestCredentials(String username, String password) {
-    MorePreconditions.checkNotBlank(username);
-    Preconditions.checkNotNull(password);
-
-    // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset
-    // (on server) and so we just have to hope here that clients are deployed in compatible jvms.
-    // Consider writing and installing a version of DigestAuthenticationProvider that controls its
-    // Charset explicitly.
-    return new Credentials("digest", (username + ":" + password).getBytes());
-  }
-
-  private final String authScheme;
-  private final byte[] authToken;
-
-  /**
-   * Creates a new set of credentials for the given ZooKeeper authentication scheme.
-   *
-   * @param scheme The name of the authentication scheme the {@code token} is valid in.
-   * @param token The authentication token for the given {@code scheme}.
-   */
-  public Credentials(String scheme, byte[] token) {
-    authScheme = MorePreconditions.checkNotBlank(scheme);
-    authToken = requireNonNull(token);
-  }
-
-  /**
-   * Returns the authentication scheme these credentials are for.
-   *
-   * @return the scheme these credentials are for.
-   */
-  public String scheme() {
-    return authScheme;
-  }
-
-  /**
-   * Returns the authentication token.
-   *
-   * @return the authentication token.
-   */
-  public byte[] token() {
-    return Arrays.copyOf(authToken, authToken.length);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof Credentials)) {
-      return false;
-    }
-
-    Credentials other = (Credentials) o;
-    return new EqualsBuilder()
-        .append(authScheme, other.scheme())
-        .append(authToken, other.token())
-        .isEquals();
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(authScheme, authToken);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
index e690d14..999a542 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
@@ -33,6 +33,10 @@ import org.apache.aurora.common.net.InetSocketAddressHelper;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -64,7 +68,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
     requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
     requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
 
-    bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(JsonCodec.INSTANCE);
+    bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(ServerSet.JSON_CODEC);
   }
 
   @Provides
@@ -102,7 +106,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
 
     if (zooKeeperConfig.getCredentials().isPresent()) {
       Credentials credentials = zooKeeperConfig.getCredentials().get();
-      builder.authorization(credentials.scheme(), credentials.token());
+      builder.authorization(credentials.scheme(), credentials.authToken());
     }
 
     CuratorFramework curatorFramework = builder.build();

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
index db886df..0b86fb6 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
@@ -24,6 +24,7 @@ import org.apache.aurora.GuavaUtils;
 import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.scheduler.app.SchedulerMain;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.utils.ZKPaths;

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
index 321bbb3..c378172 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
@@ -27,6 +27,7 @@ import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
index d5019bf..e8aafe4 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
@@ -15,10 +15,10 @@ package org.apache.aurora.scheduler.discovery;
 
 import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.Optional;
 
 import javax.annotation.Nullable;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 
@@ -27,12 +27,23 @@ import org.apache.aurora.common.args.CmdLine;
 import org.apache.aurora.common.args.constraints.NotEmpty;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A factory that creates a {@link ZooKeeperConfig} instance based on command line argument
  * values.
  */
 public final class FlaggedZooKeeperConfig {
+  private static final Logger LOG = LoggerFactory.getLogger(FlaggedZooKeeperConfig.class);
+
+  @CmdLine(name = "zk_use_curator",
+      help = "DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter "
+          + "commons/zookeeper (the legacy library) is used.")
+  private static final Arg<Boolean> USE_CURATOR = Arg.create(true);
+
   @CmdLine(name = "zk_in_proc",
       help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints "
           + "to be ignored if specified.")
@@ -63,9 +74,13 @@ public final class FlaggedZooKeeperConfig {
    * @return Configuration instance.
    */
   public static ZooKeeperConfig create() {
+    if (USE_CURATOR.hasAppliedValue()) {
+      LOG.warn("The -zk_use_curator flag is deprecated and will be removed in a future release.");
+    }
     return new ZooKeeperConfig(
+        USE_CURATOR.get(),
         ZK_ENDPOINTS.get(),
-        Optional.ofNullable(CHROOT_PATH.get()),
+        Optional.fromNullable(CHROOT_PATH.get()),
         IN_PROCESS.get(),
         SESSION_TIMEOUT.get(),
         getCredentials(DIGEST_CREDENTIALS.get()));
@@ -73,7 +88,7 @@ public final class FlaggedZooKeeperConfig {
 
   private static Optional<Credentials> getCredentials(@Nullable String userAndPass) {
     if (userAndPass == null) {
-      return Optional.empty();
+      return Optional.absent();
     }
 
     List<String> parts = ImmutableList.copyOf(Splitter.on(":").split(userAndPass));

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java b/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
deleted file mode 100644
index 9d22b76..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.JsonIOException;
-import com.google.gson.JsonParseException;
-
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Encodes a {@link ServiceInstance} as a JSON object.
- */
-class JsonCodec implements Codec<ServiceInstance> {
-
-  private static void assertRequiredField(String fieldName, Object fieldValue) {
-    if (fieldValue == null) {
-      throw new JsonParseException(String.format("Field %s is required", fieldName));
-    }
-  }
-
-  private static class EndpointSchema {
-    private final String host;
-    private final Integer port;
-
-    EndpointSchema(Endpoint endpoint) {
-      host = endpoint.getHost();
-      port = endpoint.getPort();
-    }
-
-    Endpoint asEndpoint() {
-      assertRequiredField("host", host);
-      assertRequiredField("port", port);
-
-      return new Endpoint(host, port);
-    }
-  }
-
-  private static class ServiceInstanceSchema {
-    private final EndpointSchema serviceEndpoint;
-    private final Map<String, EndpointSchema> additionalEndpoints;
-    private final Status status;
-    @Nullable private final Integer shard;
-
-    ServiceInstanceSchema(ServiceInstance instance) {
-      serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
-      if (instance.isSetAdditionalEndpoints()) {
-        additionalEndpoints =
-            Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new);
-      } else {
-        additionalEndpoints = ImmutableMap.of();
-      }
-      status  = instance.getStatus();
-      shard = instance.isSetShard() ? instance.getShard() : null;
-    }
-
-    ServiceInstance asServiceInstance() {
-      assertRequiredField("serviceEndpoint", serviceEndpoint);
-      assertRequiredField("status", status);
-
-      Map<String, EndpointSchema> extraEndpoints =
-          additionalEndpoints == null ? ImmutableMap.of() : additionalEndpoints;
-
-      ServiceInstance instance =
-          new ServiceInstance(
-              serviceEndpoint.asEndpoint(),
-              Maps.transformValues(extraEndpoints, EndpointSchema::asEndpoint),
-              status);
-      if (shard != null) {
-        instance.setShard(shard);
-      }
-      return instance;
-    }
-  }
-
-  /**
-   * The encoding for service instance data in ZooKeeper expected by Aurora clients.
-   */
-  static final Codec<ServiceInstance> INSTANCE = new JsonCodec();
-
-  private static final Charset ENCODING = Charsets.UTF_8;
-
-  private final Gson gson;
-
-  private JsonCodec() {
-    this(new Gson());
-  }
-
-  JsonCodec(Gson gson) {
-    this.gson = requireNonNull(gson);
-  }
-
-  @Override
-  public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
-    Writer writer = new OutputStreamWriter(sink, ENCODING);
-    try {
-      gson.toJson(new ServiceInstanceSchema(instance), writer);
-    } catch (JsonIOException e) {
-      throw new IOException(String.format("Problem serializing %s to JSON", instance), e);
-    }
-    writer.flush();
-  }
-
-  @Override
-  public ServiceInstance deserialize(InputStream source) throws IOException {
-    InputStreamReader reader = new InputStreamReader(source, ENCODING);
-    try {
-      @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class);
-      if (schema == null) {
-        throw new IOException("JSON did not include a ServiceInstance object");
-      }
-      return schema.asServiceInstance();
-    } catch (JsonParseException e) {
-      throw new IOException("Problem parsing JSON ServiceInstance.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
index b7ca62c..07a6302 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
@@ -25,14 +25,16 @@ import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
+import com.google.inject.Module;
 import com.google.inject.Provider;
 import com.google.inject.Provides;
 import com.google.inject.binder.LinkedBindingBuilder;
 
 import org.apache.aurora.common.application.ShutdownRegistry;
 import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.common.zookeeper.testing.ZooKeeperTestServer;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.discovery.testing.ZooKeeperTestServer;
 import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,15 +46,15 @@ import static java.util.Objects.requireNonNull;
  */
 public class ServiceDiscoveryModule extends AbstractModule {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CommonsServiceDiscoveryModule.class);
 
   private final ZooKeeperConfig zooKeeperConfig;
   private final String discoveryPath;
 
   /**
    * Creates a Guice module that will bind a
-   * {@link SingletonService} for scheduler leader election and a
-   * {@link ServiceGroupMonitor} that can be used to find the
+   * {@link org.apache.aurora.common.zookeeper.SingletonService} for scheduler leader election and a
+   * {@link org.apache.aurora.scheduler.app.ServiceGroupMonitor} that can be used to find the
    * leading scheduler.
    *
    * @param zooKeeperConfig The ZooKeeper client configuration to use to interact with ZooKeeper.
@@ -80,7 +82,15 @@ public class ServiceDiscoveryModule extends AbstractModule {
       clusterBinder.toInstance(zooKeeperConfig.getServers());
     }
 
-    install(new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig));
+    install(discoveryModule());
+  }
+
+  private Module discoveryModule() {
+    if (zooKeeperConfig.isUseCurator()) {
+      return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+    } else {
+      return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+    }
   }
 
   @Provides

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
deleted file mode 100644
index fea896c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.io.Closeable;
-import java.util.function.Supplier;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.thrift.ServiceInstance;
-
-/**
- * Monitors a service group's membership and supplies a live view of the most recent set.
- */
-public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable {
-
-  /**
-   * Indicates a problem initiating monitoring of a service group.
-   */
-  class MonitorException extends Exception {
-    MonitorException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Starts monitoring the service group.
-   *
-   * When the service group membership no longer needs to be maintained, this monitor should be
-   * {@link #close() closed}.
-   *
-   * @throws MonitorException if there is a problem initiating monitoring of the service group.
-   */
-  void start() throws MonitorException;
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
deleted file mode 100644
index adbc318..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-/**
- * A service that uses master election to only allow a single service instance to be active amongst
- * a set of potential servers at a time.
- */
-public interface SingletonService {
-
-  /**
-   * Indicates an error attempting to lead a group of servers.
-   */
-  class LeadException extends Exception {
-    LeadException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Indicates an error attempting to advertise leadership of a group of servers.
-   */
-  class AdvertiseException extends Exception {
-    AdvertiseException(String message) {
-      super(message);
-    }
-
-    AdvertiseException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Indicates an error attempting to leave a group of servers, abdicating leadership of the group.
-   */
-  class LeaveException extends Exception {
-    LeaveException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Attempts to lead the singleton service.
-   *
-   * @param endpoint The primary endpoint to register as a leader candidate in the service.
-   * @param additionalEndpoints Additional endpoints that are available on the host.
-   * @param listener Handler to call when the candidate is elected or defeated.
-   * @throws LeadException If there was a problem joining or watching the ZooKeeper group.
-   * @throws InterruptedException If the thread watching/joining the group was interrupted.
-   */
-  void lead(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints,
-      LeadershipListener listener)
-      throws LeadException, InterruptedException;
-
-  /**
-   * A listener to be notified of changes in the leadership status.
-   * Implementers should be careful to avoid blocking operations in these callbacks.
-   */
-  interface LeadershipListener {
-
-    /**
-     * Notifies the listener that is is current leader.
-     *
-     * @param control A controller handle to advertise and/or leave advertised presence.
-     */
-    void onLeading(LeaderControl control);
-
-    /**
-     * Notifies the listener that it is no longer leader.
-     */
-    void onDefeated();
-  }
-
-  /**
-   * A controller for the state of the leader.  This will be provided to the leader upon election,
-   * which allows the leader to decide when to advertise as leader of the server set and terminate
-   * leadership at will.
-   */
-  interface LeaderControl {
-
-    /**
-     * Advertises the leader's server presence to clients.
-     *
-     * @throws AdvertiseException If there was an error advertising the singleton leader to clients
-     *     of the server set.
-     * @throws InterruptedException If interrupted while advertising.
-     */
-    void advertise() throws AdvertiseException, InterruptedException;
-
-    /**
-     * Leaves candidacy for leadership, removing advertised server presence if applicable.
-     *
-     * @throws LeaveException If the leader's status could not be updated or there was an error
-     *     abdicating server set leadership.
-     */
-    void leave() throws LeaveException;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
index acb7905..3f32a62 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
@@ -14,11 +14,14 @@
 package org.apache.aurora.scheduler.discovery;
 
 import java.net.InetSocketAddress;
-import java.util.Optional;
+
+import com.google.common.base.Optional;
 
 import org.apache.aurora.common.base.MorePreconditions;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
 
 import static java.util.Objects.requireNonNull;
 
@@ -32,18 +35,21 @@ public class ZooKeeperConfig {
   /**
    * Creates a new client configuration with defaults for the session timeout and credentials.
    *
+   * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used.
    * @param servers ZooKeeper server addresses.
    * @return A new configuration.
    */
-  public static ZooKeeperConfig create(Iterable<InetSocketAddress> servers) {
+  public static ZooKeeperConfig create(boolean useCurator, Iterable<InetSocketAddress> servers) {
     return new ZooKeeperConfig(
+        useCurator,
         servers,
-        Optional.empty(), // chrootPath
+        Optional.absent(), // chrootPath
         false,
         ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT,
-        Optional.empty()); // credentials
+        Optional.absent()); // credentials
   }
 
+  private final boolean useCurator;
   private final Iterable<InetSocketAddress> servers;
   private final boolean inProcess;
   private final Amount<Integer, Time> sessionTimeout;
@@ -60,12 +66,14 @@ public class ZooKeeperConfig {
    * @param credentials ZooKeeper authentication credentials.
    */
   ZooKeeperConfig(
+      boolean useCurator,
       Iterable<InetSocketAddress> servers,
       Optional<String> chrootPath,
       boolean inProcess,
       Amount<Integer, Time> sessionTimeout,
       Optional<Credentials> credentials) {
 
+    this.useCurator = useCurator;
     this.servers = MorePreconditions.checkNotBlank(servers);
     this.chrootPath = requireNonNull(chrootPath);
     this.inProcess = inProcess;
@@ -82,6 +90,7 @@ public class ZooKeeperConfig {
    */
   public ZooKeeperConfig withCredentials(Credentials newCredentials) {
     return new ZooKeeperConfig(
+        useCurator,
         servers,
         chrootPath,
         inProcess,
@@ -89,6 +98,10 @@ public class ZooKeeperConfig {
         Optional.of(newCredentials));
   }
 
+  boolean isUseCurator() {
+    return useCurator;
+  }
+
   public Iterable<InetSocketAddress> getServers() {
     return servers;
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
deleted file mode 100644
index 211aa50..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * Utilities for dealing with ZooKeeper.
- */
-final class ZooKeeperUtils {
-
-  /**
-   * An appropriate default session timeout for ZooKeeper clusters.
-   */
-  static final Amount<Integer, Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS);
-
-  /**
-   * An ACL that gives all permissions any user authenticated or not.
-   */
-  static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
-      ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);
-
-  /**
-   * An ACL that gives all permissions to node creators and read permissions only to everyone else.
-   */
-  static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
-      ImmutableList.<ACL>builder()
-          .addAll(Ids.CREATOR_ALL_ACL)
-          .addAll(Ids.READ_ACL_UNSAFE)
-          .build();
-
-  private ZooKeeperUtils() {
-    // utility
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java b/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
deleted file mode 100644
index d84037e..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery.testing;
-
-import org.apache.aurora.common.testing.TearDownTestCase;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * A base-class for in-process zookeeper tests.
- */
-public abstract class BaseZooKeeperTest extends TearDownTestCase {
-
-  private ZooKeeperTestServer zkTestServer;
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Before
-  public final void setUp() throws Exception {
-    zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder());
-    addTearDown(zkTestServer::stop);
-    zkTestServer.startNetwork();
-  }
-
-  /**
-   * Returns the running in-process ZooKeeper server.
-   *
-   * @return The in-process ZooKeeper server.
-   */
-  protected final ZooKeeperTestServer getServer() {
-    return zkTestServer;
-  }
-
-  /**
-   * Returns the current port to connect to the in-process zookeeper instance.
-   */
-  protected final int getPort() {
-    return getServer().getPort();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java b/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
deleted file mode 100644
index a7bb48b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery.testing;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-/**
- * A helper class for starting in-process ZooKeeper server and clients.
- *
- * <p>This is ONLY meant to be used for testing.
- */
-public class ZooKeeperTestServer {
-
-  private final File dataDir;
-  private final File snapDir;
-
-  private ZooKeeperServer zooKeeperServer;
-  private ServerCnxnFactory connectionFactory;
-  private int port;
-
-  public ZooKeeperTestServer(File dataDir, File snapDir) {
-    this.dataDir = Preconditions.checkNotNull(dataDir);
-    this.snapDir = Preconditions.checkNotNull(snapDir);
-  }
-
-  /**
-   * Starts zookeeper up on an ephemeral port.
-   */
-  public void startNetwork() throws IOException, InterruptedException {
-    zooKeeperServer =
-        new ZooKeeperServer(
-            new FileTxnSnapLog(dataDir, snapDir),
-            new BasicDataTreeBuilder()) {
-
-          // TODO(John Sirois): Introduce a builder to configure the in-process server if and when
-          // some folks need JMX for in-process tests.
-          @Override protected void registerJMX() {
-            // noop
-          }
-        };
-
-    connectionFactory = new NIOServerCnxnFactory();
-    connectionFactory.configure(
-        new InetSocketAddress(port),
-        60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */);
-    connectionFactory.startup(zooKeeperServer);
-    port = zooKeeperServer.getClientPort();
-  }
-
-  /**
-   * Stops the zookeeper server.
-   */
-  public void stop() {
-    if (connectionFactory != null) {
-      connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
-      connectionFactory = null;
-    }
-  }
-
-  /**
-   * Expires the client session with the given {@code sessionId}.
-   *
-   * @param sessionId The id of the client session to expire.
-   */
-  public final void expireClientSession(long sessionId) {
-    zooKeeperServer.closeSession(sessionId);
-  }
-
-  /**
-   * Returns the current port to connect to the in-process zookeeper instance.
-   */
-  public final int getPort() {
-    checkEphemeralPortAssigned();
-    return port;
-  }
-
-  private void checkEphemeralPortAssigned() {
-    Preconditions.checkState(port > 0, "startNetwork must be called first");
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
index af8567f..53ebc0b 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
@@ -64,7 +64,7 @@ import org.apache.aurora.common.net.http.handlers.TimeSeriesDataSource;
 import org.apache.aurora.common.net.http.handlers.VarsHandler;
 import org.apache.aurora.common.net.http.handlers.VarsJsonHandler;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
 import org.apache.aurora.scheduler.http.api.ApiModule;
 import org.apache.aurora.scheduler.http.api.security.HttpSecurityModule;
 import org.apache.aurora.scheduler.thrift.ThriftModule;

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
index 662d6d5..bc0e2a8 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
@@ -27,8 +27,8 @@ import com.google.common.net.HostAndPort;
 
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
index c7c0387..6704a32 100644
--- a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
@@ -33,8 +33,8 @@ import org.apache.aurora.common.args.CmdLine;
 import org.apache.aurora.common.net.InetSocketAddressHelper;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
 import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.scheduler.discovery.Credentials;
 import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
 import org.apache.aurora.scheduler.log.mesos.LogInterface.ReaderInterface;
 import org.apache.aurora.scheduler.log.mesos.LogInterface.WriterInterface;
@@ -157,7 +157,7 @@ public class MesosLogStreamModule extends PrivateModule {
           zkClientConfig.getSessionTimeout().getUnit().getTimeUnit(),
           zkLogGroupPath,
           zkCredentials.scheme(),
-          zkCredentials.token());
+          zkCredentials.authToken());
     } else {
       return new Log(
           QUORUM_SIZE.get(),

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index 4324ea9..051c520 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -21,9 +21,9 @@ import org.apache.aurora.common.application.ShutdownRegistry;
 import org.apache.aurora.common.base.Command;
 import org.apache.aurora.common.base.ExceptionalCommand;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
 import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 84d7753..29a3b4a 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -17,7 +17,6 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -44,6 +43,10 @@ import org.apache.aurora.GuavaUtils;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ServerSetImpl;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
@@ -60,11 +63,8 @@ import org.apache.aurora.scheduler.AppStartup;
 import org.apache.aurora.scheduler.TierModule;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
-import org.apache.aurora.scheduler.discovery.Credentials;
 import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
 import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
-import org.apache.aurora.scheduler.discovery.testing.BaseZooKeeperTest;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.log.Log.Entry;
 import org.apache.aurora.scheduler.log.Log.Position;
@@ -107,9 +107,10 @@ import static org.easymock.EasyMock.createControl;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class SchedulerIT extends BaseZooKeeperTest {
+public class SchedulerIT extends BaseZooKeeperClientTest {
 
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerIT.class);
 
@@ -144,6 +145,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
   private Stream logStream;
   private StreamMatcher streamMatcher;
   private EntrySerializer entrySerializer;
+  private ZooKeeperClient zkClient;
   private File backupDir;
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -171,9 +173,11 @@ public class SchedulerIT extends BaseZooKeeperTest {
     entrySerializer = new EntrySerializer.EntrySerializerImpl(
         LogStorageModule.MAX_LOG_ENTRY_SIZE.get(),
         Hashing.md5());
+
+    zkClient = createZkClient();
   }
 
-  private Callable<Void> startScheduler() throws Exception {
+  private void startScheduler() throws Exception {
     // TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using
     // AppLauncher.
     Module testModule = new AbstractModule() {
@@ -198,8 +202,10 @@ public class SchedulerIT extends BaseZooKeeperTest {
     };
     ZooKeeperConfig zkClientConfig =
         ZooKeeperConfig.create(
+            true, // useCurator
             ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort())))
             .withCredentials(Credentials.digestCredentials("mesos", "mesos"));
+    SchedulerMain main = SchedulerMain.class.newInstance();
     Injector injector = Guice.createInjector(
         ImmutableList.<Module>builder()
             .add(SchedulerMain.getUniversalModule())
@@ -209,8 +215,8 @@ public class SchedulerIT extends BaseZooKeeperTest {
             .add(testModule)
             .build()
     );
-    SchedulerMain main = new SchedulerMain();
     injector.injectMembers(main);
+    Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
 
     executor.submit(() -> {
       try {
@@ -220,32 +226,28 @@ public class SchedulerIT extends BaseZooKeeperTest {
         executor.shutdownNow();
       }
     });
-
-    Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
     addTearDown(() -> {
       lifecycle.shutdown();
       MoreExecutors.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
     });
-
     injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class))
         .awaitHealthy();
+  }
 
-    ServiceGroupMonitor schedulerMonitor = injector.getInstance(ServiceGroupMonitor.class);
-    CountDownLatch schedulerReady = new CountDownLatch(1);
+  private void awaitSchedulerReady() throws Exception {
     executor.submit(() -> {
-      while (schedulerMonitor.get().isEmpty()) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
+      ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
+      final CountDownLatch schedulerReady = new CountDownLatch(1);
+      schedulerService.watch(hostSet -> {
+        if (!hostSet.isEmpty()) {
+          schedulerReady.countDown();
         }
-      }
-      schedulerReady.countDown();
-    });
-    return () -> {
-      schedulerReady.await();
+      });
+      // A timeout is used because certain types of assertion errors (mocks) will not surface
+      // until the main test thread exits this body of code.
+      assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES));
       return null;
-    };
+    }).get();
   }
 
   private final AtomicInteger curPosition = new AtomicInteger();
@@ -330,14 +332,14 @@ public class SchedulerIT extends BaseZooKeeperTest {
     expect(driver.stop(true)).andReturn(Status.DRIVER_STOPPED).anyTimes();
 
     control.replay();
-    Callable<Void> awaitSchedulerReady = startScheduler();
+    startScheduler();
 
     driverStarted.await();
     scheduler.getValue().registered(driver,
         FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
         MasterInfo.getDefaultInstance());
 
-    awaitSchedulerReady.call();
+    awaitSchedulerReady();
 
     assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());
     assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue());

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
new file mode 100644
index 0000000..d90192b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+abstract class AbstractDiscoveryModuleTest extends TearDownTestCase {
+
+  @Test
+  public void testBindingContract() {
+    ZooKeeperConfig zooKeeperConfig =
+        new ZooKeeperConfig(
+            isCurator(),
+            ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)),
+            Optional.of("/chroot"),
+            false, // inProcess
+            Amount.of(1, Time.DAYS),
+            Optional.of(Credentials.digestCredentials("test", "user")));
+
+    Injector injector =
+        Guice.createInjector(
+            new AbstractModule() {
+              @Override
+              protected void configure() {
+                bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
+                    .toInstance(
+                        ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)));
+                bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY)
+                    .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE);
+
+                bindExtraRequirements(binder());
+              }
+            },
+            createModule("/discovery/path", zooKeeperConfig));
+
+    assertNotNull(injector.getBinding(SingletonService.class).getProvider().get());
+    assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get());
+  }
+
+  void bindExtraRequirements(Binder binder) {
+    // Noop.
+  }
+
+  abstract Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig);
+
+  abstract boolean isCurator();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
index 9f86add..a2b4125 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
@@ -21,10 +21,13 @@ import java.util.function.Predicate;
 
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.scheduler.discovery.testing.BaseZooKeeperTest;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@@ -35,6 +38,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
 
   static final String GROUP_PATH = "/group/root";
   static final String MEMBER_TOKEN = "member_";
+  static final Codec<ServiceInstance> CODEC = ServerSet.JSON_CODEC;
   static final int PRIMARY_PORT = 42;
 
   private CuratorFramework client;
@@ -51,7 +55,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
     groupCache.getListenable().addListener((c, event) -> groupEvents.put(event));
 
     Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN);
-    groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, JsonCodec.INSTANCE);
+    groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC);
   }
 
   final CuratorFramework startNewClient() {
@@ -97,7 +101,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
 
   final byte[] serialize(ServiceInstance serviceInstance) throws IOException {
     ByteArrayOutputStream sink = new ByteArrayOutputStream();
-    JsonCodec.INSTANCE.serialize(serviceInstance, sink);
+    CODEC.serialize(serviceInstance, sink);
     return sink.toByteArray();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
new file mode 100644
index 0000000..7a4c4dd
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import com.google.inject.Module;
+
+public class CommonsDiscoveryModuleTest extends AbstractDiscoveryModuleTest {
+
+  @Override
+  Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+    return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+  }
+
+  @Override
+  boolean isCurator() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
new file mode 100644
index 0000000..42a2224
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class CommonsServiceGroupMonitorTest extends EasyMockTest {
+
+  private DynamicHostSet<ServiceInstance> serverSet;
+  private Capture<HostChangeMonitor<ServiceInstance>> hostChangeMonitorCapture;
+  private Command stopCommand;
+
+  @Before
+  public void setUp() throws Exception {
+    serverSet = createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { });
+    hostChangeMonitorCapture = createCapture();
+    stopCommand = createMock(Command.class);
+  }
+
+  private void expectSuccessfulWatch() throws Exception {
+    expect(serverSet.watch(capture(hostChangeMonitorCapture))).andReturn(stopCommand);
+  }
+
+  private void expectFailedWatch() throws Exception {
+    DynamicHostSet.MonitorException watchError =
+        new DynamicHostSet.MonitorException(
+            "Problem watching service group",
+            new RuntimeException());
+    expect(serverSet.watch(capture(hostChangeMonitorCapture))).andThrow(watchError);
+  }
+
+  @Test
+  public void testNominalLifecycle() throws Exception {
+    expectSuccessfulWatch();
+
+    stopCommand.execute();
+    expectLastCall();
+
+    control.replay();
+
+    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+    groupMonitor.start();
+    groupMonitor.close();
+  }
+
+  @Test
+  public void testExceptionalLifecycle() throws Exception {
+    expectFailedWatch();
+    control.replay();
+
+    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+    try {
+      groupMonitor.start();
+      fail();
+    } catch (ServiceGroupMonitor.MonitorException e) {
+      // expected
+    }
+
+    // Close on a non-started monitor should be allowed.
+    groupMonitor.close();
+  }
+
+  @Test
+  public void testNoHosts() throws Exception {
+    expectSuccessfulWatch();
+    control.replay();
+
+    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    groupMonitor.start();
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    hostChangeMonitorCapture.getValue().onChange(ImmutableSet.of());
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+  }
+
+  @Test
+  public void testHostUpdates() throws Exception {
+    expectSuccessfulWatch();
+    control.replay();
+
+    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+    groupMonitor.start();
+
+    ImmutableSet<ServiceInstance> twoHosts =
+        ImmutableSet.of(serviceInstance("one"), serviceInstance("two"));
+    hostChangeMonitorCapture.getValue().onChange(twoHosts);
+    assertEquals(twoHosts, groupMonitor.get());
+
+    ImmutableSet<ServiceInstance> oneHost = ImmutableSet.of(serviceInstance("one"));
+    hostChangeMonitorCapture.getValue().onChange(oneHost);
+    assertEquals(oneHost, groupMonitor.get());
+
+    ImmutableSet<ServiceInstance> anotherHost = ImmutableSet.of(serviceInstance("three"));
+    hostChangeMonitorCapture.getValue().onChange(anotherHost);
+    assertEquals(anotherHost, groupMonitor.get());
+
+    ImmutableSet<ServiceInstance> noHosts = ImmutableSet.of();
+    hostChangeMonitorCapture.getValue().onChange(noHosts);
+    assertEquals(noHosts, groupMonitor.get());
+  }
+
+  private ServiceInstance serviceInstance(String hostName) {
+    return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
index 4ebda5e..f1a02e4 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
@@ -13,27 +13,37 @@
  */
 package org.apache.aurora.scheduler.discovery;
 
-import java.net.InetSocketAddress;
-import java.util.Optional;
-
 import com.google.common.collect.ImmutableList;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
+import com.google.inject.Binder;
+import com.google.inject.Module;
 
 import org.apache.aurora.common.application.ShutdownRegistry;
 import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.zookeeper.data.ACL;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 
-public class CuratorDiscoveryModuleTest extends TearDownTestCase {
+public class CuratorDiscoveryModuleTest extends AbstractDiscoveryModuleTest {
+
+  @Override
+  void bindExtraRequirements(Binder binder) {
+    ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
+    binder.bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
+    addTearDown(shutdownRegistry::execute);
+  }
+
+  @Override
+  Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+    return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+  }
+
+  @Override
+  boolean isCurator() {
+    return false;
+  }
 
   @Test
   public void testSingleACLProvider() {
@@ -54,36 +64,4 @@ public class CuratorDiscoveryModuleTest extends TearDownTestCase {
   public void testSingleACLProviderEmpty() {
     new CuratorServiceDiscoveryModule.SingleACLProvider(ImmutableList.of());
   }
-
-  @Test
-  public void testBindingContract() {
-    ZooKeeperConfig zooKeeperConfig =
-        new ZooKeeperConfig(
-            ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)),
-            Optional.of("/chroot"),
-            false, // inProcess
-            Amount.of(1, Time.DAYS),
-            Optional.of(Credentials.digestCredentials("test", "user")));
-
-    Injector injector =
-        Guice.createInjector(
-            new AbstractModule() {
-              @Override
-              protected void configure() {
-                bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
-                    .toInstance(
-                        ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)));
-                bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY)
-                    .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE);
-
-                ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
-                binder().bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
-                addTearDown(shutdownRegistry::execute);
-              }
-            },
-            new CuratorServiceDiscoveryModule("/discovery/path", zooKeeperConfig));
-
-    assertNotNull(injector.getBinding(SingletonService.class).getProvider().get());
-    assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get());
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
index bb3d080..6ea49b0 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
@@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.easymock.Capture;
@@ -56,7 +57,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest {
       throws Exception {
 
     CuratorSingletonService singletonService =
-        new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, JsonCodec.INSTANCE);
+        new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, CODEC);
     InetSocketAddress leaderEndpoint = InetSocketAddress.createUnresolved(hostName, PRIMARY_PORT);
     singletonService.lead(leaderEndpoint, ImmutableMap.of(), listener);
   }